function execute_enhanced_workflow_background
Executes an enhanced SQL workflow in a background thread, retrieving data from a database, processing it through an AI-powered workflow, and automatically triggering statistical analysis on the results.
File: /tf/active/vicechatdev/full_smartstat/app.py
Lines: 342 - 515
Complexity: complex
Purpose
This function orchestrates a complex data retrieval and analysis pipeline in the background. It creates an EnhancedSQLWorkflow instance, executes it with user-specified parameters, stores the resulting dataframe and metadata, updates session tracking, and automatically continues to statistical analysis if data is successfully retrieved. It handles progress tracking, error management, and session state updates throughout the workflow execution.
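The workflow reports progress solely through the per-session entry in app.enhanced_workflows. The sketch below summarizes the status transitions performed in the source code that follows; the initial 'running' entry is created by the caller (see the usage example), and the field names and values mirror the updates made in the function body.

```python
# Shape of one app.enhanced_workflows[session_id] entry as this function updates it.
# Values are illustrative; they mirror the updates made in the source below.

entry = {
    'status': 'running',          # set by the caller before the thread starts
    'progress': 0,
    'message': 'Starting workflow...',
}

# After data retrieval succeeds:
entry.update({'status': 'analyzing', 'progress': 85,
              'message': 'Data retrieved, starting statistical analysis...',
              'result': {}})      # full workflow result, including the dataframe

# Just before the analysis thread is launched:
entry.update({'status': 'analyzing', 'progress': 90})

# Terminal states:
entry.update({'status': 'completed', 'progress': 100})   # data stored, or no rows found
entry.update({'status': 'failed', 'progress': 0,
              'message': 'Workflow failed: <error>', 'error': '<error>'})
```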
Source Code
def execute_enhanced_workflow_background(session_id, combined_request, schema_file, connection_config, parameters):
    """Execute enhanced workflow in background thread"""
    import pandas  # Import pandas locally in thread context
    from enhanced_sql_workflow import EnhancedSQLWorkflow  # Import locally in thread context
    from config import Config
    from statistical_agent import StatisticalAgent

    try:
        logger.info(f"Starting enhanced workflow for session {session_id}")

        # Extract model parameter from UI selection
        ai_model = parameters.get('ai_model', 'gpt-4o')
        logger.info(f"Using AI model: {ai_model}")

        # Create config and agent instances
        config = Config()
        statistical_agent = StatisticalAgent(config)

        # Create enhanced SQL workflow instance with dynamic schema
        discovered_schema = getattr(app, 'discovered_schema', None)
        workflow = EnhancedSQLWorkflow(
            config=config,
            statistical_agent=statistical_agent,
            discovered_schema=discovered_schema
        )

        # Execute the workflow with all parameters
        result = workflow.execute_integrated_workflow(
            combined_request=combined_request,
            session_id=session_id,
            schema_file=schema_file,
            connection_config=connection_config,
            max_rows=parameters.get('max_rows', 10000),
            model=ai_model,
            quality_threshold=parameters.get('quality_threshold', 70),
            include_incomplete=parameters.get('include_incomplete', True),
            optimize_for_analysis=parameters.get('optimize_for_analysis', True),
            include_dataset_context=parameters.get('include_dataset_context', False),
            target_tables=parameters.get('target_tables'),
            specific_columns=parameters.get('specific_columns'),
            specific_relationships=parameters.get('specific_relationships')
        )

        # Check if we got a dataframe
        if result and 'dataframe' in result:
            dataframe = result.get('dataframe')

            # If dataframe is None or empty, create empty dataframe
            if dataframe is None or (hasattr(dataframe, 'empty') and dataframe.empty):
                logger.warning(f"Empty or None dataframe for session {session_id}")
                dataframe = pandas.DataFrame()  # Create empty dataframe if None using full import

            # Store result in workflow tracking and update session data
            workflows = getattr(app, 'enhanced_workflows', {})
            if session_id in workflows:
                workflows[session_id].update({
                    'status': 'analyzing',
                    'progress': 85,
                    'message': 'Data retrieved, starting statistical analysis...',
                    'result': result
                })

            # Store dataframe and prepare for analysis
            if not dataframe.empty:
                # Store the enhanced result for later use
                try:
                    # Create a data source that includes the enhanced metadata
                    enhanced_data_source = {
                        'type': 'enhanced_sql',
                        'dataframe': dataframe,
                        'metadata': result.get('metadata', {}),
                        'analysis_config': result.get('analysis_config', {}),
                        'sql_query': result.get('final_query', ''),
                        'iterations': result.get('iterations', [])
                    }

                    # Store the data using analysis service
                    # Update session with enhanced data source
                    session = analysis_service.database_manager.get_session(session_id)
                    if session:
                        session.data_source = DataSource(
                            source_type=DataSourceType.SQL_QUERY,
                            sql_query=enhanced_data_source.get('sql_query', ''),
                            generated_sql=enhanced_data_source.get('sql_query', ''),
                            query_explanation=f"Enhanced data: {len(dataframe)} rows, {len(dataframe.columns)} columns"
                        )
                        session.sql_query = enhanced_data_source.get('sql_query', '')
                        session.updated_at = datetime.now()
                        analysis_service.database_manager.update_session(session)

                    # Also save the DataFrame for analysis service to use
                    data_path = analysis_service.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
                    analysis_service.config.SESSIONS_FOLDER.mkdir(parents=True, exist_ok=True)
                    dataframe.to_pickle(data_path)
                    logger.info(f"Saved DataFrame to {data_path} for analysis service")

                    # Create proper data summary result for the UI data tab
                    metadata = enhanced_data_source.get('metadata', {})
                    data_summary_result = analysis_service.load_data_for_session_direct(
                        session_id, dataframe, metadata
                    )
                    logger.info(f"Created data summary result for session {session_id}: {data_summary_result.get('success', False)}")
                    logger.info(f"Enhanced data stored for session {session_id}: {len(dataframe)} rows, {len(dataframe.columns)} columns")

                    # If we have good data, automatically continue to analysis
                    if len(dataframe) > 0:
                        logger.info(f"Automatically continuing to analysis for session {session_id}")

                        # Update progress to show analysis starting
                        if session_id in workflows:
                            workflows[session_id].update({
                                'status': 'analyzing',
                                'progress': 90,
                                'message': f'Data retrieved ({len(dataframe)} rows) - Starting analysis...',
                            })

                        # Trigger analysis in a separate thread
                        # Include AI model parameter from enhanced workflow
                        parameters['ai_model'] = parameters.get('ai_model', 'gpt-4o')
                        analysis_thread = threading.Thread(
                            target=_continue_to_analysis,
                            args=(session_id, analysis_service, parameters)
                        )
                        analysis_thread.start()
                    else:
                        # No data - mark as completed
                        if session_id in workflows:
                            workflows[session_id].update({
                                'status': 'completed',
                                'progress': 100,
                                'message': 'Data retrieval completed but no data found',
                            })
                except Exception as storage_error:
                    logger.error(f"Failed to store enhanced data for session {session_id}: {str(storage_error)}")
                    if session_id in workflows:
                        workflows[session_id].update({
                            'status': 'completed',
                            'progress': 100,
                            'message': 'Data retrieved but storage failed - data available in memory',
                        })
            else:
                logger.warning(f"Empty dataframe for session {session_id}")
                if session_id in workflows:
                    workflows[session_id].update({
                        'status': 'completed',
                        'progress': 100,
                        'message': 'Workflow completed but no data was retrieved',
                    })

            logger.info(f"Enhanced workflow completed for session {session_id}")
        else:
            # No dataframe returned - mark as failed
            workflows = getattr(app, 'enhanced_workflows', {})
            if session_id in workflows:
                workflows[session_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': 'No data returned from workflow',
                    'error': 'Workflow did not return any data'
                })
            logger.error(f"Enhanced workflow failed for session {session_id}: No data returned")

    except Exception as e:
        logger.error(f"Enhanced workflow background error for session {session_id}: {str(e)}")
        # Update workflow status to failed
        workflows = getattr(app, 'enhanced_workflows', {})
        if session_id in workflows:
            workflows[session_id].update({
                'status': 'failed',
                'progress': 0,
                'message': f'Workflow failed: {str(e)}',
                'error': str(e)
            })
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| session_id | - | - | positional_or_keyword |
| combined_request | - | - | positional_or_keyword |
| schema_file | - | - | positional_or_keyword |
| connection_config | - | - | positional_or_keyword |
| parameters | - | - | positional_or_keyword |
Parameter Details
session_id: Unique identifier for the user session. Used to track workflow progress, store results, and associate data with the correct session throughout the application.
combined_request: Natural language or structured request describing the data to retrieve and analyze. This is passed to the EnhancedSQLWorkflow to generate appropriate SQL queries.
schema_file: Path to the database schema file or schema definition. Used by the workflow to understand database structure and generate valid SQL queries.
connection_config: Dictionary containing database connection parameters (host, port, database name, credentials, etc.). Used to establish connection to the data source.
parameters: Dictionary of workflow configuration parameters including: 'ai_model' (default 'gpt-4o'), 'max_rows' (default 10000), 'quality_threshold' (default 70), 'include_incomplete' (default True), 'optimize_for_analysis' (default True), 'include_dataset_context' (default False), 'target_tables', 'specific_columns', 'specific_relationships'.
Return Value
This function does not return a value (returns None implicitly). It operates by side effects, updating the global 'app.enhanced_workflows' dictionary with workflow status, progress, results, and errors. Results are stored in session data and the analysis_service for later retrieval.
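Because results are reported only through app.enhanced_workflows, callers typically expose that dictionary to the UI. The Flask route below is a hypothetical example of such a status endpoint; the route path, function name, and response shape are assumptions for illustration, not part of app.py.

```python
from flask import jsonify

# Hypothetical status endpoint; route name and payload are illustrative only.
@app.route('/api/enhanced-workflow/<session_id>/status')
def enhanced_workflow_status(session_id):
    workflows = getattr(app, 'enhanced_workflows', {})
    entry = workflows.get(session_id)
    if entry is None:
        return jsonify({'error': 'unknown session'}), 404
    # Do not return the raw 'result' (it can hold a large DataFrame); report metadata only.
    return jsonify({
        'status': entry.get('status'),
        'progress': entry.get('progress'),
        'message': entry.get('message'),
        'error': entry.get('error'),
    })
```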
Dependencies
pandas, flask, logging, threading, datetime, pathlib, enhanced_sql_workflow, config, statistical_agent, models, services
Required Imports
import logging
import threading
from datetime import datetime
from pathlib import Path
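Beyond these imports, the function body relies on names defined at module level in app.py: the Flask app object, the shared logger, the analysis_service instance, and the DataSource / DataSourceType model classes. The sketch below lists those assumed module-level definitions; the import paths for the model classes and the service constructor are assumptions inferred from the dependency list, not verified code.

```python
import logging

from flask import Flask

# Assumed import locations, inferred from the 'models' and 'services' dependencies above
from models import DataSource, DataSourceType
from services import AnalysisService

# Module-level names referenced inside execute_enhanced_workflow_background
app = Flask(__name__)
logger = logging.getLogger(__name__)
analysis_service = AnalysisService()  # constructor arguments are an assumption
```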
Conditional/Optional Imports
These imports are only needed under specific conditions:
import pandas
Condition: Imported locally within the thread context to ensure thread-safe pandas operations. Required (conditional).
from enhanced_sql_workflow import EnhancedSQLWorkflow
Condition: Imported locally within the thread context to avoid circular imports and ensure proper module loading. Required (conditional).
from config import Config
Condition: Imported locally within the thread context for configuration management. Required (conditional).
from statistical_agent import StatisticalAgent
Condition: Imported locally within the thread context for AI-powered statistical analysis. Required (conditional).
Usage Example
import threading
from pathlib import Path

# Assume app, analysis_service, and logger are already configured globally
session_id = 'unique-session-123'
combined_request = 'Get sales data for the last quarter with customer demographics'
schema_file = Path('/path/to/schema.json')
connection_config = {
    'host': 'localhost',
    'port': 5432,
    'database': 'sales_db',
    'user': 'analyst',
    'password': 'secure_password'
}
parameters = {
    'ai_model': 'gpt-4o',
    'max_rows': 5000,
    'quality_threshold': 75,
    'include_incomplete': True,
    'optimize_for_analysis': True,
    'include_dataset_context': True,
    'target_tables': ['sales', 'customers'],
    'specific_columns': None,
    'specific_relationships': None
}

# Initialize workflow tracking
if not hasattr(app, 'enhanced_workflows'):
    app.enhanced_workflows = {}
app.enhanced_workflows[session_id] = {
    'status': 'running',
    'progress': 0,
    'message': 'Starting workflow...'
}

# Execute in background thread
workflow_thread = threading.Thread(
    target=execute_enhanced_workflow_background,
    args=(session_id, combined_request, schema_file, connection_config, parameters)
)
workflow_thread.start()

# Check status later
status = app.enhanced_workflows.get(session_id, {})
print(f"Status: {status.get('status')}, Progress: {status.get('progress')}%")
Best Practices
- Always initialize app.enhanced_workflows dictionary before calling this function to track workflow status
- Ensure the global 'app' and 'analysis_service' objects are properly configured before execution
- Run this function in a separate thread to avoid blocking the main application thread
- Monitor workflow progress through app.enhanced_workflows[session_id]; a polling sketch follows this list
- Handle the case where the workflow may fail by checking the 'status' field for 'failed' state
- Ensure database connection credentials in connection_config are valid and have appropriate permissions
- Configure appropriate max_rows parameter to avoid memory issues with large datasets
- The function automatically triggers analysis on successful data retrieval, so ensure _continue_to_analysis is defined
- Session data is persisted to disk as pickle files; ensure SESSIONS_FOLDER has write permissions
- Select the AI model based on complexity and cost requirements (gpt-4o vs. other models)
- The function uses lazy imports within the thread context to avoid circular dependencies and ensure thread safety
- Empty or None dataframes are handled gracefully, but check the workflow status for completion messages
- Error messages are stored in the workflow tracking dictionary for debugging and user feedback
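As referenced in the monitoring item above, a simple way to wait on a background workflow is to poll the tracking entry until it reaches a terminal state. This is a minimal sketch; the timeout and poll interval are arbitrary choices.

```python
import time

def wait_for_workflow(app, session_id, timeout=600, poll_interval=2):
    """Poll app.enhanced_workflows[session_id] until the workflow completes or fails."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        entry = getattr(app, 'enhanced_workflows', {}).get(session_id, {})
        status = entry.get('status')
        print(f"{status}: {entry.get('progress')}% - {entry.get('message')}")
        if status in ('completed', 'failed'):
            return entry
        time.sleep(poll_interval)
    raise TimeoutError(f"Workflow for session {session_id} did not finish within {timeout}s")
```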
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function enhanced_sql_workflow (77.6% similar)
- function test_enhanced_workflow (69.2% similar)
- class EnhancedSQLWorkflow (67.5% similar)
- function run_analysis_async (64.1% similar)
- function demonstrate_sql_workflow_v1 (62.5% similar)