🔍 Code Extractor

function get_database_tables_columns

Maturity: 49

Flask route handler that retrieves database schema information including tables, columns, and relationships, filtered and sorted by relevance for data analysis workflows.

File:
/tf/active/vicechatdev/full_smartstat/app.py
Lines:
795 - 890
Complexity:
moderate

Purpose

This endpoint provides curated database metadata for workflow modals in a web application. It discovers or retrieves cached database schema, filters out system/small tables, enriches column information with type metadata, and returns the most relevant tables (by row count) along with their relationships. Designed to help users select appropriate tables and understand data structure for analytical workflows.

Source Code

def get_database_tables_columns():
    """Get tables and their columns for the workflow modal"""
    try:
        # Get relevant tables for workflow (prioritize high-volume data tables)
        schema = None
        
        # Try to get schema from app.discovered_schema first
        if hasattr(app, 'discovered_schema') and app.discovered_schema:
            schema = app.discovered_schema
        else:
            # Fallback: try to load from cache if in-memory schema is not available
            logger.info("In-memory schema not available, attempting to load from cache...")
            try:
                if hasattr(app, 'schema_discovery') and app.schema_discovery:
                    schema = app.schema_discovery.discover_schema()
                    app.discovered_schema = schema  # Cache it in memory
                    logger.info("Schema loaded from cache successfully")
            except Exception as cache_error:
                logger.error(f"Could not load schema from cache: {cache_error}")
        
        if schema:
            # Filter and sort tables by relevance for data analysis
            relevant_tables = []
            for table in schema.tables:
                # Skip system tables and small lookup tables
                if (table.row_count > 100 and 
                    not any(skip_word in table.name.lower() for skip_word in 
                           ['log', 'audit', 'temp', 'sys', 'meta', 'config'])):
                    
                    # Get column info with data types
                    columns = []
                    for col in table.columns:
                        columns.append({
                            'name': col['name'],
                            'type': col.get('type', 'unknown'),
                            'is_primary': col['name'] in (table.primary_keys or []),
                            'is_numeric': col.get('type', '').lower() in ['int', 'integer', 'decimal', 'float', 'numeric', 'real', 'double'],
                            'is_date': 'date' in col.get('type', '').lower() or 'time' in col.get('type', '').lower()
                        })
                    
                    relevant_tables.append({
                        'name': table.name,
                        'row_count': table.row_count,
                        'description': table.description or f"Table with {len(table.columns)} columns",
                        'columns': columns
                    })
            
            # Sort by row count descending (highest data volume first)
            relevant_tables.sort(key=lambda x: x['row_count'], reverse=True)
            
            # Get discovered relationships
            relationships = []
            try:
                schema_dict = schema.to_dict()
                all_relationships = schema_dict.get('relationships', [])
                
                # Filter relationships to only include those between relevant tables
                table_names = {table['name'] for table in relevant_tables}
                for rel in all_relationships:
                    from_table = rel.get('from_table')
                    to_table = rel.get('to_table')
                    
                    if from_table in table_names and to_table in table_names:
                        relationships.append({
                            'from_table': from_table,
                            'from_column': rel.get('from_column'),
                            'to_table': to_table,
                            'to_column': rel.get('to_column'),
                            'type': rel.get('type', 'unknown'),
                            'confidence': rel.get('confidence', 0)
                        })
                
                # Sort by confidence descending
                relationships.sort(key=lambda x: x['confidence'], reverse=True)
                
            except Exception as e:
                logger.warning(f"Could not get relationships for UI: {e}")
            
            return jsonify({
                'success': True,
                'tables': relevant_tables[:15],  # Limit to top 15 tables
                'relationships': relationships[:20]  # Limit to top 20 relationships
            })
        else:
            return jsonify({
                'success': False,
                'error': 'No database schema available'
            }), 400
            
    except Exception as e:
        logger.error(f"Error getting database tables/columns: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

Return Value

Returns a Flask JSON response object.

  • Success (200): {'success': True, 'tables': [...], 'relationships': [...]}. The tables list holds up to 15 table objects (name, row_count, description, columns); the relationships list holds up to 20 relationship objects (from_table, from_column, to_table, to_column, type, confidence).
  • Failure (400 when no schema is available, 500 on unexpected errors): {'success': False, 'error': 'error message string'}.
  • Each column object includes name and type plus the boolean flags is_primary, is_numeric, and is_date.
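An illustrative success payload is sketched below. The table, column, and relationship names here are hypothetical examples, not taken from the application's actual database; the flag values follow the type-matching rules in the source ('integer' counts as numeric, any type containing 'date' or 'time' counts as a date).

```python
# Hypothetical example of the success response body; names are invented.
sample_response = {
    'success': True,
    'tables': [
        {
            'name': 'orders',
            'row_count': 5000,
            'description': 'Table with 2 columns',
            'columns': [
                {'name': 'id', 'type': 'integer', 'is_primary': True,
                 'is_numeric': True, 'is_date': False},
                {'name': 'created_at', 'type': 'timestamp', 'is_primary': False,
                 'is_numeric': False, 'is_date': True},
            ],
        },
    ],
    'relationships': [
        {'from_table': 'orders', 'from_column': 'user_id',
         'to_table': 'users', 'to_column': 'id',
         'type': 'foreign_key', 'confidence': 0.9},
    ],
}

# The endpoint caps the lists at 15 tables and 20 relationships.
assert len(sample_response['tables']) <= 15
assert len(sample_response['relationships']) <= 20
```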

Dependencies

  • flask
  • logging

Required Imports

from flask import jsonify
import logging

Usage Example

# Flask application setup
from flask import Flask, jsonify
import logging

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Mock schema for demonstration
class MockColumn:
    def __init__(self, name, col_type):
        self.data = {'name': name, 'type': col_type}
    def __getitem__(self, key):
        return self.data.get(key)
    def get(self, key, default=None):
        return self.data.get(key, default)

class MockTable:
    def __init__(self, name, row_count):
        self.name = name
        self.row_count = row_count
        self.description = f"Table {name}"
        self.columns = [MockColumn('id', 'integer'), MockColumn('name', 'varchar')]
        self.primary_keys = ['id']

class MockSchema:
    def __init__(self):
        self.tables = [MockTable('users', 1000), MockTable('orders', 5000)]
    def to_dict(self):
        return {'relationships': []}

app.discovered_schema = MockSchema()

@app.route('/database_tables_columns', methods=['GET'])
def get_database_tables_columns():
    # Function implementation here
    pass

# Client usage:
# GET request to http://your-app/database_tables_columns
# Returns JSON with tables and relationships
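Rather than issuing a live HTTP request, the endpoint can be exercised with Flask's built-in test client. This is a minimal sketch: the handler body here is a stand-in that only returns the success shape, since the full implementation appears in the Source Code section.

```python
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/database_tables_columns', methods=['GET'])
def get_database_tables_columns():
    # Stand-in body returning the documented success shape.
    return jsonify({'success': True, 'tables': [], 'relationships': []})

# Exercise the route without starting a server.
with app.test_client() as client:
    resp = client.get('/database_tables_columns')
    payload = resp.get_json()

print(resp.status_code)      # 200
print(payload['success'])    # True
```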

Best Practices

  • Ensure app.discovered_schema is populated before calling this endpoint for optimal performance
  • Implement proper schema caching strategy using app.schema_discovery to avoid repeated discovery operations
  • The function filters tables with row_count > 100 and excludes system tables - adjust these thresholds based on your database size
  • Returns limited results (15 tables, 20 relationships) to prevent overwhelming the UI - increase limits if needed
  • Handle the case where schema is unavailable gracefully in the frontend
  • The function assumes schema objects have specific attributes (tables, columns, primary_keys, relationships) - ensure your schema discovery service provides these
  • Column type detection for is_numeric and is_date uses simple string matching - may need enhancement for custom database types
  • Consider implementing rate limiting on this endpoint if schema discovery is expensive
  • Log messages use a 'logger' variable - ensure proper logging configuration in your application
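If the hard-coded thresholds need adjusting, the relevance check can be factored into a small helper with configurable parameters. This is a hypothetical refactoring sketch (the function name and parameters are not part of the original code); it mirrors the endpoint's filter: keep tables above a minimum row count whose names contain none of the skip words.

```python
# Hypothetical helper; mirrors the endpoint's inline filter logic.
DEFAULT_SKIP_WORDS = ('log', 'audit', 'temp', 'sys', 'meta', 'config')

def is_relevant_table(name, row_count, min_rows=100,
                      skip_words=DEFAULT_SKIP_WORDS):
    """Return True if the table passes the relevance filter."""
    if row_count <= min_rows:
        return False  # small lookup table, skip
    # Exclude tables whose names suggest system/housekeeping data.
    return not any(word in name.lower() for word in skip_words)

print(is_relevant_table('orders', 5000))       # True: large business table
print(is_relevant_table('audit_trail', 9000))  # False: excluded by name
print(is_relevant_table('regions', 40))        # False: too small
```

Centralising the thresholds this way also makes them straightforward to drive from application configuration instead of literals.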

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function get_database_schema 80.2% similar

    Flask route handler that retrieves and returns comprehensive database schema information, including tables, columns, relationships, and statistics.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function get_database_schema_viewer 75.6% similar

    Flask route handler that retrieves and formats detailed database schema information from a discovered schema stored in the Flask app object, returning it as JSON for visualization purposes.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function database_schema 63.3% similar

    Flask route handler that renders the database schema viewer page template.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function refresh_database_schema 58.3% similar

    Flask route endpoint that forces a refresh of the database schema by invoking the schema discovery service with LLM-generated descriptions.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function enhanced_sql_workflow 56.4% similar

    Flask route handler that initiates an enhanced SQL workflow with iterative optimization, executing data extraction and analysis in a background thread while providing real-time progress tracking.

    From: /tf/active/vicechatdev/full_smartstat/app.py