function enhanced_sql_workflow

Maturity: 55

Flask route handler that initiates an enhanced SQL workflow with iterative optimization, executing data extraction and analysis in a background thread while providing real-time progress tracking.

File: /tf/active/vicechatdev/full_smartstat/app.py
Lines: 624-695
Complexity: complex

Purpose

This endpoint serves as the entry point for executing complex SQL-based data workflows. It accepts user requests for data analysis, validates parameters, initializes progress tracking, and spawns a background thread to execute the EnhancedSQLWorkflow. The function handles various configuration options including schema files, database connections, row limits, quality thresholds, and table/column selections. It's designed for asynchronous execution with progress monitoring capabilities.

Source Code

def enhanced_sql_workflow(session_id):
    """Execute enhanced SQL workflow with iterative optimization"""
    try:
        data = request.get_json()
        
        # Import the enhanced workflow
        from enhanced_sql_workflow import EnhancedSQLWorkflow
        
        # Extract parameters
        combined_request = data.get('combined_request', '')
        schema_file = data.get('schema_file', 'database_schema_20251003_120434.json')
        connection_config = data.get('connection_config', 'sql_config.py')
        max_rows_raw = data.get('max_rows', 10000)
        
        # Handle unlimited option
        if max_rows_raw == 'unlimited':
            max_rows = None  # None means no limit
            logger.info("Using unlimited rows for data extraction")
        else:
            max_rows = int(max_rows_raw) if max_rows_raw else 10000
        
        if not combined_request:
            return jsonify({
                'success': False,
                'error': 'Combined request is required'
            }), 400
        
        # Initialize progress tracking
        app.enhanced_workflows = getattr(app, 'enhanced_workflows', {})
        app.enhanced_workflows[session_id] = {
            'status': 'in_progress',
            'current_step': 'parsing',
            'progress': 10,
            'message': 'Parsing data and analysis requirements...',
            'iterations': [],
            'start_time': datetime.now().isoformat()
        }
        
        # Execute workflow in background using threading
        import threading
        parameters = data.copy()  # Include all form data as parameters
        parameters['max_rows'] = max_rows
        
        # Log the received parameters for debugging
        logger.info(f"Enhanced workflow parameters: {list(parameters.keys())}")
        if parameters.get('quality_threshold'):
            logger.info(f"Quality threshold: {parameters['quality_threshold']}%")
        if parameters.get('target_tables'):
            logger.info(f"Target tables: {parameters['target_tables']}")
        if parameters.get('specific_columns'):
            logger.info(f"Specific columns: {len(parameters['specific_columns'])} selected")
        
        workflow_thread = threading.Thread(
            target=execute_enhanced_workflow_background,
            args=(session_id, combined_request, schema_file, connection_config, parameters)
        )
        workflow_thread.daemon = True
        workflow_thread.start()
        
        # Return immediately to allow progress monitoring
        return jsonify({
            'success': True,
            'workflow_id': session_id,
            'message': 'Enhanced workflow started'
        })
            
    except Exception as e:
        logger.error(f"Error in enhanced SQL workflow: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

Parameters

Name        Type  Default  Kind
session_id  -     -        positional_or_keyword

Parameter Details

session_id: Unique identifier for the workflow session, passed as a URL path parameter. Used to track progress and associate results with a specific user session. Expected to be a string (typically UUID format).
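
Since session_id is expected to be a UUID-format string, a client would typically generate one with the standard library (a minimal sketch; the endpoint itself treats the value as an opaque string):

import uuid

# uuid4 guarantees uniqueness across concurrent clients; the handler
# only uses the value as a dictionary key and URL path segment.
session_id = str(uuid.uuid4())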

Return Value

Returns a Flask JSON response tuple. On success: {'success': True, 'workflow_id': session_id, 'message': 'Enhanced workflow started'} with HTTP 200. On validation error: {'success': False, 'error': error_message} with HTTP 400. On exception: {'success': False, 'error': error_string} with HTTP 500. The workflow_id can be used to poll for progress updates.
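
A client can branch on both the HTTP status code and the success flag. The sketch below assumes the server from the usage example further down; the URL path is illustrative:

import requests

url = 'http://localhost:5000/enhanced_sql_workflow/demo-session'  # illustrative path
resp = requests.post(url, json={'combined_request': 'Analyze sales data'})
body = resp.json()

if resp.status_code == 200 and body.get('success'):
    workflow_id = body['workflow_id']            # use this id to poll for progress
elif resp.status_code == 400:
    print(f"Validation error: {body['error']}")  # e.g. missing combined_request
else:
    print(f"Server error: {body['error']}")      # unhandled exception (HTTP 500)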

Dependencies

  • flask
  • logging
  • datetime
  • threading
  • enhanced_sql_workflow

Required Imports

from flask import Flask, request, jsonify
import logging
from datetime import datetime
import threading

Conditional/Optional Imports

These imports are only needed under specific conditions:

from enhanced_sql_workflow import EnhancedSQLWorkflow

Condition: imported lazily inside the function body when the endpoint is called; required for workflow execution.

Status: required (conditional)
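
Deferring the import to request time keeps module start-up cheap, but it also means an ImportError only surfaces when the endpoint is first hit. A slightly more defensive variant of the same pattern might look like this (a sketch, not the author's code):

def enhanced_sql_workflow(session_id):
    try:
        # Lazy import: pay the cost (and the risk) only when the endpoint runs
        from enhanced_sql_workflow import EnhancedSQLWorkflow
    except ImportError as exc:
        return jsonify({'success': False,
                        'error': f'Workflow module unavailable: {exc}'}), 500
    # ... rest of the handler as in the source above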

Usage Example

# Flask application setup
from flask import Flask, request, jsonify
import logging

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Define the background execution function
def execute_enhanced_workflow_background(session_id, combined_request, schema_file, connection_config, parameters):
    # Implementation of background workflow execution
    pass

# Register the route
@app.route('/enhanced_sql_workflow/<session_id>', methods=['POST'])
def enhanced_sql_workflow(session_id):
    # Function implementation here
    pass

# Client usage example
import requests

session_id = 'unique-session-123'
url = f'http://localhost:5000/enhanced_sql_workflow/{session_id}'

payload = {
    'combined_request': 'Analyze sales data for Q4 2023',
    'schema_file': 'database_schema_20251003_120434.json',
    'connection_config': 'sql_config.py',
    'max_rows': 10000,
    'quality_threshold': 85,
    'target_tables': ['sales', 'customers'],
    'specific_columns': ['date', 'amount', 'customer_id']
}

response = requests.post(url, json=payload)
result = response.json()

if result['success']:
    workflow_id = result['workflow_id']
    print(f"Workflow started: {workflow_id}")
    # Poll for progress using workflow_id
else:
    print(f"Error: {result['error']}")

Best Practices

  • Always provide a unique session_id for each workflow execution to avoid conflicts
  • The 'combined_request' parameter is required and should contain a clear description of the data analysis requirements
  • Pass the string 'unlimited' as max_rows to remove the row limit; otherwise provide an integer
  • Ensure the execute_enhanced_workflow_background function is properly implemented to handle the actual workflow execution
  • Monitor workflow progress by polling the app.enhanced_workflows[session_id] dictionary
  • The function uses daemon threads, which terminate abruptly when the main process exits; ensure proper cleanup of in-flight workflows
  • Handle the asynchronous nature by implementing a separate progress monitoring endpoint (see the sketch after this list)
  • Validate that schema_file and connection_config files exist before starting the workflow
  • Consider implementing timeout mechanisms for long-running workflows
  • Log all parameters for debugging purposes, especially quality_threshold, target_tables, and specific_columns
  • The function stores workflow state in app.enhanced_workflows; ensure access is thread-safe in production (a locking sketch follows this list)
  • Error handling returns appropriate HTTP status codes (400 for validation, 500 for server errors)
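
As referenced above, a minimal progress-monitoring endpoint can simply read back the per-session state that the handler and background thread write. This is a sketch; the route path echoes the enhanced_workflow_progress function listed under Similar Components, whose actual implementation is not shown here:

@app.route('/enhanced_workflow_progress/<session_id>', methods=['GET'])
def enhanced_workflow_progress(session_id):
    # Read-only view of the shared state written by the background thread
    workflows = getattr(app, 'enhanced_workflows', {})
    state = workflows.get(session_id)
    if state is None:
        return jsonify({'success': False, 'error': 'Unknown workflow id'}), 404
    return jsonify({'success': True, **state})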

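For the thread-safety bullet, one lightweight option is to funnel all writes through a helper guarded by a module-level lock (a sketch under that assumption; in multi-process deployments an external store such as Redis is more appropriate, since per-process dictionaries are not shared):

import threading

_workflows_lock = threading.Lock()  # hypothetical module-level lock

def update_workflow_state(session_id, **fields):
    """Merge fields into a session's workflow state under the lock."""
    with _workflows_lock:
        app.enhanced_workflows = getattr(app, 'enhanced_workflows', {})
        state = app.enhanced_workflows.setdefault(session_id, {})
        state.update(fields)
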
Similar Components

Components with related functionality, ranked by AI-powered semantic similarity:

  • function enhanced_workflow_progress 78.1% similar

    Flask route handler that retrieves and returns the current progress status of an enhanced SQL workflow, including step completion, progress percentage, and final results if completed.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function execute_enhanced_workflow_background 77.6% similar

    Executes an enhanced SQL workflow in a background thread, retrieving data from a database, processing it through an AI-powered workflow, and automatically triggering statistical analysis on the results.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function load_sql_workflow_data 77.3% similar

    Flask route handler that processes user natural language queries by leveraging an LLM to generate and execute SQL queries against a database, returning the results for a specific session.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function analyze_data 68.9% similar

    Flask route handler that initiates an asynchronous data analysis process based on user query, creating a background thread to perform the analysis and returning an analysis ID for progress tracking.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • class EnhancedSQLWorkflow 68.8% similar

    Enhanced SQL workflow with iterative optimization

    From: /tf/active/vicechatdev/full_smartstat/enhanced_sql_workflow.py