function enhanced_sql_workflow
Flask route handler that initiates an enhanced SQL workflow with iterative optimization, executing data extraction and analysis in a background thread while providing real-time progress tracking.
File: /tf/active/vicechatdev/full_smartstat/app.py
Lines: 624 - 695
Complexity: complex
Purpose
This endpoint serves as the entry point for executing complex SQL-based data workflows. It accepts user requests for data analysis, validates parameters, initializes progress tracking, and spawns a background thread to execute the EnhancedSQLWorkflow. The function handles various configuration options including schema files, database connections, row limits, quality thresholds, and table/column selections. It's designed for asynchronous execution with progress monitoring capabilities.
Source Code
```python
def enhanced_sql_workflow(session_id):
    """Execute enhanced SQL workflow with iterative optimization"""
    try:
        data = request.get_json()

        # Import the enhanced workflow
        from enhanced_sql_workflow import EnhancedSQLWorkflow

        # Extract parameters
        combined_request = data.get('combined_request', '')
        schema_file = data.get('schema_file', 'database_schema_20251003_120434.json')
        connection_config = data.get('connection_config', 'sql_config.py')
        max_rows_raw = data.get('max_rows', 10000)

        # Handle unlimited option
        if max_rows_raw == 'unlimited':
            max_rows = None  # None means no limit
            logger.info("Using unlimited rows for data extraction")
        else:
            max_rows = int(max_rows_raw) if max_rows_raw else 10000

        if not combined_request:
            return jsonify({
                'success': False,
                'error': 'Combined request is required'
            }), 400

        # Initialize progress tracking
        app.enhanced_workflows = getattr(app, 'enhanced_workflows', {})
        app.enhanced_workflows[session_id] = {
            'status': 'in_progress',
            'current_step': 'parsing',
            'progress': 10,
            'message': 'Parsing data and analysis requirements...',
            'iterations': [],
            'start_time': datetime.now().isoformat()
        }

        # Execute workflow in background using threading
        import threading
        parameters = data.copy()  # Include all form data as parameters
        parameters['max_rows'] = max_rows

        # Log the received parameters for debugging
        logger.info(f"Enhanced workflow parameters: {list(parameters.keys())}")
        if parameters.get('quality_threshold'):
            logger.info(f"Quality threshold: {parameters['quality_threshold']}%")
        if parameters.get('target_tables'):
            logger.info(f"Target tables: {parameters['target_tables']}")
        if parameters.get('specific_columns'):
            logger.info(f"Specific columns: {len(parameters['specific_columns'])} selected")

        workflow_thread = threading.Thread(
            target=execute_enhanced_workflow_background,
            args=(session_id, combined_request, schema_file, connection_config, parameters)
        )
        workflow_thread.daemon = True
        workflow_thread.start()

        # Return immediately to allow progress monitoring
        return jsonify({
            'success': True,
            'workflow_id': session_id,
            'message': 'Enhanced workflow started'
        })

    except Exception as e:
        logger.error(f"Error in enhanced SQL workflow: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
```
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| session_id | - | - | positional_or_keyword |
Parameter Details
session_id: Unique identifier for the workflow session, passed as a URL path parameter. Used to track progress and associate results with a specific user session. Expected to be a string (typically UUID format).
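Since session_id is expected to be a UUID-style string, a client can generate one with the standard library. This is a minimal sketch; the handler itself does not enforce any particular format:

```python
import uuid

# Generate a unique identifier for this workflow run
session_id = str(uuid.uuid4())  # e.g. 'f3b2c1d0-...'
```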
Return Value
Returns a Flask JSON response tuple. On success: {'success': True, 'workflow_id': session_id, 'message': 'Enhanced workflow started'} with HTTP 200. On validation error: {'success': False, 'error': error_message} with HTTP 400. On exception: {'success': False, 'error': error_string} with HTTP 500. The workflow_id can be used to poll for progress updates.
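The companion progress endpoint is not shown in this section (the Similar Components list references an enhanced_workflow_progress function). Below is a minimal sketch of what polling could look like if such a route lived alongside this handler in app.py; the route path and response shape are assumptions, not confirmed by the source:

```python
# Hypothetical progress endpoint; the real route may differ.
@app.route('/enhanced_workflow_progress/<session_id>', methods=['GET'])
def enhanced_workflow_progress(session_id):
    workflows = getattr(app, 'enhanced_workflows', {})
    state = workflows.get(session_id)
    if state is None:
        return jsonify({'success': False, 'error': 'Unknown workflow id'}), 404
    # Echo the tracked state: status, current_step, progress, message, iterations
    return jsonify({'success': True, **state})
```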
Dependencies
flask, logging, datetime, threading, enhanced_sql_workflow
Required Imports
```python
from flask import Flask, request, jsonify
import logging
from datetime import datetime
import threading
```
Conditional/Optional Imports
These imports are only needed under specific conditions:
```python
from enhanced_sql_workflow import EnhancedSQLWorkflow
```
Condition: imported inside the function when the endpoint is called; required for workflow execution.
Required (conditional)
Usage Example
```python
# Flask application setup
from flask import Flask, request, jsonify
import logging

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Define the background execution function
def execute_enhanced_workflow_background(session_id, combined_request, schema_file, connection_config, parameters):
    # Implementation of background workflow execution
    pass

# Register the route
@app.route('/enhanced_sql_workflow/<session_id>', methods=['POST'])
def enhanced_sql_workflow(session_id):
    # Function implementation here
    pass

# Client usage example
import requests

session_id = 'unique-session-123'
url = f'http://localhost:5000/enhanced_sql_workflow/{session_id}'
payload = {
    'combined_request': 'Analyze sales data for Q4 2023',
    'schema_file': 'database_schema_20251003_120434.json',
    'connection_config': 'sql_config.py',
    'max_rows': 10000,
    'quality_threshold': 85,
    'target_tables': ['sales', 'customers'],
    'specific_columns': ['date', 'amount', 'customer_id']
}
response = requests.post(url, json=payload)
result = response.json()

if result['success']:
    workflow_id = result['workflow_id']
    print(f"Workflow started: {workflow_id}")
    # Poll for progress using workflow_id
else:
    print(f"Error: {result['error']}")
```
Best Practices
- Always provide a unique session_id for each workflow execution to avoid conflicts
- The 'combined_request' parameter is required and should contain a clear description of the data analysis requirements
- Use 'unlimited' string value for max_rows to remove row limits, otherwise provide an integer
- Ensure the execute_enhanced_workflow_background function is properly implemented to handle the actual workflow execution
- Monitor workflow progress by polling the app.enhanced_workflows[session_id] dictionary
- The function uses daemon threads which will terminate when the main program exits - ensure proper cleanup
- Handle the asynchronous nature by implementing a separate progress monitoring endpoint
- Validate that schema_file and connection_config files exist before starting the workflow
- Consider implementing timeout mechanisms for long-running workflows
- Log all parameters for debugging purposes, especially quality_threshold, target_tables, and specific_columns
- The function stores workflow state in app.enhanced_workflows - ensure this is thread-safe in production (see the lock-guarded sketch after this list)
- Error handling returns appropriate HTTP status codes (400 for validation, 500 for server errors)
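For the thread-safety point above, one minimal approach is to guard all reads and writes of app.enhanced_workflows with a module-level lock. The lock and helper below are illustrative additions, not part of the source:

```python
import threading

_workflows_lock = threading.Lock()  # illustrative; not in the original source

def update_workflow_state(session_id, **changes):
    # Serialize concurrent updates from request handlers and worker threads
    with _workflows_lock:
        app.enhanced_workflows.setdefault(session_id, {}).update(changes)

# Example: a worker thread reporting progress
# update_workflow_state(session_id, progress=50, message='Running SQL workflow...')
```

Note that an in-process dict is only visible within a single worker process; multi-process deployments (e.g. gunicorn with several workers) would need an external store such as Redis instead.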
Similar Components
AI-powered semantic similarity - components with related functionality:
- function enhanced_workflow_progress (78.1% similar)
- function execute_enhanced_workflow_background (77.6% similar)
- function load_sql_workflow_data (77.3% similar)
- function analyze_data (68.9% similar)
- class EnhancedSQLWorkflow (68.8% similar)