function run_analysis_async
Executes a data analysis workflow asynchronously with real-time progress tracking, including query interpretation, script generation, execution, and result finalization.
/tf/active/vicechatdev/full_smartstat/app.py
1148 - 1259
complex
Purpose
This function orchestrates an end-to-end asynchronous analysis pipeline for data science workflows. It processes natural language queries, generates analysis configurations, executes statistical analyses, and provides progress updates throughout the process. It handles error recovery, cleanup operations, and ensures all results are JSON-serializable for API responses. The function is designed to run in a background thread while updating a shared progress dictionary that can be polled by clients.
Source Code
```python
def run_analysis_async(analysis_id: str, session_id: str, user_query: str, model: str = 'gpt-4o', include_previous_context: bool = True, selected_analyses: List[str] = None):
    """Run analysis asynchronously with progress updates"""
    try:
        # Update progress: Query interpretation
        analysis_progress[analysis_id].update({
            'status': 'interpreting',
            'progress': 10,
            'message': 'Understanding your query...'
        })

        # Process user query to understand intent
        interpretation_result = analysis_service.process_user_query(session_id, user_query)

        if not interpretation_result['success']:
            analysis_progress[analysis_id].update({
                'status': 'failed',
                'progress': 100,
                'message': 'Failed to interpret query',
                'error': interpretation_result.get('error', 'Unknown error')
            })
            return

        # Update progress: Script generation
        analysis_progress[analysis_id].update({
            'status': 'generating',
            'progress': 30,
            'message': 'Generating analysis script...'
        })

        suggested_config = interpretation_result['suggested_config']

        # Update progress: Script execution
        analysis_progress[analysis_id].update({
            'status': 'executing',
            'progress': 60,
            'message': 'Executing analysis...'
        })

        # Generate and execute analysis
        analysis_result = analysis_service.generate_and_execute_analysis(
            session_id,
            suggested_config,
            user_query,
            model,
            include_previous_context,
            selected_analyses or []
        )

        # Update progress: Finalizing
        analysis_progress[analysis_id].update({
            'status': 'finalizing',
            'progress': 90,
            'message': 'Finalizing results...'
        })

        # Prepare response data with proper serialization
        response_data = {
            'interpretation': {
                'success': interpretation_result['interpretation']['success'],
                'analysis_plan': interpretation_result['interpretation']['analysis_plan'],
                'suggested_config': suggested_config.to_dict(),
                'interpretation': interpretation_result['interpretation']['interpretation'],
                'confidence': interpretation_result['interpretation']['confidence']
            },
            'analysis': analysis_result
        }

        # If analysis failed, provide debugging option
        if not analysis_result['success'] and 'execution_result' in analysis_result:
            response_data['debug_available'] = True
            response_data['failed_step_id'] = analysis_result.get('generation_step_id')

        # Ensure all data is JSON serializable
        def make_json_serializable(obj):
            if hasattr(obj, 'to_dict'):
                return obj.to_dict()
            elif isinstance(obj, dict):
                return {k: make_json_serializable(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [make_json_serializable(item) for item in obj]
            elif hasattr(obj, 'value'):  # Handle enums
                return obj.value
            else:
                return obj

        response_data = make_json_serializable(response_data)

        # Update progress: Complete
        analysis_progress[analysis_id].update({
            'status': 'completed',
            'progress': 100,
            'message': 'Analysis completed successfully!',
            'results': response_data,
            'completed_at': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Error in async analysis {analysis_id}: {str(e)}")

        # Clean up any partial analysis files on failure
        if app_config.AUTO_CLEANUP_ENABLED:
            try:
                analysis_service.agent_executor.cleanup_old_analyses(session_id, keep_recent=app_config.KEEP_RECENT_ANALYSES)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup after failure warning: {str(cleanup_error)}")

        analysis_progress[analysis_id].update({
            'status': 'failed',
            'progress': 100,
            'message': f'Analysis failed: {str(e)}',
            'error': str(e)
        })
```
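The inner make_json_serializable helper handles to_dict objects, dicts, lists, and enum-like values, but not types such as datetime or date that can also surface in analysis results. A standalone sketch of an extended version (the datetime/date branch is an addition not present in the source, included here as an assumption about what such results may contain):

```python
from datetime import date, datetime
from enum import Enum


def make_json_serializable(obj):
    """Recursively convert common non-JSON-native types to serializable forms."""
    if hasattr(obj, 'to_dict'):
        # Recurse into the dict form in case it contains nested special types
        return make_json_serializable(obj.to_dict())
    if isinstance(obj, dict):
        return {k: make_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [make_json_serializable(item) for item in obj]
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    return obj
```

Checking isinstance(obj, Enum) instead of hasattr(obj, 'value') avoids accidentally unwrapping unrelated objects that happen to expose a value attribute.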
Parameters

| Name | Type | Default | Kind |
|---|---|---|---|
| analysis_id | str | - | positional_or_keyword |
| session_id | str | - | positional_or_keyword |
| user_query | str | - | positional_or_keyword |
| model | str | 'gpt-4o' | positional_or_keyword |
| include_previous_context | bool | True | positional_or_keyword |
| selected_analyses | List[str] | None | positional_or_keyword |
Parameter Details
analysis_id: Unique identifier for this analysis run, used to track progress in the analysis_progress dictionary. Should be a unique string (typically UUID) generated before calling this function.
session_id: Identifier for the user session, used to maintain context across multiple analyses and manage session-specific data files and history.
user_query: Natural language query describing the analysis to perform (e.g., 'Show correlation between sales and marketing spend'). This is interpreted by the analysis service to generate appropriate analysis configurations.
model: LLM model identifier to use for query interpretation and script generation. Defaults to 'gpt-4o'. Can be any supported model string like 'gpt-4', 'gpt-3.5-turbo', etc.
include_previous_context: Boolean flag indicating whether to include previous analysis results and context when generating the new analysis. Defaults to True. Set to False for isolated analyses.
selected_analyses: Optional list of analysis IDs to include as context. If None, all previous analyses in the session may be considered. Use empty list [] to exclude all previous analyses even if include_previous_context is True.
Return Value
This function returns None; it updates the global analysis_progress dictionary in place. The analysis_progress[analysis_id] entry is updated with these keys:
- 'status': one of interpreting | generating | executing | finalizing | completed | failed
- 'progress': integer from 0 to 100
- 'message': human-readable status
- 'results': complete analysis results (on success)
- 'error': error message (on failure)
- 'completed_at': ISO timestamp (on completion)
- 'debug_available': boolean, set when debugging is possible
- 'failed_step_id': ID of the failed step, if applicable
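Concretely, a successful run leaves an entry shaped roughly like this (field values are illustrative, drawn from the update calls in the source; the nested dicts are truncated):

```python
# Illustrative shape of a completed analysis_progress entry
progress_entry = {
    'status': 'completed',   # interpreting | generating | executing | finalizing | completed | failed
    'progress': 100,         # integer 0-100
    'message': 'Analysis completed successfully!',
    'results': {
        'interpretation': {},  # interpretation summary (truncated here)
        'analysis': {}         # full analysis_result (truncated here)
    },
    'completed_at': '2024-01-01T12:00:00'
}
```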
Dependencies
flask, pandas, datetime, typing, pathlib, logging, threading, json
Required Imports

```python
from datetime import datetime
from typing import List
import logging
```
Conditional/Optional Imports
These imports are only needed under specific conditions:

- `from services import StatisticalAnalysisService` (required, conditional): needed for the analysis_service instance to process queries and execute analyses
- `from config import Config` (required, conditional): needed for app_config to access the AUTO_CLEANUP_ENABLED and KEEP_RECENT_ANALYSES settings
- `from models import AnalysisConfiguration` (required, conditional): needed for handling suggested_config objects and their to_dict() method

Usage Example
```python
import logging
import threading
import time
import uuid
from collections import defaultdict

from config import Config
from services import StatisticalAnalysisService

# Initialize required globals
analysis_progress = defaultdict(dict)
app_config = Config()
analysis_service = StatisticalAnalysisService(app_config)
logger = logging.getLogger(__name__)

# Generate unique analysis ID
analysis_id = str(uuid.uuid4())
session_id = 'user_session_123'

# Initialize progress tracking BEFORE starting the thread to avoid races
analysis_progress[analysis_id] = {
    'status': 'pending',
    'progress': 0,
    'message': 'Starting analysis...'
}

# Start analysis in background thread
thread = threading.Thread(
    target=run_analysis_async,
    args=(analysis_id, session_id, 'Show correlation between age and income'),
    kwargs={
        'model': 'gpt-4o',
        'include_previous_context': True,
        'selected_analyses': []
    }
)
thread.start()

# Poll progress until the analysis finishes
while analysis_progress[analysis_id]['status'] not in ('completed', 'failed'):
    print(f"Progress: {analysis_progress[analysis_id]['progress']}% - {analysis_progress[analysis_id]['message']}")
    time.sleep(1)

# Check results
if analysis_progress[analysis_id]['status'] == 'completed':
    results = analysis_progress[analysis_id]['results']
    print(f"Analysis completed: {results}")
else:
    error = analysis_progress[analysis_id].get('error', 'Unknown error')
    print(f"Analysis failed: {error}")
```
Best Practices
- Always initialize the analysis_progress dictionary entry before calling this function to avoid race conditions
- Run this function in a separate thread to avoid blocking the main application thread
- Implement proper polling mechanism with reasonable intervals (1-2 seconds) to check progress status
- Handle both 'completed' and 'failed' status outcomes in the calling code
- Ensure the analysis_service instance is properly initialized with valid configuration and LLM credentials
- Consider implementing timeout mechanisms to prevent indefinitely running analyses
- The analysis_progress dictionary should be thread-safe (use threading.Lock if multiple threads access it)
- Clean up completed analysis entries from analysis_progress periodically to prevent memory leaks
- Validate that session_id exists and has proper data sources before calling this function
- Use unique analysis_id values (UUID recommended) to prevent conflicts in concurrent analyses
- Monitor the 'debug_available' flag in failed analyses to provide debugging options to users
- Implement proper error logging and monitoring for production deployments
- Consider setting include_previous_context=False for independent analyses to improve performance
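Several of these practices (locking, periodic cleanup of finished entries) can be combined in a small wrapper around the shared dictionary. A minimal sketch, assuming the rest of the module is as shown above; the ProgressStore class and its TTL policy are illustrative, not part of the source:

```python
import threading
import time


class ProgressStore:
    """Thread-safe wrapper around a shared analysis-progress dictionary."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self._lock = threading.Lock()
        self._entries = {}
        self._ttl = ttl_seconds

    def update(self, analysis_id: str, **fields):
        # Create-or-update under the lock, stamping the last-touched time
        with self._lock:
            entry = self._entries.setdefault(analysis_id, {})
            entry.update(fields)
            entry['updated_at'] = time.time()

    def get(self, analysis_id: str) -> dict:
        # Return a copy so callers never read a dict being mutated
        with self._lock:
            return dict(self._entries.get(analysis_id, {}))

    def purge_finished(self):
        """Drop completed/failed entries older than the TTL to avoid memory leaks."""
        cutoff = time.time() - self._ttl
        with self._lock:
            stale = [aid for aid, e in self._entries.items()
                     if e.get('status') in ('completed', 'failed')
                     and e.get('updated_at', 0) < cutoff]
            for aid in stale:
                del self._entries[aid]
```

Calling purge_finished() from a periodic housekeeping task (or on each new analysis request) keeps the store bounded without touching in-flight entries.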
Similar Components
AI-powered semantic similarity - components with related functionality:
- function analyze_data (72.1% similar)
- function _continue_to_analysis (69.1% similar)
- function smartstat_run_analysis (65.0% similar)
- function demo_analysis_workflow (64.1% similar)
- function execute_enhanced_workflow_background (64.1% similar)