🔍 Code Extractor

function run_analysis_async

Maturity: 68

Executes a data analysis workflow asynchronously with real-time progress tracking, including query interpretation, script generation, execution, and result finalization.

File: /tf/active/vicechatdev/full_smartstat/app.py
Lines: 1148 - 1259
Complexity: complex

Purpose

This function orchestrates an end-to-end asynchronous analysis pipeline for data science workflows. It processes natural language queries, generates analysis configurations, executes statistical analyses, and provides progress updates throughout the process. It handles error recovery, cleanup operations, and ensures all results are JSON-serializable for API responses. The function is designed to run in a background thread while updating a shared progress dictionary that can be polled by clients.
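The progress milestones that the function writes, taken from the source below, can be captured in a small lookup; a client that polls the progress dictionary only needs the stage ordering and the two terminal states. The MILESTONES and is_terminal names here are illustrative, not part of the module:

```python
# Progress milestones used by run_analysis_async, in pipeline order.
# The terminal states 'completed' and 'failed' both report 100.
MILESTONES = {
    'interpreting': 10,
    'generating': 30,
    'executing': 60,
    'finalizing': 90,
    'completed': 100,
    'failed': 100,
}

def is_terminal(status: str) -> bool:
    """A poller can stop once the analysis reaches a terminal status."""
    return status in ('completed', 'failed')
```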

Source Code

def run_analysis_async(analysis_id: str, session_id: str, user_query: str, model: str = 'gpt-4o', include_previous_context: bool = True, selected_analyses: List[str] = None):
    """Run analysis asynchronously with progress updates"""
    try:
        # Update progress: Query interpretation
        analysis_progress[analysis_id].update({
            'status': 'interpreting',
            'progress': 10,
            'message': 'Understanding your query...'
        })
        
        # Process user query to understand intent
        interpretation_result = analysis_service.process_user_query(session_id, user_query)
        
        if not interpretation_result['success']:
            analysis_progress[analysis_id].update({
                'status': 'failed',
                'progress': 100,
                'message': 'Failed to interpret query',
                'error': interpretation_result.get('error', 'Unknown error')
            })
            return
        
        # Update progress: Script generation
        analysis_progress[analysis_id].update({
            'status': 'generating',
            'progress': 30,
            'message': 'Generating analysis script...'
        })
        
        suggested_config = interpretation_result['suggested_config']
        
        # Update progress: Script execution
        analysis_progress[analysis_id].update({
            'status': 'executing',
            'progress': 60,
            'message': 'Executing analysis...'
        })
        
        # Generate and execute analysis
        analysis_result = analysis_service.generate_and_execute_analysis(
            session_id, 
            suggested_config,
            user_query,
            model,
            include_previous_context,
            selected_analyses or []
        )
        
        # Update progress: Finalizing
        analysis_progress[analysis_id].update({
            'status': 'finalizing',
            'progress': 90,
            'message': 'Finalizing results...'
        })
        
        # Prepare response data with proper serialization
        response_data = {
            'interpretation': {
                'success': interpretation_result['interpretation']['success'],
                'analysis_plan': interpretation_result['interpretation']['analysis_plan'],
                'suggested_config': suggested_config.to_dict(),
                'interpretation': interpretation_result['interpretation']['interpretation'],
                'confidence': interpretation_result['interpretation']['confidence']
            },
            'analysis': analysis_result
        }
        
        # If analysis failed, provide debugging option
        if not analysis_result['success'] and 'execution_result' in analysis_result:
            response_data['debug_available'] = True
            response_data['failed_step_id'] = analysis_result.get('generation_step_id')
        
        # Ensure all data is JSON serializable
        def make_json_serializable(obj):
            if hasattr(obj, 'to_dict'):
                return obj.to_dict()
            elif isinstance(obj, dict):
                return {k: make_json_serializable(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [make_json_serializable(item) for item in obj]
            elif hasattr(obj, 'value'):  # Handle enums
                return obj.value
            else:
                return obj
        
        response_data = make_json_serializable(response_data)
        
        # Update progress: Complete
        analysis_progress[analysis_id].update({
            'status': 'completed',
            'progress': 100,
            'message': 'Analysis completed successfully!',
            'results': response_data,
            'completed_at': datetime.now().isoformat()
        })
        
    except Exception as e:
        logger.error(f"Error in async analysis {analysis_id}: {str(e)}")
        
        # Clean up any partial analysis files on failure
        if app_config.AUTO_CLEANUP_ENABLED:
            try:
                analysis_service.agent_executor.cleanup_old_analyses(session_id, keep_recent=app_config.KEEP_RECENT_ANALYSES)
            except Exception as cleanup_error:
                logger.warning(f"Cleanup after failure warning: {str(cleanup_error)}")
        
        analysis_progress[analysis_id].update({
            'status': 'failed',
            'progress': 100,
            'message': f'Analysis failed: {str(e)}',
            'error': str(e)
        })
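The nested make_json_serializable helper can be exercised in isolation. Note one quirk of the implementation: any object exposing a .value attribute, not only enums, is collapsed to that attribute. A standalone sketch:

```python
import json
from enum import Enum

def make_json_serializable(obj):
    """Standalone copy of the helper nested inside run_analysis_async."""
    if hasattr(obj, 'to_dict'):
        return obj.to_dict()
    elif isinstance(obj, dict):
        return {k: make_json_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [make_json_serializable(item) for item in obj]
    elif hasattr(obj, 'value'):  # Handles enums, and anything else with .value
        return obj.value
    else:
        return obj

class Status(Enum):
    OK = 'ok'

# Nested structures are converted recursively and become json.dumps-safe.
payload = make_json_serializable({'status': Status.OK, 'steps': [{'n': 1}]})
```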

Parameters

Name                       Type       Default    Kind
analysis_id                str        -          positional_or_keyword
session_id                 str        -          positional_or_keyword
user_query                 str        -          positional_or_keyword
model                      str        'gpt-4o'   positional_or_keyword
include_previous_context   bool       True       positional_or_keyword
selected_analyses          List[str]  None       positional_or_keyword

Parameter Details

analysis_id: Unique identifier for this analysis run, used to track progress in the analysis_progress dictionary. Should be a unique string (typically UUID) generated before calling this function.

session_id: Identifier for the user session, used to maintain context across multiple analyses and manage session-specific data files and history.

user_query: Natural language query describing the analysis to perform (e.g., 'Show correlation between sales and marketing spend'). This is interpreted by the analysis service to generate appropriate analysis configurations.

model: LLM model identifier to use for query interpretation and script generation. Defaults to 'gpt-4o'. Can be any supported model string like 'gpt-4', 'gpt-3.5-turbo', etc.

include_previous_context: Boolean flag indicating whether to include previous analysis results and context when generating the new analysis. Defaults to True. Set to False for isolated analyses.

selected_analyses: Optional list of analysis IDs to include as context. If None, all previous analyses in the session may be considered. Use empty list [] to exclude all previous analyses even if include_previous_context is True.

Return Value

This function returns None; it updates the global analysis_progress dictionary in place. At each stage, analysis_progress[analysis_id] is updated with the following keys:

  • 'status': one of interpreting, generating, executing, finalizing, completed, or failed
  • 'progress': integer from 0 to 100
  • 'message': human-readable status text
  • 'results': the complete analysis results (set on success)
  • 'error': the error message (set on failure)
  • 'completed_at': ISO-format timestamp (set on completion)
  • 'debug_available': True when a failed analysis can be debugged
  • 'failed_step_id': ID of the failed generation step, if applicable
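Since the function is designed to be polled, the read side can be kept small. A hypothetical helper (the get_progress name and response shape are assumptions, not taken from app.py) that a Flask route could wrap:

```python
def get_progress(analysis_progress: dict, analysis_id: str):
    """Return (payload, http_status) for a progress-poll request.

    Hypothetical helper: the real app.py may expose polling differently.
    """
    entry = analysis_progress.get(analysis_id)
    if entry is None:
        return {'error': 'unknown analysis_id'}, 404
    # Return only the polling fields; 'results' can be large, so
    # include it only once the analysis has completed.
    payload = {k: entry[k] for k in ('status', 'progress', 'message') if k in entry}
    if entry.get('status') == 'completed':
        payload['results'] = entry.get('results')
    elif entry.get('status') == 'failed':
        payload['error'] = entry.get('error')
    return payload, 200
```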

Dependencies

  • flask
  • pandas
  • datetime
  • typing
  • pathlib
  • logging
  • threading
  • json

Required Imports

from datetime import datetime
from typing import List
import logging

Conditional/Optional Imports

These imports are only needed under specific conditions:

from services import StatisticalAnalysisService

Condition: Required for the analysis_service instance that processes queries and executes analyses

from config import Config

Condition: Required for app_config to access the AUTO_CLEANUP_ENABLED and KEEP_RECENT_ANALYSES settings

from models import AnalysisConfiguration

Condition: Required for handling suggested_config objects and their to_dict() method

Usage Example

import logging
import threading
import time
import uuid
from collections import defaultdict

from config import Config
from services import StatisticalAnalysisService

# Initialize required globals
config = Config()
analysis_progress = defaultdict(dict)
analysis_service = StatisticalAnalysisService(config)
app_config = config
logger = logging.getLogger(__name__)

# Generate unique analysis ID
analysis_id = str(uuid.uuid4())
session_id = 'user_session_123'

# Initialize progress tracking
analysis_progress[analysis_id] = {
    'status': 'pending',
    'progress': 0,
    'message': 'Starting analysis...'
}

# Start analysis in background thread
thread = threading.Thread(
    target=run_analysis_async,
    args=(analysis_id, session_id, 'Show correlation between age and income'),
    kwargs={
        'model': 'gpt-4o',
        'include_previous_context': True,
        'selected_analyses': []
    }
)
thread.start()

# Poll progress
while analysis_progress[analysis_id]['status'] not in ['completed', 'failed']:
    print(f"Progress: {analysis_progress[analysis_id]['progress']}% - {analysis_progress[analysis_id]['message']}")
    time.sleep(1)

# Check results
if analysis_progress[analysis_id]['status'] == 'completed':
    results = analysis_progress[analysis_id]['results']
    print(f"Analysis completed: {results}")
else:
    error = analysis_progress[analysis_id].get('error', 'Unknown error')
    print(f"Analysis failed: {error}")

Best Practices

  • Always initialize the analysis_progress dictionary entry before calling this function to avoid race conditions
  • Run this function in a separate thread to avoid blocking the main application thread
  • Implement proper polling mechanism with reasonable intervals (1-2 seconds) to check progress status
  • Handle both 'completed' and 'failed' status outcomes in the calling code
  • Ensure the analysis_service instance is properly initialized with valid configuration and LLM credentials
  • Consider implementing timeout mechanisms to prevent indefinitely running analyses
  • The analysis_progress dictionary should be thread-safe (use threading.Lock if multiple threads access it)
  • Clean up completed analysis entries from analysis_progress periodically to prevent memory leaks
  • Validate that session_id exists and has proper data sources before calling this function
  • Use unique analysis_id values (UUID recommended) to prevent conflicts in concurrent analyses
  • Monitor the 'debug_available' flag in failed analyses to provide debugging options to users
  • Implement proper error logging and monitoring for production deployments
  • Consider setting include_previous_context=False for independent analyses to improve performance

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function analyze_data 72.1% similar

    Flask route handler that initiates an asynchronous data analysis process based on user query, creating a background thread to perform the analysis and returning an analysis ID for progress tracking.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function _continue_to_analysis 69.1% similar

    Continues statistical analysis workflow after data retrieval by configuring analysis parameters, executing statistical analysis via StatisticalAnalysisService, and updating workflow progress status.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • function smartstat_run_analysis 65.0% similar

    Flask API endpoint that initiates a SmartStat statistical analysis in a background thread, tracking progress and persisting results to a data section.

    From: /tf/active/vicechatdev/vice_ai/new_app.py
  • function demo_analysis_workflow 64.1% similar

    Demonstrates a complete end-to-end statistical analysis workflow using the SmartStat system, including session creation, data loading, natural language query processing, analysis execution, and result interpretation.

    From: /tf/active/vicechatdev/full_smartstat/demo.py
  • function execute_enhanced_workflow_background 64.1% similar

    Executes an enhanced SQL workflow in a background thread, retrieving data from a database, processing it through an AI-powered workflow, and automatically triggering statistical analysis on the results.

    From: /tf/active/vicechatdev/full_smartstat/app.py