🔍 Code Extractor

class StatisticalAnalysisService

Maturity: 26

Main service for statistical analysis orchestration

File: /tf/active/vicechatdev/full_smartstat/services.py
Lines: 76-1564
Complexity: moderate

Purpose

Orchestrates the full statistical analysis workflow: session creation, data loading (file upload, direct SQL, and LLM-assisted SQL workflows), natural-language query interpretation, agent-based script generation and execution, debugging retries, and retrieval of results, generated files, and analysis history.
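
Usage Example

For orientation, a minimal usage sketch of the main workflow follows. The import paths, the Config fields, and the DataSource constructor keywords are assumptions (this extract only shows the attributes the service reads: source_type, user_query, schema_file, connection_config); the method names and return shapes match the source below.

# Minimal usage sketch. Import paths and constructor keywords are assumptions;
# only the StatisticalAnalysisService calls are taken from the source below.
from config import Config                          # hypothetical module layout
from models import DataSource, DataSourceType      # hypothetical module layout
from services import StatisticalAnalysisService

config = Config()  # assumed to carry DATABASE_URL, OUTPUT_DIR, GENERATED_SCRIPTS_FOLDER, ...
service = StatisticalAnalysisService(config)

# 1. Create a session.
session_id = service.create_analysis_session(title="Treatment outcome study")

# 2. Load data. For SQL_WORKFLOW sources the service asks the LLM to generate
#    and run the SQL; other source types go through the regular load path.
data_source = DataSource(
    source_type=DataSourceType.SQL_WORKFLOW,
    user_query="all patients with recorded systolic blood pressure",
    schema_file="schema.json",                      # hypothetical value
    connection_config={"url": "postgresql://..."},  # hypothetical value
)
load_result = service.load_data_for_session(session_id, data_source)
assert load_result['success'], load_result.get('error')

# 3. Turn a natural-language question into an analysis configuration.
query_result = service.process_user_query(
    session_id, "Compare mean systolic BP between treatment groups"
)

# 4. Generate and execute the analysis script via the agent-based executor.
if query_result['success']:
    analysis = service.generate_and_execute_analysis(
        session_id,
        analysis_config=query_result['suggested_config'],
        user_query="Compare mean systolic BP between treatment groups",
    )

# 5. Retrieve a consolidated view of steps, results, files, and history.
summary = service.get_session_summary(session_id)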

Source Code

class StatisticalAnalysisService:
    """Main service for statistical analysis orchestration"""
    
    def __init__(self, config: Config):
        self.config = config
        self.database_manager = DatabaseManager(config.DATABASE_URL)
        self.data_processor = DataProcessor(config)
        self.statistical_agent = StatisticalAgent(config)
        self.agent_executor = AgentExecutor(config)  # New agent-based executor
        
    def create_analysis_session(self, title: str, description: str = "",
                              user_id: str = "default") -> str:
        """Create new analysis session"""
        try:
            session_id = str(uuid.uuid4())  # Generate session ID first
            logger.info(f"Creating session with ID: {session_id}")
            
            try:
                session = StatisticalSession(
                    session_id=session_id,
                    title=title,
                    description=description,
                    status=AnalysisStatus.PENDING,  # Use PENDING instead of CREATED
                    data_source=None,  # Don't create defaults in constructor
                    analysis_config=None  # Don't create defaults in constructor
                )
                logger.info(f"StatisticalSession object created successfully")
            except Exception as session_error:
                logger.error(f"Error creating StatisticalSession object: {str(session_error)}")
                raise session_error
            
            try:
                created_session_id = self.database_manager.create_session(session)
                logger.info(f"Created analysis session: {created_session_id}")
                return created_session_id
            except Exception as db_error:
                logger.error(f"Error calling database_manager.create_session: {str(db_error)}")
                raise db_error
            
        except Exception as e:
            logger.error(f"Error in create_analysis_session: {str(e)}")
            logger.error(f"Exception type: {type(e)}")
            # Simplified traceback to avoid formatting issues
            import traceback
            tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
            logger.error(f"Traceback: {''.join(tb_lines)}")
            raise e
    
    def load_data_for_session(self, session_id: str, data_source: DataSource) -> Dict[str, Any]:
        """Load data for analysis session"""
        try:
            # Update session with data source
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            session.data_source = data_source
            session.status = AnalysisStatus.PROCESSING
            session.updated_at = datetime.now()
            
            # Save updated session
            self.database_manager.update_session(session)
            
            # Load and process data
            if data_source.source_type == DataSourceType.SQL_WORKFLOW:
                # New SQL workflow: user query -> LLM generates SQL -> execute
                df, metadata = self.data_processor.load_data_from_sql_workflow(
                    user_query=data_source.user_query,
                    schema_file=data_source.schema_file,
                    connection_config=data_source.connection_config,
                    statistical_agent=self.statistical_agent
                )
            else:
                # Regular workflow (file upload or direct SQL)
                df, metadata = self.data_processor.load_data(data_source)
            
            # Generate data summary
            data_summary = self.data_processor.get_data_summary(df)
            
            # Check if dataset is empty but has valid structure
            is_empty_but_valid = df.empty and df.shape[1] > 0
            
            # Create analysis step for data loading
            step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=1,
                step_type="data_load",
                input_data={
                    'data_source': data_source.to_dict(),
                    'metadata': metadata
                },
                execution_output=safe_json_dumps(data_summary),
                execution_success=True,  # Consider empty results as successful if query executed
                notes="Empty dataset - query executed successfully but returned no matching records" if is_empty_but_valid else None
            )
            
            self.database_manager.add_analysis_step(step)
            
            # Store data summary as result
            result = AnalysisResult(
                result_id=str(uuid.uuid4()),
                session_id=session_id,
                step_id=step.step_id,
                result_type="data_summary",
                result_data=data_summary,
                metadata=metadata
            )
            
            self.database_manager.add_analysis_result(result)
            
            # Store dataframe temporarily (in production, use proper data storage)
            self._store_session_data(session_id, df)
            
            # Update session with generated SQL query if available
            if metadata.get('sql_query'):
                session.sql_query = metadata['sql_query']
                session.updated_at = datetime.now()
                self.database_manager.update_session(session)
                logger.info(f"Updated session {session_id} with SQL query")
            
            response = {
                'success': True,
                'data_summary': data_summary,
                'metadata': metadata,
                'step_id': step.step_id,
                'shape': df.shape,
                'columns': df.columns.tolist(),
                'is_empty_result': is_empty_but_valid
            }
            
            # Include diagnostic information if available
            if 'diagnostic_results' in metadata:
                response['diagnostic_info'] = {
                    'explanation': metadata.get('diagnostic_explanation', ''),
                    'results': metadata['diagnostic_results']
                }
            
            return response
            
        except Exception as e:
            logger.error(f"Error loading data for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def load_data_for_session_direct(self, session_id: str, df: pd.DataFrame, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Load data directly from dataframe for enhanced workflows"""
        try:
            # Update session status
            self.database_manager.update_session_status(session_id, AnalysisStatus.DATA_LOADED)
            
            # Generate data summary
            data_summary = self.data_processor.get_data_summary(df)
            
            # Check if dataset is empty but has valid structure
            is_empty_but_valid = df.empty and df.shape[1] > 0
            
            # Create analysis step for data loading
            step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=1,
                step_type="enhanced_data_load",
                input_data={
                    'enhanced_workflow': True,
                    'metadata': metadata
                },
                execution_output=safe_json_dumps(data_summary),
                execution_success=True,
                notes="Enhanced SQL workflow - data loaded with iterative optimization" if not is_empty_but_valid else "Enhanced workflow completed but returned no data"
            )
            
            self.database_manager.add_analysis_step(step)
            
            # Store data summary as result (use standard result_type for compatibility)
            result = AnalysisResult(
                result_id=str(uuid.uuid4()),
                session_id=session_id,
                step_id=step.step_id,
                result_type="data_summary",  # Use standard type so it appears in data tab
                result_data=data_summary,
                metadata=metadata
            )
            
            self.database_manager.add_analysis_result(result)
            
            # Store dataframe
            self._store_session_data(session_id, df)
            
            return {
                'success': True,
                'data_summary': data_summary,
                'metadata': metadata,
                'step_id': step.step_id,
                'shape': df.shape,
                'columns': df.columns.tolist(),
                'is_empty_result': is_empty_but_valid,
                'workflow_type': 'enhanced_sql_workflow'
            }
            
        except Exception as e:
            logger.error(f"Error loading direct data for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def process_user_query(self, session_id: str, user_query: str) -> Dict[str, Any]:
        """Process natural language query and suggest analysis"""
        try:
            # Get session and data
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            df = self._load_session_data(session_id)
            if df is None:
                raise ValueError(f"No data loaded for session {session_id}")
            
            data_summary = self._get_latest_data_summary(session_id)
            
            # Interpret query using LLM
            interpretation_result = self.statistical_agent.interpret_user_query(
                user_query, data_summary, df.columns.tolist()
            )
            
            if not interpretation_result['success']:
                return interpretation_result
            
            # Update session with analysis configuration
            session.analysis_config = interpretation_result['suggested_config']
            session.updated_at = datetime.now()
            
            # Create analysis step for query interpretation
            # Prepare serializable interpretation result for storage
            serializable_result = {
                'success': interpretation_result['success'],
                'analysis_plan': interpretation_result['analysis_plan'],
                'suggested_config': interpretation_result['suggested_config'].to_dict(),
                'interpretation': interpretation_result['interpretation'],
                'confidence': interpretation_result['confidence']
            }
            
            step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="query_interpretation",
                input_data={'user_query': user_query},
                execution_output=safe_json_dumps(serializable_result),
                execution_success=True
            )
            
            self.database_manager.add_analysis_step(step)
            
            return {
                'success': True,
                'interpretation': interpretation_result,
                'step_id': step.step_id,
                'suggested_config': interpretation_result['suggested_config']  # Return the actual object, not dict
            }
            
        except Exception as e:
            logger.error(f"Error processing query for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def generate_and_execute_analysis(self, session_id: str, 
                                    analysis_config: AnalysisConfiguration = None,
                                    user_query: str = "",
                                    model: str = 'gpt-4o',
                                    include_previous_context: bool = True,
                                    selected_analyses: List[str] = None) -> Dict[str, Any]:
        """Generate and execute statistical analysis script with optional previous context"""
        try:
            # Get session and data
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            if analysis_config is None:
                analysis_config = session.analysis_config
            
            df = self._load_session_data(session_id)
            if df is None:
                raise ValueError(f"No data loaded for session {session_id}")
            
            data_summary = self._get_latest_data_summary(session_id)
            
            # Collect previous analysis context if requested
            previous_context = None
            if include_previous_context:
                previous_context = self._collect_previous_analysis_context(session_id, selected_analyses)
            
            # Validate analysis configuration
            validation = self.data_processor.validate_columns_for_analysis(
                df, analysis_config.target_variables, analysis_config.grouping_variables
            )
            
            if not validation['valid']:
                return {
                    'success': False,
                    'error': f"Invalid analysis configuration: {'; '.join(validation['errors'])}"
                }
            
            # Generate analysis script with previous context
            script_result = self.statistical_agent.generate_analysis_script(
                analysis_config, data_summary, user_query, model, previous_context
            )
            
            if not script_result['success']:
                return script_result
            
            # Create step for script generation
            generation_step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="script_generation",
                input_data={
                    'analysis_config': analysis_config.to_dict(),
                    'user_query': user_query
                },
                generated_script=script_result['script'],
                execution_success=True
            )
            
            self.database_manager.add_analysis_step(generation_step)
            
            # Execute script
            execution_result = self._execute_analysis_script(
                session_id, generation_step.step_id, script_result['script'], df, user_query, model
            )
            
            # Clean up old analysis files to prevent accumulation
            if self.config.AUTO_CLEANUP_ENABLED:
                try:
                    self.agent_executor.cleanup_old_analyses(session_id, keep_recent=self.config.KEEP_RECENT_ANALYSES)
                except Exception as cleanup_error:
                    logger.warning(f"Cleanup warning: {str(cleanup_error)}")
            
            return {
                'success': execution_result['success'],
                'script': script_result['script'],
                'explanation': script_result['explanation'],
                'execution_result': execution_result,
                'generation_step_id': generation_step.step_id
            }
            
        except Exception as e:
            logger.error(f"Error generating analysis for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def debug_and_retry_analysis(self, session_id: str, failed_step_id: str, model: str = 'gpt-4o') -> Dict[str, Any]:
        """Debug failed analysis and retry execution using agent-based approach"""
        try:
            # Get failed step
            steps = self.database_manager.get_session_steps(session_id)
            failed_step = next((s for s in steps if s.step_id == failed_step_id), None)
            
            if not failed_step:
                raise ValueError(f"Step {failed_step_id} not found")
            
            if failed_step.execution_success:
                return {
                    'success': False,
                    'error': "Step was already successful"
                }
            
            # Get session data
            df = self._load_session_data(session_id)
            session = self.database_manager.get_session(session_id)
            data_summary = self._get_latest_data_summary(session_id)
            
            # Count debugging iterations for this script
            debug_iteration = len([s for s in steps if s.step_type == "debugging" and 
                                 s.input_data.get('original_step_id') == failed_step_id]) + 1
            
            # Try to find the project directory for this session
            project_dir = Path(self.config.GENERATED_SCRIPTS_FOLDER) / session_id
            
            if project_dir.exists():
                # Retry execution with more iterations for debugging
                execution_result = self.agent_executor.execute_analysis_project(
                    project_dir=str(project_dir),
                    max_iterations=5,  # Allow more debugging iterations
                    model=model
                )
            else:
                # Generate a new project if the old one doesn't exist
                logger.info(f"Project directory not found, regenerating for session {session_id}")
                
                project_result = self.agent_executor.generate_analysis_project(
                    session_id=session_id,
                    user_query=f"Retry statistical analysis with debugging",
                    data_summary=data_summary,
                    analysis_config=session.analysis_config,  # Pass the object
                    session_data=self._load_session_data(session_id),  # Pass the data
                    model=model
                )
                
                if not project_result['success']:
                    return project_result
                
                execution_result = self.agent_executor.execute_analysis_project(
                    project_dir=project_result['project_dir'],
                    max_iterations=5,
                    model=model
                )
            
            # Create debugging step
            debug_step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="debugging",
                input_data={
                    'original_step_id': failed_step_id,
                    'iteration': debug_iteration,
                    'original_error': failed_step.execution_error
                },
                generated_script="",  # Agent generates files separately
                execution_output=execution_result.get('output', '')
            )
            
            self.database_manager.add_analysis_step(debug_step)
            
            return {
                'success': execution_result['success'],
                'debug_explanation': f"Debug iteration {debug_iteration} completed",
                'execution_result': execution_result,
                'debug_step_id': debug_step.step_id,
                'iteration': debug_iteration,
                'generated_files': execution_result.get('generated_files', [])
            }
            
        except Exception as e:
            logger.error(f"Error debugging analysis for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def _execute_analysis_script(self, session_id: str, step_id: str, 
                               script: str, df: pd.DataFrame, user_query: str = "", model: str = 'gpt-4o') -> Dict[str, Any]:
        """Execute analysis script using agent-based executor"""
        try:
            # Update session status
            self.database_manager.update_session_status(session_id, AnalysisStatus.PROCESSING)
            
            # Get session data summary for context
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            data_summary = self._get_latest_data_summary(session_id)
            analysis_config = session.analysis_config
            
            # Store dataframe for the agent to access
            self._store_session_data(session_id, df)
            
            # Use agent executor to generate and run analysis project
            project_result = self.agent_executor.generate_analysis_project(
                session_id=session_id,
                user_query=user_query if user_query else f"Statistical analysis: {analysis_config.analysis_type.value if analysis_config else 'general'}",
                data_summary=data_summary,
                analysis_config=analysis_config,  # Pass the object, not the dict
                session_data=df,  # Pass the dataframe directly
                model=model
            )
            
            if not project_result['success']:
                return project_result
            
            # Execute the generated project
            execution_result = self.agent_executor.execute_analysis_project(
                project_dir=project_result['project_dir'],
                max_iterations=3,
                model=model
            )
            
            # Update step with execution results
            steps = self.database_manager.get_session_steps(session_id)
            current_step = next((s for s in steps if s.step_id == step_id), None)
            if current_step:
                current_step.execution_output = execution_result.get('execution_output', execution_result.get('output', ''))
                current_step.execution_error = execution_result.get('execution_error', execution_result.get('error', ''))
                current_step.execution_success = execution_result['success']
                
                # Store enhanced execution metadata
                current_step.metadata = current_step.metadata or {}
                if execution_result.get('generated_files'):
                    current_step.metadata['generated_files'] = execution_result['generated_files']
                if execution_result.get('execution_log'):
                    current_step.metadata['execution_log'] = execution_result['execution_log']
                if execution_result.get('debug_iterations'):
                    current_step.metadata['debug_iterations'] = execution_result['debug_iterations']
                if execution_result.get('execution_time'):
                    current_step.metadata['execution_time'] = execution_result['execution_time']
                if execution_result.get('iteration'):
                    current_step.metadata['iterations_used'] = execution_result['iteration']
                
                # Save the updated step with execution metadata to database
                self.database_manager.update_step(current_step)
                logger.info(f"Updated step {step_id} with execution metadata")
            
            if execution_result['success']:
                # Store successful results
                self._store_execution_results(session_id, step_id, execution_result)
                self.database_manager.update_session_status(session_id, AnalysisStatus.COMPLETED)
            else:
                self.database_manager.update_session_status(session_id, AnalysisStatus.DEBUGGING)
            
            return execution_result
            
        except Exception as e:
            logger.error(f"Error executing script for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def _store_execution_results(self, session_id: str, step_id: str, 
                               execution_result: Dict[str, Any]):
        """Store execution results in database"""
        
        # Store main results
        if execution_result.get('results'):
            result = AnalysisResult(
                result_id=str(uuid.uuid4()),
                session_id=session_id,
                step_id=step_id,
                result_type="analysis_results",
                result_data=execution_result['results'],
                metadata={
                    'execution_time': execution_result.get('execution_time'),
                    'output': execution_result.get('output', '')
                }
            )
            self.database_manager.add_analysis_result(result)
        
        # Store plots
        if execution_result.get('plots'):
            for plot_path in execution_result['plots']:
                plot_result = AnalysisResult(
                    result_id=str(uuid.uuid4()),
                    session_id=session_id,
                    step_id=step_id,
                    result_type="plot",
                    result_data={'description': 'Generated plot'},
                    file_paths=[plot_path],
                    metadata={'plot_type': 'analysis_plot'}
                )
                self.database_manager.add_analysis_result(plot_result)
    
    def get_session_summary(self, session_id: str) -> Dict[str, Any]:
        """Get comprehensive session summary"""
        try:
            logger.debug(f"Getting session summary for {session_id}")
            session = self.database_manager.get_session(session_id)
            if not session:
                return {'success': False, 'error': 'Session not found'}
            
            logger.debug("Getting session steps...")
            steps = self.database_manager.get_session_steps(session_id)
            logger.debug("Getting session results...")
            results = self.database_manager.get_session_results(session_id)
            
            # Get data summary
            logger.debug("Getting latest data summary...")
            data_summary = self._get_latest_data_summary(session_id)
            
            # Organize results by type
            logger.debug("Organizing results by type...")
            organized_results = {
                'analysis_results': [],
                'plots': [],
                'data_summary': data_summary
            }
            
            for result in results:
                if result.result_type == 'plot':
                    organized_results['plots'].append({
                        'file_path': result.file_paths[0] if result.file_paths else '',
                        'metadata': result.metadata
                    })
                elif result.result_type == 'analysis_results':
                    organized_results['analysis_results'].append(result.result_data)
            
            # Get generated files from agent executor
            logger.debug("Getting generated files info...")
            generated_files = self._get_generated_files_info(session_id)
            
            # Count plots from both organized_results and generated_files
            plots_from_results = len(organized_results['plots']) if organized_results.get('plots') else 0
            plots_from_files = len([f for f in generated_files if f.get('type') == 'plot']) if generated_files else 0
            total_plots = max(plots_from_results, plots_from_files)  # Use the higher count to avoid duplicates
            
            # Get SQL query from session steps or metadata
            sql_query = None
            logger.debug(f"Looking for SQL query in {len(steps) if steps else 0} steps")
            
            if steps:
                for step in reversed(steps):  # Look from most recent
                    if step.metadata:
                        logger.debug(f"Step {step.step_id} metadata keys: {list(step.metadata.keys())}")
                        if 'sql_query' in step.metadata:
                            sql_query = step.metadata['sql_query']
                            logger.debug(f"Found SQL query in step metadata: {sql_query[:100] if sql_query else 'None'}...")
                            break
                        elif 'query_metadata' in step.metadata:
                            query_meta = step.metadata['query_metadata']
                            if isinstance(query_meta, dict) and 'sql_query' in query_meta:
                                sql_query = query_meta['sql_query']
                                logger.debug(f"Found SQL query in query_metadata: {sql_query[:100] if sql_query else 'None'}...")
                                break
                        # Also check for SQL in results metadata
                        elif step.step_type == 'data_collection' and 'sql' in step.metadata:
                            sql_query = step.metadata['sql']
                            logger.debug(f"Found SQL in step.metadata.sql: {sql_query[:100] if sql_query else 'None'}...")
                            break
            
            # If not found in steps, check session sql_query field
            if not sql_query and session:
                sql_query = getattr(session, 'sql_query', None)
                logger.debug(f"Session sql_query: {sql_query[:100] if sql_query else 'None'}...")
            
            logger.debug(f"Final SQL query found: {'Yes' if sql_query else 'No'}")

            # Get analysis history
            logger.debug("Getting analysis history...")
            try:
                analysis_history = self.get_analysis_history(session_id)
                logger.debug("Analysis history retrieved successfully")
            except Exception as e:
                logger.error(f"Error in get_analysis_history: {e}")
                analysis_history = []
            
            try:
                logger.debug("Creating summary data dictionary...")
                session_dict = session.to_dict()
                logger.debug("Converting steps to dict...")
                steps_dict = [step.to_dict() for step in steps]
                logger.debug("Calculating successful steps...")
                successful_steps = len([s for s in steps if getattr(s, 'execution_success', False) is True]) if steps else 0
                logger.debug("Getting session status...")
                session_status = session.status.value if hasattr(session.status, 'value') else str(session.status)
                
                summary_data = {
                    'success': True,
                    'session': session_dict,
                    'steps': steps_dict,
                    'results': organized_results,
                    'generated_files': generated_files,
                    'analysis_history': analysis_history,
                    'sql_query': sql_query,  # Add SQL query to the response
                    'summary': {
                        'total_steps': len(steps) if steps else 0,
                        'successful_steps': successful_steps,
                        'plots_generated': total_plots,
                        'status': session_status
                    }
                }
                logger.debug("Summary data created successfully")
            except Exception as e:
                logger.error(f"Error creating summary data: {e}")
                raise
            
            # Clean any NaN values to prevent JSON serialization issues
            logger.debug("Cleaning NaN values...")
            try:
                cleaned_data = clean_nan_for_json(summary_data)
                logger.debug("Data cleaned successfully")
                return cleaned_data
            except Exception as e:
                logger.error(f"Error cleaning NaN values: {e}")
                raise
            
        except Exception as e:
            logger.error(f"Error getting session summary {session_id}: {str(e)}")
            return {'success': False, 'error': str(e)}
    
    def _get_generated_files_info(self, session_id: str) -> List[Dict[str, Any]]:
        """Get information about generated files for a session"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return []
            
            generated_files = []
            
            # Look for analysis directories (analysis_HASH format) within this session only
            analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
            
            if analysis_dirs:
                # Use the most recent analysis directory within this session
                try:
                    def safe_mtime(d):
                        try:
                            mtime = d.stat().st_mtime
                            return mtime if mtime is not None else 0
                        except (OSError, AttributeError):
                            return 0
                    
                    latest_analysis_dir = max(analysis_dirs, key=safe_mtime)
                except (OSError, TypeError, ValueError) as e:
                    logger.warning(f"Error getting file stats for analysis directories: {e}")
                    latest_analysis_dir = analysis_dirs[0]  # Use first available if stats fail
                
                # Process files in the analysis directory
                for file_path in latest_analysis_dir.glob("*"):
                    if file_path.is_file() and file_path.name != 'venv':  # Skip virtual environment
                        file_type = self._determine_file_type(file_path)
                        
                        file_info = {
                            'name': file_path.name,
                            'type': file_type,
                            'description': self._get_file_description(file_path, file_type),
                            'downloadable': True,
                            'path': str(file_path),
                            'relative_path': f"{latest_analysis_dir.name}/{file_path.name}"  # For URL construction
                        }
                        
                        # For text files, include content preview
                        if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                            try:
                                with open(file_path, 'r', encoding='utf-8') as f:
                                    content = f.read()
                                    # For scripts, provide more content for display
                                    if file_path.suffix.lower() == '.py':
                                        file_info['content'] = content  # Full content for scripts
                                        file_info['content_preview'] = content[:5000] + '...' if len(content) > 5000 else content
                                    else:
                                        file_info['content'] = content[:2000] if len(content) > 2000 else content
                                        file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                            except Exception as e:
                                logger.warning(f"Could not read file {file_path}: {str(e)}")
                                file_info['content'] = 'Content unavailable'
                        
                        generated_files.append(file_info)
            
            # Also check for files directly in session directory (legacy support)
            for file_path in session_dir.glob("*"):
                if file_path.is_file():
                    file_type = self._determine_file_type(file_path)
                    
                    file_info = {
                        'name': file_path.name,
                        'type': file_type,
                        'description': self._get_file_description(file_path, file_type),
                        'downloadable': True,
                        'path': str(file_path),
                        'relative_path': file_path.name  # For URL construction
                    }
                    
                    # For text files, include content preview
                    if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                content = f.read()
                                # For scripts, provide more content for display
                                if file_path.suffix.lower() == '.py':
                                    file_info['content'] = content  # Full content for scripts
                                    file_info['content_preview'] = content[:5000] + '...' if len(content) > 5000 else content
                                else:
                                    file_info['content'] = content[:2000] if len(content) > 2000 else content
                                    file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                        except Exception as e:
                            logger.warning(f"Could not read file {file_path}: {str(e)}")
                            file_info['content'] = 'Content unavailable'
                    
                    generated_files.append(file_info)
            
            return generated_files
            
        except Exception as e:
            logger.error(f"Error getting generated files for session {session_id}: {str(e)}")
            return []
    
    def _get_execution_tracking(self, session_id: str) -> Dict[str, Any]:
        """Get execution tracking information for a session"""
        try:
            steps = self.database_manager.get_session_steps(session_id)
            if not steps:
                return {}
            
            # Get the most recent execution step (any step with execution metadata)
            execution_steps = [s for s in steps if s.metadata and (
                'execution_log' in s.metadata or 
                'debug_iterations' in s.metadata or 
                s.execution_success is not None
            )]
            if not execution_steps:
                # Fallback: get any steps with script generation or debugging
                execution_steps = [s for s in steps if s.step_type in ['script_generation', 'debugging']]
            if not execution_steps:
                return {}
            
            latest_step = max(execution_steps, key=lambda x: x.created_at)
            metadata = latest_step.metadata or {}
            
            execution_tracking = {
                'last_execution': {
                    'step_id': latest_step.step_id,
                    'timestamp': latest_step.created_at.isoformat() if latest_step.created_at else None,
                    'success': latest_step.execution_success,
                    'output': latest_step.execution_output or '',
                    'error': latest_step.execution_error or '',
                    'iterations_used': metadata.get('iterations_used', 1)
                }
            }
            
            # Add execution log if available
            if 'execution_log' in metadata:
                execution_tracking['execution_log'] = metadata['execution_log']
            
            # Add debug iterations if available
            if 'debug_iterations' in metadata:
                execution_tracking['debug_iterations'] = metadata['debug_iterations']
                execution_tracking['debugging_occurred'] = len(metadata['debug_iterations']) > 0
            
            # Add execution timing if available
            if 'execution_time' in metadata:
                execution_tracking['execution_time'] = metadata['execution_time']
            
            # Generate content summary
            execution_tracking['content_summary'] = self._generate_content_summary(session_id)
            
            return execution_tracking
            
        except Exception as e:
            logger.error(f"Error getting execution tracking for session {session_id}: {str(e)}")
            return {}
    
    def _generate_content_summary(self, session_id: str) -> Dict[str, Any]:
        """Generate a summary of what content was generated during execution"""
        try:
            generated_files = self._get_generated_files_info(session_id)
            
            content_summary = {
                'total_files': len(generated_files),
                'file_types': {},
                'content_details': []
            }
            
            for file_info in generated_files:
                file_type = file_info.get('type', 'unknown')
                if file_type not in content_summary['file_types']:
                    content_summary['file_types'][file_type] = 0
                content_summary['file_types'][file_type] += 1
                
                # Add detailed info for important files
                if file_type in ['script', 'plot', 'text']:
                    content_details = {
                        'name': file_info.get('name'),
                        'type': file_type,
                        'description': file_info.get('description'),
                        'has_content': bool(file_info.get('content') or file_info.get('content_preview'))
                    }
                    
                    if file_type == 'script':
                        content_details['content_length'] = len(file_info.get('content', ''))
                        content_details['preview_available'] = bool(file_info.get('content_preview'))
                    
                    content_summary['content_details'].append(content_details)
            
            return content_summary
            
        except Exception as e:
            logger.error(f"Error generating content summary for session {session_id}: {str(e)}")
            return {}
    
    def get_session_results(self, session_id: str) -> List[Dict[str, Any]]:
        """Get session results - wrapper around database manager method"""
        try:
            return self.database_manager.get_session_results(session_id)
        except Exception as e:
            logger.error(f"Error getting session results for {session_id}: {str(e)}")
            return []
    
    def get_analysis_history(self, session_id: str) -> List[Dict[str, Any]]:
        """Get analysis history for a session with file information"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return []
            
            analysis_history = []
            
            # Look for analysis directories
            try:
                all_dirs = list(session_dir.iterdir())
                analysis_dirs = []
                for d in all_dirs:
                    try:
                        if d.is_dir() and d.name.startswith('analysis_'):
                            analysis_dirs.append(d)
                    except Exception as e:
                        logger.warning(f"Error checking directory {d}: {e}")
                        continue
            except Exception as e:
                logger.error(f"Error listing session directory: {e}")
                return []
            
            # Sort by name instead of modification time to avoid comparison issues
            try:
                analysis_dirs.sort(key=lambda d: d.name)
            except Exception as e:
                logger.error(f"Error sorting analysis directories: {e}")
                # Don't sort if there's an issue
            
            for analysis_number, analysis_dir in enumerate(analysis_dirs, start=1):
                try:
                    logger.debug(f"Processing analysis directory {analysis_number}: {analysis_dir.name}")
                    mtime = analysis_dir.stat().st_mtime
                    safe_mtime = mtime if mtime is not None else 0
                    formatted_time = datetime.fromtimestamp(safe_mtime).strftime('%Y-%m-%d %H:%M:%S') if safe_mtime > 0 else 'Unknown'
                except (OSError, AttributeError, ValueError):
                    safe_mtime = 0
                    formatted_time = 'Unknown'

                try:
                    logger.debug("Getting file count...")
                    file_list = []
                    try:
                        file_list = list(analysis_dir.glob('*'))
                    except Exception as e:
                        logger.warning(f"Error globbing files in {analysis_dir.name}: {e}")
                        file_list = []
                    
                    file_count = 0
                    has_plots = False
                    has_tables = False
                    
                    for f in file_list:
                        try:
                            if f.is_file() and f.name != 'venv':
                                file_count += 1
                                if f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg']:
                                    has_plots = True
                                elif f.suffix.lower() in ['.csv', '.xlsx']:
                                    has_tables = True
                        except Exception as e:
                            logger.warning(f"Error checking file {f}: {e}")
                            continue
                    
                    logger.debug("Checking for conclusions...")
                    try:
                        has_conclusions = (analysis_dir / 'conclusions.txt').exists()
                    except Exception as e:
                        logger.warning(f"Error checking conclusions file: {e}")
                        has_conclusions = False
                        
                except Exception as e:
                    logger.error(f"Error processing files in {analysis_dir.name}: {e}")
                    file_count = 0
                    has_plots = False
                    has_tables = False
                    has_conclusions = False

                analysis_info = {
                    'analysis_number': analysis_number,
                    'analysis_id': analysis_dir.name,
                    'timestamp': safe_mtime,
                    'formatted_time': formatted_time,
                    'file_count': file_count,
                    'has_plots': has_plots,
                    'has_tables': has_tables,
                    'has_conclusions': has_conclusions
                }
                
                # Try to extract the user query from the analysis script
                analysis_script = analysis_dir / 'analysis.py'
                if analysis_script.exists():
                    try:
                        with open(analysis_script, 'r', encoding='utf-8') as f:
                            script_content = f.read()
                            # Look for the query comment in the script
                            for line in script_content.split('\n')[:10]:  # Check first 10 lines
                                if 'Query:' in line:
                                    analysis_info['user_query'] = line.split('Query:')[-1].strip()
                                    break
                            else:
                                analysis_info['user_query'] = 'Analysis script'
                    except Exception as e:
                        logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
                        analysis_info['user_query'] = 'Unknown query'
                else:
                    analysis_info['user_query'] = 'No script found'
                
                analysis_history.append(analysis_info)
            
            return analysis_history
            
        except Exception as e:
            logger.error(f"Error getting analysis history for session {session_id}: {str(e)}")
            return []
    
    def get_analysis_files(self, session_id: str, analysis_id: str) -> List[Dict[str, Any]]:
        """Get files for a specific analysis"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            analysis_dir = session_dir / analysis_id
            
            if not analysis_dir.exists() or not analysis_dir.is_dir():
                return []
            
            generated_files = []
            
            for file_path in analysis_dir.glob("*"):
                if file_path.is_file() and file_path.name != 'venv':
                    file_type = self._determine_file_type(file_path)
                    
                    file_info = {
                        'name': file_path.name,
                        'type': file_type,
                        'description': self._get_file_description(file_path, file_type),
                        'downloadable': True,
                        'path': str(file_path),
                        'relative_path': f"{analysis_dir.name}/{file_path.name}"
                    }
                    
                    # For text files, include content preview
                    if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                content = f.read()
                                # For scripts, provide more content for display
                                if file_path.suffix.lower() == '.py':
                                    file_info['content'] = content  # Full content for scripts
                                    file_info['content_preview'] = content[:5000] + '...' if len(content) > 5000 else content
                                else:
                                    file_info['content'] = content[:2000] if len(content) > 2000 else content
                                    file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                        except Exception as e:
                            logger.warning(f"Could not read file {file_path}: {str(e)}")
                            file_info['content'] = 'Content unavailable'
                    
                    generated_files.append(file_info)
            
            return generated_files
            
        except Exception as e:
            logger.error(f"Error getting analysis files for session {session_id}, analysis {analysis_id}: {str(e)}")
            return []
    
    def _collect_previous_analysis_context(self, session_id: str, selected_analyses: List[str] = None) -> Dict[str, Any]:
        """Collect context from previous analyses for iterative refinement"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
            
            # Get all analysis directories
            analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
            
            if not analysis_dirs:
                return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
            
            # Sort by modification time (oldest first)
            def safe_mtime_key(d):
                try:
                    mtime = d.stat().st_mtime
                    return mtime if mtime is not None else 0
                except (OSError, AttributeError):
                    return 0
            analysis_dirs.sort(key=safe_mtime_key)
            
            # Filter by selected analyses if specified
            if selected_analyses:
                analysis_dirs = [d for d in analysis_dirs if d.name in selected_analyses]
                if not analysis_dirs:
                    return {'analyses': [], 'summary': 'Selected analyses not found.', 'count': 0}
            else:
                # If no specific selection, exclude the most recent (current) analysis
                analysis_dirs = analysis_dirs[:-1]
            
            previous_analyses = []
            
            for i, analysis_dir in enumerate(analysis_dirs, 1):
                try:
                    mtime = analysis_dir.stat().st_mtime
                    safe_mtime = mtime if mtime is not None else 0
                    formatted_time = datetime.fromtimestamp(safe_mtime).strftime('%Y-%m-%d %H:%M:%S') if safe_mtime > 0 else 'Unknown'
                except (OSError, AttributeError, ValueError):
                    formatted_time = 'Unknown'

                analysis_context = {
                    'analysis_number': i,
                    'analysis_id': analysis_dir.name,
                    'timestamp': formatted_time
                }
                
                # Extract user query from analysis script
                analysis_script = analysis_dir / 'analysis.py'
                if analysis_script.exists():
                    try:
                        with open(analysis_script, 'r', encoding='utf-8') as f:
                            script_content = f.read()
                            analysis_context['script_content'] = script_content
                            
                            # Extract user query from script comments
                            for line in script_content.split('\n')[:15]:
                                if 'Query:' in line or 'User Query:' in line:
                                    analysis_context['user_query'] = line.split(':', 1)[-1].strip().strip('"\'')
                                    break
                            else:
                                analysis_context['user_query'] = f'Analysis {i}'
                    except Exception as e:
                        logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
                        analysis_context['script_content'] = 'Script not available'
                        analysis_context['user_query'] = f'Analysis {i}'
                
                # Extract conclusions
                conclusions_file = analysis_dir / 'conclusions.txt'
                if conclusions_file.exists():
                    try:
                        with open(conclusions_file, 'r', encoding='utf-8') as f:
                            analysis_context['conclusions'] = f.read()
                    except Exception as e:
                        logger.warning(f"Could not read conclusions {conclusions_file}: {str(e)}")
                        analysis_context['conclusions'] = 'Conclusions not available'
                
                # Extract key results from output files
                results_summary = []
                for file_path in analysis_dir.glob('*'):
                    if file_path.is_file() and file_path.suffix.lower() in ['.csv', '.txt', '.json']:
                        if file_path.name.startswith(('table_', 'results_', 'summary_')):
                            try:
                                with open(file_path, 'r', encoding='utf-8') as f:
                                    content = f.read()[:1000]  # First 1000 chars
                                    results_summary.append({
                                        'filename': file_path.name,
                                        'preview': content
                                    })
                            except Exception as e:
                                logger.warning(f"Could not read result file {file_path}: {str(e)}")
                
                analysis_context['results_summary'] = results_summary
                
                # List of generated plots
                plots = [f.name for f in analysis_dir.glob('*') if f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg']]
                analysis_context['plots_generated'] = plots
                
                previous_analyses.append(analysis_context)
            
            # Create summary of analysis progression
            summary_parts = []
            if selected_analyses:
                summary_parts.append(f"Using context from {len(previous_analyses)} selected previous analyses:")
            else:
                summary_parts.append(f"Session has {len(previous_analyses)} previous analysis iterations:")
            
            for analysis in previous_analyses:
                summary_parts.append(f"\n{analysis['analysis_number']}. {analysis['user_query']} ({analysis['timestamp']})")
                if analysis.get('conclusions'):
                    # Include first 200 chars of conclusions
                    conclusions_preview = analysis['conclusions'][:200] + '...' if len(analysis['conclusions']) > 200 else analysis['conclusions']
                    summary_parts.append(f"   Key findings: {conclusions_preview}")
                if analysis.get('plots_generated'):
                    summary_parts.append(f"   Generated plots: {', '.join(analysis['plots_generated'])}")
            
            context_type = "selected" if selected_analyses else "all previous"
            return {
                'analyses': previous_analyses,
                'summary': '\n'.join(summary_parts),
                'count': len(previous_analyses),
                'type': context_type
            }
            
        except Exception as e:
            logger.error(f"Error collecting previous analysis context for session {session_id}: {str(e)}")
            return {'analyses': [], 'summary': f'Error collecting context: {str(e)}', 'count': 0, 'type': 'error'}
    
    def _determine_file_type(self, file_path: Path) -> str:
        """Determine the type of a file based on its extension and name"""
        suffix = file_path.suffix.lower()
        name = file_path.name.lower()
        
        if suffix in ['.png', '.jpg', '.jpeg', '.svg', '.gif']:
            return 'plot'
        elif suffix == '.py':
            return 'script'
        elif name == 'requirements.txt':
            return 'requirements'
        elif suffix in ['.csv', '.xlsx', '.xls']:
            return 'table'
        elif suffix in ['.txt', '.md']:
            return 'text'
        elif suffix in ['.log']:
            return 'log'
        elif suffix in ['.json']:
            return 'data'
        else:
            return 'output'
    
    def _get_file_description(self, file_path: Path, file_type: str) -> str:
        """Get a human-readable description for a file"""
        name = file_path.name.lower()
        
        if file_type == 'script':
            return 'Generated Python analysis script'
        elif file_type == 'requirements':
            return 'Python package requirements'
        elif file_type == 'plot':
            return 'Generated visualization'
        elif file_type == 'table':
            return 'Data table or results'
        elif name.startswith('conclusion'):
            return 'Analysis conclusions and insights'
        elif name.startswith('table_'):
            return 'Statistical results table'
        elif name.startswith('plot_'):
            return 'Analysis visualization'
        elif file_type == 'log':
            return 'Execution log'
        else:
            return f'Generated {file_type}'
    
    def generate_interpretation(self, session_id: str) -> Dict[str, Any]:
        """Generate interpretation of analysis results"""
        try:
            session = self.database_manager.get_session(session_id)
            if not session:
                return {
                    'success': False,
                    'error': f'Session {session_id} not found'
                }
            results = self.database_manager.get_session_results(session_id)
            
            # Get latest analysis results
            analysis_results = [r for r in results if r.result_type == 'analysis_results']
            if not analysis_results:
                return {
                    'success': False,
                    'error': 'No analysis results found to interpret'
                }
            
            latest_results = analysis_results[-1].result_data
            
            # Generate interpretation using LLM
            interpretation_result = self.statistical_agent.interpret_results(
                latest_results, session.analysis_config, session.description
            )
            
            if interpretation_result['success']:
                # Store interpretation as result
                interp_result = AnalysisResult(
                    result_id=str(uuid.uuid4()),
                    session_id=session_id,
                    step_id="interpretation",
                    result_type="interpretation",
                    result_data={
                        'interpretation': interpretation_result['interpretation'],
                        'key_findings': interpretation_result.get('key_findings', []),
                        'recommendations': interpretation_result.get('recommendations', [])
                    }
                )
                self.database_manager.add_analysis_result(interp_result)
            
            return interpretation_result
            
        except Exception as e:
            logger.error(f"Error generating interpretation for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def _store_session_data(self, session_id: str, df: pd.DataFrame):
        """Store session data temporarily (in production, use proper storage)"""
        try:
            data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
            df.to_pickle(data_path)
        except Exception as e:
            logger.error(f"Error storing session data: {str(e)}")
    
    def _load_session_data(self, session_id: str) -> Optional[pd.DataFrame]:
        """Load session data"""
        try:
            data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
            if data_path.exists():
                return pd.read_pickle(data_path)
        except Exception as e:
            logger.error(f"Error loading session data: {str(e)}")
        return None
    
    def _get_latest_data_summary(self, session_id: str) -> Dict[str, Any]:
        """Get latest data summary for session"""
        try:
            logger.debug("Getting session results for data summary...")
            results = self.database_manager.get_session_results(session_id)
            logger.debug(f"Found {len(results)} results")
            data_summary_results = [r for r in results if r.result_type == 'data_summary']
            logger.debug(f"Found {len(data_summary_results)} data summary results")
            
            if data_summary_results:
                return data_summary_results[-1].result_data
            return {}
        except Exception as e:
            logger.error(f"Error in _get_latest_data_summary: {e}")
            return {}
    
    def _get_next_step_number(self, session_id: str) -> int:
        """Get next step number for session"""
        steps = self.database_manager.get_session_steps(session_id)
        return len(steps) + 1
    
    def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent sessions for user"""
        try:
            sessions = self.database_manager.get_recent_sessions(user_id, limit)
            return [
                {
                    'session_id': session.session_id,
                    'title': session.title,
                    'description': session.description,
                    'status': session.status.value,
                    'created_at': session.created_at.isoformat() if session.created_at else None,
                    'updated_at': session.updated_at.isoformat() if session.updated_at else None
                }
                for session in sessions
            ]
        except Exception as e:
            logger.error(f"Error getting recent sessions: {str(e)}")
            return []

    def delete_session(self, session_id: str) -> bool:
        """Delete session and all associated data"""
        try:
            # Clean up all session files
            self.agent_executor.cleanup_session(session_id)
            
            # Delete from database
            session = self.database_manager.get_session(session_id)
            if session:
                # Delete related data
                steps = self.database_manager.get_session_steps(session_id)
                for step in steps:
                    self.database_manager.delete_step(step.step_id)
                
                results = self.database_manager.get_session_results(session_id)
                for result in results:
                    self.database_manager.delete_result(result.result_id)
                
                # Delete session
                success = self.database_manager.delete_session(session_id)
                
                # Clean up any uploaded data files
                try:
                    import glob
                    upload_pattern = str(self.config.UPLOAD_FOLDER / f"*{session_id}*")
                    for file_path in glob.glob(upload_pattern):
                        os.remove(file_path)
                        logger.info(f"Removed uploaded file: {file_path}")
                except Exception as file_cleanup_error:
                    logger.warning(f"Could not clean up uploaded files: {str(file_cleanup_error)}")
                
                return success
            
            return False
            
        except Exception as e:
            logger.error(f"Error deleting session {session_id}: {str(e)}")
            return False
    
    def cleanup_orphaned_files(self) -> Dict[str, Any]:
        """Clean up orphaned files and directories from deleted sessions"""
        try:
            # Get all active session IDs from database
            all_sessions = self.database_manager.get_all_sessions()
            active_session_ids = {session.session_id for session in all_sessions}
            
            cleanup_results = {
                'orphaned_dirs_removed': 0,
                'orphaned_files_removed': 0,
                'total_size_freed_mb': 0,
                'errors': []
            }
            
            # Directories to check for orphaned session data
            directories_to_check = [
                self.config.OUTPUT_DIR,
                self.config.GENERATED_SCRIPTS_FOLDER,
                self.config.SANDBOX_FOLDER,
                self.config.SESSIONS_FOLDER,
                self.config.REPORTS_FOLDER
            ]
            
            # Clean up orphaned session directories
            for directory in directories_to_check:
                if not directory.exists():
                    continue
                    
                for item in directory.iterdir():
                    if item.is_dir():
                        # Check if this looks like a session ID (UUID format)
                        if len(item.name) == 36 and item.name.count('-') == 4:
                            if item.name not in active_session_ids:
                                try:
                                    # Calculate size before deletion
                                    size = sum(f.stat().st_size for f in item.rglob('*') if f.is_file())
                                    cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
                                    
                                    shutil.rmtree(item)
                                    cleanup_results['orphaned_dirs_removed'] += 1
                                    logger.info(f"Removed orphaned directory: {item}")
                                except Exception as e:
                                    cleanup_results['errors'].append(f"Could not remove {item}: {str(e)}")
            
            # Clean up orphaned upload files
            if self.config.UPLOAD_FOLDER.exists():
                for file_path in self.config.UPLOAD_FOLDER.iterdir():
                    if file_path.is_file():
                        # Check if filename contains any active session ID
                        file_has_active_session = False
                        for session_id in active_session_ids:
                            if session_id in file_path.name:
                                file_has_active_session = True
                                break
                        
                        # If no active session found and file looks like session data
                        if not file_has_active_session and ('_' in file_path.name or len(file_path.name) > 30):
                            try:
                                size = file_path.stat().st_size
                                cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
                                
                                os.remove(file_path)
                                cleanup_results['orphaned_files_removed'] += 1
                                logger.info(f"Removed orphaned upload file: {file_path.name}")
                            except Exception as e:
                                cleanup_results['errors'].append(f"Could not remove {file_path}: {str(e)}")
            
            cleanup_results['total_size_freed_mb'] = round(cleanup_results['total_size_freed_mb'], 2)
            
            return {
                'success': True,
                'results': cleanup_results
            }
            
        except Exception as e:
            logger.error(f"Error during orphaned files cleanup: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def cleanup_old_sessions(self, days_old: int = 7):
        """Clean up sessions older than specified days"""
        try:
            cutoff_date = datetime.now() - timedelta(days=days_old)
            
            # Get all sessions
            all_sessions = self.database_manager.get_all_sessions()
            
            cleaned_count = 0
            for session in all_sessions:
                if session.created_at and session.created_at < cutoff_date:
                    logger.info(f"Cleaning up old session: {session.session_id} (created: {session.created_at})")
                    if self.delete_session(session.session_id):
                        cleaned_count += 1
            
            logger.info(f"Cleaned up {cleaned_count} old sessions")
            
            # Also run orphaned file cleanup as part of regular maintenance
            if cleaned_count > 0:
                try:
                    orphaned_result = self.cleanup_orphaned_files()
                    if orphaned_result['success']:
                        results = orphaned_result['results']
                        if results['orphaned_dirs_removed'] > 0 or results['orphaned_files_removed'] > 0:
                            logger.info(f"Additional orphaned cleanup: {results['orphaned_dirs_removed']} dirs, {results['orphaned_files_removed']} files, {results['total_size_freed_mb']} MB freed")
                except Exception as orphaned_error:
                    logger.warning(f"Orphaned cleanup warning: {str(orphaned_error)}")
            
            return cleaned_count
            
        except Exception as e:
            logger.error(f"Error during periodic cleanup: {str(e)}")
            return 0

Parameters

Name Type Default Kind
config Config - -

Parameter Details

config: Application configuration (type: Config), used to construct the database manager, data processor, statistical agent, and agent-based executor.

Return Value

Instantiation returns a StatisticalAnalysisService instance; per-method return types are listed in the Class Interface below.

Class Interface

Methods

__init__(self, config)

Purpose: Initialize the service and construct its component managers (database manager, data processor, statistical agent, agent executor)

Parameters:

  • config: Type: Config

Returns: None

create_analysis_session(self, title, description, user_id) -> str

Purpose: Create new analysis session

Parameters:

  • title: Type: str
  • description: Type: str
  • user_id: Type: str

Returns: Returns str
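
A minimal call sketch (hypothetical values; assumes `service` is an already-constructed StatisticalAnalysisService):

# The returned ID is a UUID string generated inside the service
session_id = service.create_analysis_session(
    title="Sales funnel analysis",
    description="Quarterly conversion rates",
    user_id="analyst_1"
)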

load_data_for_session(self, session_id, data_source) -> Dict[str, Any]

Purpose: Load data for analysis session

Parameters:

  • session_id: Type: str
  • data_source: Type: DataSource

Returns: Returns Dict[str, Any]

load_data_for_session_direct(self, session_id, df, metadata) -> Dict[str, Any]

Purpose: Load data directly from dataframe for enhanced workflows

Parameters:

  • session_id: Type: str
  • df: Type: pd.DataFrame
  • metadata: Type: Dict[str, Any]

Returns: Returns Dict[str, Any]
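
A sketch of the direct-load path (hypothetical DataFrame and metadata keys; assumes the returned dict follows the success/error convention used elsewhere in this class):

import pandas as pd

df = pd.DataFrame({'group': ['A', 'A', 'B'], 'value': [1.2, 2.1, 3.4]})
result = service.load_data_for_session_direct(
    session_id, df, metadata={'source': 'in-memory upload'}
)
if not result.get('success', False):
    print(result.get('error'))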

process_user_query(self, session_id, user_query) -> Dict[str, Any]

Purpose: Process natural language query and suggest analysis

Parameters:

  • session_id: Type: str
  • user_query: Type: str

Returns: Returns Dict[str, Any]
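
A call sketch (illustrative query; the exact shape of the returned suggestion dict is not specified here):

suggestion = service.process_user_query(
    session_id, "Which variables best predict churn?"
)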

generate_and_execute_analysis(self, session_id, analysis_config, user_query, model, include_previous_context, selected_analyses) -> Dict[str, Any]

Purpose: Generate and execute statistical analysis script with optional previous context

Parameters:

  • session_id: Type: str
  • analysis_config: Type: AnalysisConfiguration
  • user_query: Type: str
  • model: Type: str
  • include_previous_context: Type: bool
  • selected_analyses: Type: List[str]

Returns: Returns Dict[str, Any]
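
A sketch of an iterative run that reuses earlier results (values are illustrative; `model` is a hypothetical model name, and `selected_analyses` holds analysis directory IDs as consumed by _collect_previous_analysis_context):

result = service.generate_and_execute_analysis(
    session_id=session_id,
    analysis_config=analysis_config,    # an AnalysisConfiguration instance
    user_query="Compare means across treatment groups",
    model="gpt-4",                      # hypothetical
    include_previous_context=True,
    selected_analyses=["analysis_1"]    # restrict context to chosen runs
)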

debug_and_retry_analysis(self, session_id, failed_step_id, model) -> Dict[str, Any]

Purpose: Debug failed analysis and retry execution using agent-based approach

Parameters:

  • session_id: Type: str
  • failed_step_id: Type: str
  • model: Type: str

Returns: Returns Dict[str, Any]

_execute_analysis_script(self, session_id, step_id, script, df, user_query, model) -> Dict[str, Any]

Purpose: Execute analysis script using agent-based executor

Parameters:

  • session_id: Type: str
  • step_id: Type: str
  • script: Type: str
  • df: Type: pd.DataFrame
  • user_query: Type: str
  • model: Type: str

Returns: Returns Dict[str, Any]

_store_execution_results(self, session_id, step_id, execution_result)

Purpose: Store execution results in database

Parameters:

  • session_id: Type: str
  • step_id: Type: str
  • execution_result: Type: Dict[str, Any]

Returns: None

get_session_summary(self, session_id) -> Dict[str, Any]

Purpose: Get comprehensive session summary

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

_get_generated_files_info(self, session_id) -> List[Dict[str, Any]]

Purpose: Get information about generated files for a session

Parameters:

  • session_id: Type: str

Returns: Returns List[Dict[str, Any]]

_get_execution_tracking(self, session_id) -> Dict[str, Any]

Purpose: Get execution tracking information for a session

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

_generate_content_summary(self, session_id) -> Dict[str, Any]

Purpose: Generate a summary of what content was generated during execution

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

get_session_results(self, session_id) -> List[Dict[str, Any]]

Purpose: Get session results - wrapper around database manager method

Parameters:

  • session_id: Type: str

Returns: Returns List[Dict[str, Any]]

get_analysis_history(self, session_id) -> List[Dict[str, Any]]

Purpose: Get analysis history for a session with file information

Parameters:

  • session_id: Type: str

Returns: Returns List[Dict[str, Any]]

get_analysis_files(self, session_id, analysis_id) -> List[Dict[str, Any]]

Purpose: Get files for a specific analysis

Parameters:

  • session_id: Type: str
  • analysis_id: Type: str

Returns: Returns List[Dict[str, Any]]

_collect_previous_analysis_context(self, session_id, selected_analyses) -> Dict[str, Any]

Purpose: Collect context from previous analyses for iterative refinement

Parameters:

  • session_id: Type: str
  • selected_analyses: Type: List[str]

Returns: Returns Dict[str, Any]
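
The dictionary returned by this method, as assembled in the source above:

{
    'analyses': [...],    # one context dict per prior analysis
    'summary': '...',     # human-readable progression summary
    'count': 3,           # number of analyses included
    'type': 'selected'    # or 'all previous'
}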

_determine_file_type(self, file_path) -> str

Purpose: Determine the type of a file based on its extension and name

Parameters:

  • file_path: Type: Path

Returns: Returns str

_get_file_description(self, file_path, file_type) -> str

Purpose: Get a human-readable description for a file

Parameters:

  • file_path: Type: Path
  • file_type: Type: str

Returns: Returns str

generate_interpretation(self, session_id) -> Dict[str, Any]

Purpose: Generate interpretation of analysis results

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]
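
A handling sketch based on the result keys this method stores (key_findings and recommendations are optional, hence .get):

interp = service.generate_interpretation(session_id)
if interp['success']:
    print(interp['interpretation'])
    for finding in interp.get('key_findings', []):
        print('-', finding)
else:
    print(f"Interpretation failed: {interp.get('error')}")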

_store_session_data(self, session_id, df)

Purpose: Store session data temporarily (in production, use proper storage)

Parameters:

  • session_id: Type: str
  • df: Type: pd.DataFrame

Returns: None

_load_session_data(self, session_id) -> Optional[pd.DataFrame]

Purpose: Load session data

Parameters:

  • session_id: Type: str

Returns: Returns Optional[pd.DataFrame]

_get_latest_data_summary(self, session_id) -> Dict[str, Any]

Purpose: Get latest data summary for session

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

_get_next_step_number(self, session_id) -> int

Purpose: Get next step number for session

Parameters:

  • session_id: Type: str

Returns: Returns int

get_recent_sessions(self, user_id, limit) -> List[Dict[str, Any]]

Purpose: Get recent sessions for user

Parameters:

  • user_id: Type: str
  • limit: Type: int

Returns: Returns List[Dict[str, Any]]

delete_session(self, session_id) -> bool

Purpose: Delete session and all associated data

Parameters:

  • session_id: Type: str

Returns: Returns bool
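
The boolean return makes the call easy to guard (sketch):

if not service.delete_session(session_id):
    print(f"Session {session_id} could not be fully deleted")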

cleanup_orphaned_files(self) -> Dict[str, Any]

Purpose: Clean up orphaned files and directories from deleted sessions

Returns: Returns Dict[str, Any]
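
A reporting sketch using the result keys built in the source above:

report = service.cleanup_orphaned_files()
if report['success']:
    r = report['results']
    print(f"Removed {r['orphaned_dirs_removed']} dirs and "
          f"{r['orphaned_files_removed']} files, "
          f"freeing {r['total_size_freed_mb']} MB")
    for err in r['errors']:
        print('warning:', err)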

cleanup_old_sessions(self, days_old)

Purpose: Clean up sessions older than specified days

Parameters:

  • days_old: Type: int

Returns: Returns int (number of sessions cleaned up)
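
A maintenance sketch (e.g. invoked from a daily scheduled job; the count comes from the method's return value):

removed = service.cleanup_old_sessions(days_old=7)
print(f"Removed {removed} stale sessions")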

Required Imports

import uuid
import json
import logging
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd

Usage Example

# Example usage:
# service = StatisticalAnalysisService(config)  # config: a Config instance
# session_id = service.create_analysis_session("Exploratory analysis")
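
A fuller workflow sketch (hypothetical names throughout; assumes a valid Config, a pandas DataFrame df, and an AnalysisConfiguration analysis_config):

# service = StatisticalAnalysisService(config)
# session_id = service.create_analysis_session("Churn study")
# service.load_data_for_session_direct(session_id, df, metadata={})
# service.process_user_query(session_id, "Test for group differences")
# result = service.generate_and_execute_analysis(
#     session_id, analysis_config, "Test for group differences",
#     model="gpt-4", include_previous_context=False, selected_analyses=None
# )
# interp = service.generate_interpretation(session_id)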

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class StatisticalAnalysisService_v1 98.0% similar

    Main service for statistical analysis orchestration

    From: /tf/active/vicechatdev/smartstat/services.py
  • class DataAnalysisService 66.1% similar

    Service class for managing data analysis operations within document sections, integrating with SmartStat components for statistical analysis, dataset processing, and visualization generation.

    From: /tf/active/vicechatdev/vice_ai/data_analysis_service.py
  • class SmartStatService 64.6% similar

    Service for running SmartStat analysis sessions in Vice AI

    From: /tf/active/vicechatdev/vice_ai/smartstat_service.py
  • class DataSectionService 57.5% similar

    Service class for managing DataSection entities, providing CRUD operations and specialized update methods for analysis sessions, plots, and conclusions.

    From: /tf/active/vicechatdev/vice_ai/services.py
  • class StatisticalSession 57.2% similar

    A dataclass representing a statistical analysis session that tracks metadata, configuration, and status of data analysis operations.

    From: /tf/active/vicechatdev/vice_ai/smartstat_models.py