
class StatisticalAnalysisService_v1

Maturity: 26

Main service for statistical analysis orchestration

File: /tf/active/vicechatdev/smartstat/services.py
Lines: 28-1128
Complexity: moderate

Purpose

Main service for statistical analysis orchestration

Source Code

class StatisticalAnalysisService:
    """Main service for statistical analysis orchestration"""
    
    def __init__(self, config: Config):
        self.config = config
        self.database_manager = DatabaseManager(config.DATABASE_URL)
        self.data_processor = DataProcessor(config)
        self.statistical_agent = StatisticalAgent(config)
        self.agent_executor = AgentExecutor(config)  # New agent-based executor
        
    def create_analysis_session(self, title: str, description: str = "",
                              user_id: str = "default") -> str:
        """Create new analysis session"""
        try:
            session_id = str(uuid.uuid4())  # Generate session ID first
            logger.info(f"Creating session with ID: {session_id}")
            
            try:
                session = StatisticalSession(
                    session_id=session_id,
                    title=title,
                    description=description,
                    status=AnalysisStatus.PENDING,  # Use PENDING instead of CREATED
                    data_source=None,  # Don't create defaults in constructor
                    analysis_config=None  # Don't create defaults in constructor
                )
                logger.info(f"StatisticalSession object created successfully")
            except Exception as session_error:
                logger.error(f"Error creating StatisticalSession object: {str(session_error)}")
                raise session_error
            
            try:
                created_session_id = self.database_manager.create_session(session)
                logger.info(f"Created analysis session: {created_session_id}")
                return created_session_id
            except Exception as db_error:
                logger.error(f"Error calling database_manager.create_session: {str(db_error)}")
                raise db_error
            
        except Exception as e:
            logger.error(f"Error in create_analysis_session: {str(e)}")
            logger.error(f"Exception type: {type(e)}")
            # Simplified traceback to avoid formatting issues
            import traceback
            tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
            logger.error(f"Traceback: {''.join(tb_lines)}")
            raise e
    
    def load_data_for_session(self, session_id: str, data_source: DataSource) -> Dict[str, Any]:
        """Load data for analysis session"""
        try:
            # Update session with data source
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            session.data_source = data_source
            session.status = AnalysisStatus.PROCESSING
            session.updated_at = datetime.now()
            
            # Save updated session
            self.database_manager.update_session(session)
            
            # Load and process data
            if data_source.source_type == DataSourceType.SQL_WORKFLOW:
                # New SQL workflow: user query -> LLM generates SQL -> execute
                df, metadata = self.data_processor.load_data_from_sql_workflow(
                    user_query=data_source.user_query,
                    schema_file=data_source.schema_file,
                    connection_config=data_source.connection_config,
                    statistical_agent=self.statistical_agent
                )
            else:
                # Regular workflow (file upload or direct SQL)
                df, metadata = self.data_processor.load_data(data_source)
            
            # Generate data summary
            data_summary = self.data_processor.get_data_summary(df)
            
            # Create analysis step for data loading
            step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=1,
                step_type="data_load",
                input_data={
                    'data_source': data_source.to_dict(),
                    'metadata': metadata
                },
                execution_output=json.dumps(data_summary, default=str),
                execution_success=True
            )
            
            self.database_manager.add_analysis_step(step)
            
            # Store data summary as result
            result = AnalysisResult(
                result_id=str(uuid.uuid4()),
                session_id=session_id,
                step_id=step.step_id,
                result_type="data_summary",
                result_data=data_summary,
                metadata=metadata
            )
            
            self.database_manager.add_analysis_result(result)
            
            # Store dataframe temporarily (in production, use proper data storage)
            self._store_session_data(session_id, df)
            
            return {
                'success': True,
                'data_summary': data_summary,
                'metadata': metadata,
                'step_id': step.step_id,
                'shape': df.shape,
                'columns': df.columns.tolist()
            }
            
        except Exception as e:
            logger.error(f"Error loading data for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def process_user_query(self, session_id: str, user_query: str) -> Dict[str, Any]:
        """Process natural language query and suggest analysis"""
        try:
            # Get session and data
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            df = self._load_session_data(session_id)
            if df is None:
                raise ValueError(f"No data loaded for session {session_id}")
            
            data_summary = self._get_latest_data_summary(session_id)
            
            # Interpret query using LLM
            interpretation_result = self.statistical_agent.interpret_user_query(
                user_query, data_summary, df.columns.tolist()
            )
            
            if not interpretation_result['success']:
                return interpretation_result
            
            # Update session with analysis configuration and persist it, so that
            # generate_and_execute_analysis can read the config back from the database
            session.analysis_config = interpretation_result['suggested_config']
            session.updated_at = datetime.now()
            self.database_manager.update_session(session)
            
            # Create analysis step for query interpretation
            # Prepare serializable interpretation result for storage
            serializable_result = {
                'success': interpretation_result['success'],
                'analysis_plan': interpretation_result['analysis_plan'],
                'suggested_config': interpretation_result['suggested_config'].to_dict(),
                'interpretation': interpretation_result['interpretation'],
                'confidence': interpretation_result['confidence']
            }
            
            step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="query_interpretation",
                input_data={'user_query': user_query},
                execution_output=json.dumps(serializable_result),
                execution_success=True
            )
            
            self.database_manager.add_analysis_step(step)
            
            return {
                'success': True,
                'interpretation': interpretation_result,
                'step_id': step.step_id,
                'suggested_config': interpretation_result['suggested_config']  # Return the actual object, not dict
            }
            
        except Exception as e:
            logger.error(f"Error processing query for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def generate_and_execute_analysis(self, session_id: str, 
                                    analysis_config: AnalysisConfiguration = None,
                                    user_query: str = "",
                                    model: str = 'gpt-4o',
                                    include_previous_context: bool = True,
                                    selected_analyses: List[str] = None) -> Dict[str, Any]:
        """Generate and execute statistical analysis script with optional previous context"""
        try:
            # Get session and data
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            if analysis_config is None:
                analysis_config = session.analysis_config
            
            df = self._load_session_data(session_id)
            if df is None:
                raise ValueError(f"No data loaded for session {session_id}")
            
            data_summary = self._get_latest_data_summary(session_id)
            
            # Collect previous analysis context if requested
            previous_context = None
            if include_previous_context:
                previous_context = self._collect_previous_analysis_context(session_id, selected_analyses)
            
            # Validate analysis configuration
            validation = self.data_processor.validate_columns_for_analysis(
                df, analysis_config.target_variables, analysis_config.grouping_variables
            )
            
            if not validation['valid']:
                return {
                    'success': False,
                    'error': f"Invalid analysis configuration: {'; '.join(validation['errors'])}"
                }
            
            # Generate analysis script with previous context
            script_result = self.statistical_agent.generate_analysis_script(
                analysis_config, data_summary, user_query, model, previous_context
            )
            
            if not script_result['success']:
                return script_result
            
            # Create step for script generation
            generation_step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="script_generation",
                input_data={
                    'analysis_config': analysis_config.to_dict(),
                    'user_query': user_query
                },
                generated_script=script_result['script'],
                execution_success=True
            )
            
            self.database_manager.add_analysis_step(generation_step)
            
            # Execute script
            execution_result = self._execute_analysis_script(
                session_id, generation_step.step_id, script_result['script'], df, user_query
            )
            
            # Clean up old analysis files to prevent accumulation
            if self.config.AUTO_CLEANUP_ENABLED:
                try:
                    self.agent_executor.cleanup_old_analyses(session_id, keep_recent=self.config.KEEP_RECENT_ANALYSES)
                except Exception as cleanup_error:
                    logger.warning(f"Cleanup warning: {str(cleanup_error)}")
            
            return {
                'success': execution_result['success'],
                'script': script_result['script'],
                'explanation': script_result['explanation'],
                'execution_result': execution_result,
                'generation_step_id': generation_step.step_id
            }
            
        except Exception as e:
            logger.error(f"Error generating analysis for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def debug_and_retry_analysis(self, session_id: str, failed_step_id: str) -> Dict[str, Any]:
        """Debug failed analysis and retry execution using agent-based approach"""
        try:
            # Get failed step
            steps = self.database_manager.get_session_steps(session_id)
            failed_step = next((s for s in steps if s.step_id == failed_step_id), None)
            
            if not failed_step:
                raise ValueError(f"Step {failed_step_id} not found")
            
            if failed_step.execution_success:
                return {
                    'success': False,
                    'error': "Step was already successful"
                }
            
            # Get session data
            df = self._load_session_data(session_id)
            session = self.database_manager.get_session(session_id)
            data_summary = self._get_latest_data_summary(session_id)
            
            # Count debugging iterations for this script
            debug_iteration = len([s for s in steps if s.step_type == "debugging" and 
                                 s.input_data.get('original_step_id') == failed_step_id]) + 1
            
            # Try to find the project directory for this session
            project_dir = Path(self.config.GENERATED_SCRIPTS_FOLDER) / session_id
            
            if project_dir.exists():
                # Retry execution with more iterations for debugging
                execution_result = self.agent_executor.execute_analysis_project(
                    project_dir=str(project_dir),
                    max_iterations=5  # Allow more debugging iterations
                )
            else:
                # Generate a new project if the old one doesn't exist
                logger.info(f"Project directory not found, regenerating for session {session_id}")
                
                project_result = self.agent_executor.generate_analysis_project(
                    session_id=session_id,
                    user_query=f"Retry statistical analysis with debugging",
                    data_summary=data_summary,
                    analysis_config=session.analysis_config,  # Pass the object
                    session_data=self._load_session_data(session_id)  # Pass the data
                )
                
                if not project_result['success']:
                    return project_result
                
                execution_result = self.agent_executor.execute_analysis_project(
                    project_dir=project_result['project_dir'],
                    max_iterations=5
                )
            
            # Create debugging step
            debug_step = AnalysisStep(
                step_id=str(uuid.uuid4()),
                session_id=session_id,
                step_number=self._get_next_step_number(session_id),
                step_type="debugging",
                input_data={
                    'original_step_id': failed_step_id,
                    'iteration': debug_iteration,
                    'original_error': failed_step.execution_error
                },
                generated_script="",  # Agent generates files separately
                execution_output=execution_result.get('output', '')
            )
            
            self.database_manager.add_analysis_step(debug_step)
            
            return {
                'success': execution_result['success'],
                'debug_explanation': f"Debug iteration {debug_iteration} completed",
                'execution_result': execution_result,
                'debug_step_id': debug_step.step_id,
                'iteration': debug_iteration,
                'generated_files': execution_result.get('generated_files', [])
            }
            
        except Exception as e:
            logger.error(f"Error debugging analysis for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def _execute_analysis_script(self, session_id: str, step_id: str, 
                               script: str, df: pd.DataFrame, user_query: str = "") -> Dict[str, Any]:
        """Execute analysis script using agent-based executor"""
        try:
            # Update session status
            self.database_manager.update_session_status(session_id, AnalysisStatus.PROCESSING)
            
            # Get session data summary for context
            session = self.database_manager.get_session(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            
            data_summary = self._get_latest_data_summary(session_id)
            analysis_config = session.analysis_config
            
            # Store dataframe for the agent to access
            self._store_session_data(session_id, df)
            
            # Use agent executor to generate and run analysis project
            project_result = self.agent_executor.generate_analysis_project(
                session_id=session_id,
                user_query=user_query if user_query else f"Statistical analysis: {analysis_config.analysis_type.value if analysis_config else 'general'}",
                data_summary=data_summary,
                analysis_config=analysis_config,  # Pass the object, not the dict
                session_data=df  # Pass the dataframe directly
            )
            
            if not project_result['success']:
                return project_result
            
            # Execute the generated project
            execution_result = self.agent_executor.execute_analysis_project(
                project_dir=project_result['project_dir'],
                max_iterations=3
            )
            
            # Update step with execution results
            steps = self.database_manager.get_session_steps(session_id)
            current_step = next((s for s in steps if s.step_id == step_id), None)
            if current_step:
                current_step.execution_output = execution_result.get('output', '')
                current_step.execution_error = execution_result.get('error', '')
                current_step.execution_success = execution_result['success']
                
                # Store generated files info
                if execution_result.get('generated_files'):
                    current_step.metadata = current_step.metadata or {}
                    current_step.metadata['generated_files'] = execution_result['generated_files']
            
            if execution_result['success']:
                # Store successful results
                self._store_execution_results(session_id, step_id, execution_result)
                self.database_manager.update_session_status(session_id, AnalysisStatus.COMPLETED)
            else:
                self.database_manager.update_session_status(session_id, AnalysisStatus.DEBUGGING)
            
            return execution_result
            
        except Exception as e:
            logger.error(f"Error executing script for session {session_id}: {str(e)}")
            self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
            return {
                'success': False,
                'error': str(e)
            }
    
    def _store_execution_results(self, session_id: str, step_id: str, 
                               execution_result: Dict[str, Any]):
        """Store execution results in database"""
        
        # Store main results
        if execution_result.get('results'):
            result = AnalysisResult(
                result_id=str(uuid.uuid4()),
                session_id=session_id,
                step_id=step_id,
                result_type="analysis_results",
                result_data=execution_result['results'],
                metadata={
                    'execution_time': execution_result['execution_time'],
                    'output': execution_result['output']
                }
            )
            self.database_manager.add_analysis_result(result)
        
        # Store plots
        if execution_result.get('plots'):
            for plot_path in execution_result['plots']:
                plot_result = AnalysisResult(
                    result_id=str(uuid.uuid4()),
                    session_id=session_id,
                    step_id=step_id,
                    result_type="plot",
                    result_data={'description': 'Generated plot'},
                    file_paths=[plot_path],
                    metadata={'plot_type': 'analysis_plot'}
                )
                self.database_manager.add_analysis_result(plot_result)
    
    def get_session_summary(self, session_id: str) -> Dict[str, Any]:
        """Get comprehensive session summary"""
        try:
            session = self.database_manager.get_session(session_id)
            if not session:
                return {'success': False, 'error': 'Session not found'}
            
            steps = self.database_manager.get_session_steps(session_id)
            results = self.database_manager.get_session_results(session_id)
            
            # Get data summary
            data_summary = self._get_latest_data_summary(session_id)
            
            # Organize results by type
            organized_results = {
                'analysis_results': [],
                'plots': [],
                'data_summary': data_summary
            }
            
            for result in results:
                if result.result_type == 'plot':
                    organized_results['plots'].append({
                        'file_path': result.file_paths[0] if result.file_paths else '',
                        'metadata': result.metadata
                    })
                elif result.result_type == 'analysis_results':
                    organized_results['analysis_results'].append(result.result_data)
            
            # Get generated files from agent executor
            generated_files = self._get_generated_files_info(session_id)
            
            # Get analysis history
            analysis_history = self.get_analysis_history(session_id)
            
            return {
                'success': True,
                'session': session.to_dict(),
                'steps': [step.to_dict() for step in steps],
                'results': organized_results,
                'generated_files': generated_files,
                'analysis_history': analysis_history,
                'summary': {
                    'total_steps': len(steps),
                    'successful_steps': len([s for s in steps if s.execution_success]),
                    'plots_generated': len(organized_results['plots']),
                    'status': session.status.value
                }
            }
            
        except Exception as e:
            logger.error(f"Error getting session summary {session_id}: {str(e)}")
            return {'success': False, 'error': str(e)}
    
    def _get_generated_files_info(self, session_id: str) -> List[Dict[str, Any]]:
        """Get information about generated files for a session"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return []
            
            generated_files = []
            
            # Look for analysis directories (analysis_HASH format) within this session only
            analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
            
            if analysis_dirs:
                # Use the most recent analysis directory within this session
                latest_analysis_dir = max(analysis_dirs, key=lambda d: d.stat().st_mtime)
                
                # Process files in the analysis directory
                for file_path in latest_analysis_dir.glob("*"):
                    if file_path.is_file() and file_path.name != 'venv':  # Skip virtual environment
                        file_type = self._determine_file_type(file_path)
                        
                        file_info = {
                            'name': file_path.name,
                            'type': file_type,
                            'description': self._get_file_description(file_path, file_type),
                            'downloadable': True,
                            'path': str(file_path),
                            'relative_path': f"{latest_analysis_dir.name}/{file_path.name}"  # For URL construction
                        }
                        
                        # For text files, include content preview
                        if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                            try:
                                with open(file_path, 'r', encoding='utf-8') as f:
                                    content = f.read()
                                    file_info['content'] = content[:2000] if len(content) > 2000 else content
                                    file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                            except Exception as e:
                                logger.warning(f"Could not read file {file_path}: {str(e)}")
                                file_info['content'] = 'Content unavailable'
                        
                        generated_files.append(file_info)
            
            # Also check for files directly in session directory (legacy support)
            for file_path in session_dir.glob("*"):
                if file_path.is_file():
                    file_type = self._determine_file_type(file_path)
                    
                    file_info = {
                        'name': file_path.name,
                        'type': file_type,
                        'description': self._get_file_description(file_path, file_type),
                        'downloadable': True,
                        'path': str(file_path),
                        'relative_path': file_path.name  # For URL construction
                    }
                    
                    # For text files, include content preview
                    if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                content = f.read()
                                file_info['content'] = content[:2000] if len(content) > 2000 else content
                                file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                        except Exception as e:
                            logger.warning(f"Could not read file {file_path}: {str(e)}")
                            file_info['content'] = 'Content unavailable'
                    
                    generated_files.append(file_info)
            
            return generated_files
            
        except Exception as e:
            logger.error(f"Error getting generated files for session {session_id}: {str(e)}")
            return []
    
    def get_analysis_history(self, session_id: str) -> List[Dict[str, Any]]:
        """Get analysis history for a session with file information"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return []
            
            analysis_history = []
            
            # Look for analysis directories
            analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
            
            # Sort by modification time (oldest first)
            analysis_dirs.sort(key=lambda d: d.stat().st_mtime)
            
            for i, analysis_dir in enumerate(analysis_dirs, 1):
                analysis_info = {
                    'analysis_number': i,
                    'analysis_id': analysis_dir.name,
                    'timestamp': analysis_dir.stat().st_mtime,
                    'formatted_time': datetime.fromtimestamp(analysis_dir.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                    'file_count': len([f for f in analysis_dir.glob('*') if f.is_file() and f.name != 'venv']),
                    'has_plots': any(f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg'] for f in analysis_dir.glob('*')),
                    'has_tables': any(f.suffix.lower() in ['.csv', '.xlsx'] for f in analysis_dir.glob('*')),
                    'has_conclusions': (analysis_dir / 'conclusions.txt').exists()
                }
                
                # Try to extract the user query from the analysis script
                analysis_script = analysis_dir / 'analysis.py'
                if analysis_script.exists():
                    try:
                        with open(analysis_script, 'r', encoding='utf-8') as f:
                            script_content = f.read()
                            # Look for the query comment in the script
                            for line in script_content.split('\n')[:10]:  # Check first 10 lines
                                if 'Query:' in line:
                                    analysis_info['user_query'] = line.split('Query:')[-1].strip()
                                    break
                            else:
                                analysis_info['user_query'] = 'Analysis script'
                    except Exception as e:
                        logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
                        analysis_info['user_query'] = 'Unknown query'
                else:
                    analysis_info['user_query'] = 'No script found'
                
                analysis_history.append(analysis_info)
            
            return analysis_history
            
        except Exception as e:
            logger.error(f"Error getting analysis history for session {session_id}: {str(e)}")
            return []
    
    def get_analysis_files(self, session_id: str, analysis_id: str) -> List[Dict[str, Any]]:
        """Get files for a specific analysis"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            analysis_dir = session_dir / analysis_id
            
            if not analysis_dir.exists() or not analysis_dir.is_dir():
                return []
            
            generated_files = []
            
            for file_path in analysis_dir.glob("*"):
                if file_path.is_file() and file_path.name != 'venv':
                    file_type = self._determine_file_type(file_path)
                    
                    file_info = {
                        'name': file_path.name,
                        'type': file_type,
                        'description': self._get_file_description(file_path, file_type),
                        'downloadable': True,
                        'path': str(file_path),
                        'relative_path': f"{analysis_dir.name}/{file_path.name}"
                    }
                    
                    # For text files, include content preview
                    if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                content = f.read()
                                file_info['content'] = content[:2000] if len(content) > 2000 else content
                                file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
                        except Exception as e:
                            logger.warning(f"Could not read file {file_path}: {str(e)}")
                            file_info['content'] = 'Content unavailable'
                    
                    generated_files.append(file_info)
            
            return generated_files
            
        except Exception as e:
            logger.error(f"Error getting analysis files for session {session_id}, analysis {analysis_id}: {str(e)}")
            return []
    
    def _collect_previous_analysis_context(self, session_id: str, selected_analyses: List[str] = None) -> Dict[str, Any]:
        """Collect context from previous analyses for iterative refinement"""
        try:
            session_dir = Path(self.config.OUTPUT_DIR) / session_id
            if not session_dir.exists():
                return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
            
            # Get all analysis directories
            analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
            
            if not analysis_dirs:
                return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
            
            # Sort by creation time (oldest first)
            analysis_dirs.sort(key=lambda d: d.stat().st_mtime)
            
            # Filter by selected analyses if specified
            if selected_analyses:
                analysis_dirs = [d for d in analysis_dirs if d.name in selected_analyses]
                if not analysis_dirs:
                    return {'analyses': [], 'summary': 'Selected analyses not found.', 'count': 0}
            else:
                # If no specific selection, exclude the most recent (current) analysis
                analysis_dirs = analysis_dirs[:-1]
            
            previous_analyses = []
            
            for i, analysis_dir in enumerate(analysis_dirs, 1):
                analysis_context = {
                    'analysis_number': i,
                    'analysis_id': analysis_dir.name,
                    'timestamp': datetime.fromtimestamp(analysis_dir.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')
                }
                
                # Extract user query from analysis script
                analysis_script = analysis_dir / 'analysis.py'
                if analysis_script.exists():
                    try:
                        with open(analysis_script, 'r', encoding='utf-8') as f:
                            script_content = f.read()
                            analysis_context['script_content'] = script_content
                            
                            # Extract user query from script comments
                            for line in script_content.split('\n')[:15]:
                                if 'Query:' in line or 'User Query:' in line:
                                    analysis_context['user_query'] = line.split(':')[-1].strip().strip('"\'')
                                    break
                            else:
                                analysis_context['user_query'] = f'Analysis {i}'
                    except Exception as e:
                        logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
                        analysis_context['script_content'] = 'Script not available'
                        analysis_context['user_query'] = f'Analysis {i}'
                
                # Extract conclusions
                conclusions_file = analysis_dir / 'conclusions.txt'
                if conclusions_file.exists():
                    try:
                        with open(conclusions_file, 'r', encoding='utf-8') as f:
                            analysis_context['conclusions'] = f.read()
                    except Exception as e:
                        logger.warning(f"Could not read conclusions {conclusions_file}: {str(e)}")
                        analysis_context['conclusions'] = 'Conclusions not available'
                
                # Extract key results from output files
                results_summary = []
                for file_path in analysis_dir.glob('*'):
                    if file_path.is_file() and file_path.suffix.lower() in ['.csv', '.txt', '.json']:
                        if file_path.name.startswith(('table_', 'results_', 'summary_')):
                            try:
                                with open(file_path, 'r', encoding='utf-8') as f:
                                    content = f.read()[:1000]  # First 1000 chars
                                    results_summary.append({
                                        'filename': file_path.name,
                                        'preview': content
                                    })
                            except Exception as e:
                                logger.warning(f"Could not read result file {file_path}: {str(e)}")
                
                analysis_context['results_summary'] = results_summary
                
                # List of generated plots
                plots = [f.name for f in analysis_dir.glob('*') if f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg']]
                analysis_context['plots_generated'] = plots
                
                previous_analyses.append(analysis_context)
            
            # Create summary of analysis progression
            summary_parts = []
            if selected_analyses:
                summary_parts.append(f"Using context from {len(previous_analyses)} selected previous analyses:")
            else:
                summary_parts.append(f"Session has {len(previous_analyses)} previous analysis iterations:")
            
            for analysis in previous_analyses:
                summary_parts.append(f"\n{analysis['analysis_number']}. {analysis['user_query']} ({analysis['timestamp']})")
                if analysis.get('conclusions'):
                    # Include first 200 chars of conclusions
                    conclusions_preview = analysis['conclusions'][:200] + '...' if len(analysis['conclusions']) > 200 else analysis['conclusions']
                    summary_parts.append(f"   Key findings: {conclusions_preview}")
                if analysis.get('plots_generated'):
                    summary_parts.append(f"   Generated plots: {', '.join(analysis['plots_generated'])}")
            
            context_type = "selected" if selected_analyses else "all previous"
            return {
                'analyses': previous_analyses,
                'summary': '\n'.join(summary_parts),
                'count': len(previous_analyses),
                'type': context_type
            }
            
        except Exception as e:
            logger.error(f"Error collecting previous analysis context for session {session_id}: {str(e)}")
            return {'analyses': [], 'summary': f'Error collecting context: {str(e)}', 'count': 0}
    
    def _determine_file_type(self, file_path: Path) -> str:
        """Determine the type of a file based on its extension and name"""
        suffix = file_path.suffix.lower()
        name = file_path.name.lower()
        
        if suffix in ['.png', '.jpg', '.jpeg', '.svg', '.gif']:
            return 'plot'
        elif suffix == '.py':
            return 'script'
        elif name == 'requirements.txt':
            return 'requirements'
        elif suffix in ['.csv', '.xlsx', '.xls']:
            return 'table'
        elif suffix in ['.txt', '.md']:
            return 'text'
        elif suffix in ['.log']:
            return 'log'
        elif suffix in ['.json']:
            return 'data'
        else:
            return 'output'
    
    def _get_file_description(self, file_path: Path, file_type: str) -> str:
        """Get a human-readable description for a file"""
        name = file_path.name.lower()
        
        if file_type == 'script':
            return 'Generated Python analysis script'
        elif file_type == 'requirements':
            return 'Python package requirements'
        elif file_type == 'plot':
            return 'Generated visualization'
        elif file_type == 'table':
            return 'Data table or results'
        elif name.startswith('conclusion'):
            return 'Analysis conclusions and insights'
        elif name.startswith('table_'):
            return 'Statistical results table'
        elif name.startswith('plot_'):
            return 'Analysis visualization'
        elif file_type == 'log':
            return 'Execution log'
        else:
            return f'Generated {file_type}'
    
    def generate_interpretation(self, session_id: str) -> Dict[str, Any]:
        """Generate interpretation of analysis results"""
        try:
            session = self.database_manager.get_session(session_id)
            results = self.database_manager.get_session_results(session_id)
            
            # Get latest analysis results
            analysis_results = [r for r in results if r.result_type == 'analysis_results']
            if not analysis_results:
                return {
                    'success': False,
                    'error': 'No analysis results found to interpret'
                }
            
            latest_results = analysis_results[-1].result_data
            
            # Generate interpretation using LLM
            interpretation_result = self.statistical_agent.interpret_results(
                latest_results, session.analysis_config, session.description
            )
            
            if interpretation_result['success']:
                # Store interpretation as result
                interp_result = AnalysisResult(
                    result_id=str(uuid.uuid4()),
                    session_id=session_id,
                    step_id="interpretation",
                    result_type="interpretation",
                    result_data={
                        'interpretation': interpretation_result['interpretation'],
                        'key_findings': interpretation_result.get('key_findings', []),
                        'recommendations': interpretation_result.get('recommendations', [])
                    }
                )
                self.database_manager.add_analysis_result(interp_result)
            
            return interpretation_result
            
        except Exception as e:
            logger.error(f"Error generating interpretation for session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def _store_session_data(self, session_id: str, df: pd.DataFrame):
        """Store session data temporarily (in production, use proper storage)"""
        try:
            data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
            df.to_pickle(data_path)
        except Exception as e:
            logger.error(f"Error storing session data: {str(e)}")
    
    def _load_session_data(self, session_id: str) -> Optional[pd.DataFrame]:
        """Load session data"""
        try:
            data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
            if data_path.exists():
                return pd.read_pickle(data_path)
        except Exception as e:
            logger.error(f"Error loading session data: {str(e)}")
        return None
    
    def _get_latest_data_summary(self, session_id: str) -> Dict[str, Any]:
        """Get latest data summary for session"""
        results = self.database_manager.get_session_results(session_id)
        data_summary_results = [r for r in results if r.result_type == 'data_summary']
        
        if data_summary_results:
            return data_summary_results[-1].result_data
        return {}
    
    def _get_next_step_number(self, session_id: str) -> int:
        """Get next step number for session"""
        steps = self.database_manager.get_session_steps(session_id)
        return len(steps) + 1
    
    def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent sessions for user"""
        try:
            sessions = self.database_manager.get_recent_sessions(user_id, limit)
            return [
                {
                    'session_id': session.session_id,
                    'title': session.title,
                    'description': session.description,
                    'status': session.status.value,
                    'created_at': session.created_at.isoformat() if session.created_at else None,
                    'updated_at': session.updated_at.isoformat() if session.updated_at else None
                }
                for session in sessions
            ]
        except Exception as e:
            logger.error(f"Error getting recent sessions: {str(e)}")
            return []

    def delete_session(self, session_id: str) -> bool:
        """Delete session and all associated data"""
        try:
            # Clean up all session files
            self.agent_executor.cleanup_session(session_id)
            
            # Delete from database
            session = self.database_manager.get_session(session_id)
            if session:
                # Delete related data
                steps = self.database_manager.get_session_steps(session_id)
                for step in steps:
                    self.database_manager.delete_step(step.step_id)
                
                results = self.database_manager.get_session_results(session_id)
                for result in results:
                    self.database_manager.delete_result(result.result_id)
                
                # Delete session
                success = self.database_manager.delete_session(session_id)
                
                # Clean up any uploaded data files
                try:
                    import glob
                    upload_pattern = str(self.config.UPLOAD_FOLDER / f"*{session_id}*")
                    for file_path in glob.glob(upload_pattern):
                        os.remove(file_path)
                        logger.info(f"Removed uploaded file: {file_path}")
                except Exception as file_cleanup_error:
                    logger.warning(f"Could not clean up uploaded files: {str(file_cleanup_error)}")
                
                return success
            
            return False
            
        except Exception as e:
            logger.error(f"Error deleting session {session_id}: {str(e)}")
            return False
    
    def cleanup_orphaned_files(self) -> Dict[str, Any]:
        """Clean up orphaned files and directories from deleted sessions"""
        try:
            # Get all active session IDs from database
            all_sessions = self.database_manager.get_all_sessions()
            active_session_ids = {session.session_id for session in all_sessions}
            
            cleanup_results = {
                'orphaned_dirs_removed': 0,
                'orphaned_files_removed': 0,
                'total_size_freed_mb': 0,
                'errors': []
            }
            
            # Directories to check for orphaned session data
            directories_to_check = [
                self.config.OUTPUT_DIR,
                self.config.GENERATED_SCRIPTS_FOLDER,
                self.config.SANDBOX_FOLDER,
                self.config.SESSIONS_FOLDER,
                self.config.REPORTS_FOLDER
            ]
            
            # Clean up orphaned session directories
            for directory in directories_to_check:
                if not directory.exists():
                    continue
                    
                for item in directory.iterdir():
                    if item.is_dir():
                        # Check if this looks like a session ID (UUID format)
                        if len(item.name) == 36 and item.name.count('-') == 4:
                            if item.name not in active_session_ids:
                                try:
                                    # Calculate size before deletion
                                    size = sum(f.stat().st_size for f in item.rglob('*') if f.is_file())
                                    cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
                                    
                                    shutil.rmtree(item)
                                    cleanup_results['orphaned_dirs_removed'] += 1
                                    logger.info(f"Removed orphaned directory: {item}")
                                except Exception as e:
                                    cleanup_results['errors'].append(f"Could not remove {item}: {str(e)}")
            
            # Clean up orphaned upload files
            if self.config.UPLOAD_FOLDER.exists():
                for file_path in self.config.UPLOAD_FOLDER.iterdir():
                    if file_path.is_file():
                        # Check if filename contains any active session ID
                        file_has_active_session = False
                        for session_id in active_session_ids:
                            if session_id in file_path.name:
                                file_has_active_session = True
                                break
                        
                        # If no active session found and file looks like session data
                        if not file_has_active_session and ('_' in file_path.name or len(file_path.name) > 30):
                            try:
                                size = file_path.stat().st_size
                                cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
                                
                                os.remove(file_path)
                                cleanup_results['orphaned_files_removed'] += 1
                                logger.info(f"Removed orphaned upload file: {file_path.name}")
                            except Exception as e:
                                cleanup_results['errors'].append(f"Could not remove {file_path}: {str(e)}")
            
            cleanup_results['total_size_freed_mb'] = round(cleanup_results['total_size_freed_mb'], 2)
            
            return {
                'success': True,
                'results': cleanup_results
            }
            
        except Exception as e:
            logger.error(f"Error during orphaned files cleanup: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def cleanup_old_sessions(self, days_old: int = 7):
        """Clean up sessions older than specified days"""
        try:
            cutoff_date = datetime.now() - timedelta(days=days_old)
            
            # Get all sessions
            all_sessions = self.database_manager.get_all_sessions()
            
            cleaned_count = 0
            for session in all_sessions:
                if session.created_at < cutoff_date:
                    logger.info(f"Cleaning up old session: {session.session_id} (created: {session.created_at})")
                    if self.delete_session(session.session_id):
                        cleaned_count += 1
            
            logger.info(f"Cleaned up {cleaned_count} old sessions")
            
            # Also run orphaned file cleanup as part of regular maintenance
            if cleaned_count > 0:
                try:
                    orphaned_result = self.cleanup_orphaned_files()
                    if orphaned_result['success']:
                        results = orphaned_result['results']
                        if results['orphaned_dirs_removed'] > 0 or results['orphaned_files_removed'] > 0:
                            logger.info(f"Additional orphaned cleanup: {results['orphaned_dirs_removed']} dirs, {results['orphaned_files_removed']} files, {results['total_size_freed_mb']} MB freed")
                except Exception as orphaned_error:
                    logger.warning(f"Orphaned cleanup warning: {str(orphaned_error)}")
            
            return cleaned_count
            
        except Exception as e:
            logger.error(f"Error during periodic cleanup: {str(e)}")
            return 0

Parameters

Name    Type    Default  Kind
config  Config  -        positional

Parameter Details

config: Application configuration object; used to initialize the database manager, data processor, statistical agent, and agent-based executor.

Return Value

The constructor returns None (it initializes a StatisticalAnalysisService instance); per-method return types are listed under Class Interface below.

Class Interface

Methods

__init__(self, config)

Purpose: Initialize the service with its configuration and collaborators (database manager, data processor, statistical agent, agent-based executor)

Parameters:

  • config: Type: Config

Returns: None

create_analysis_session(self, title, description, user_id) -> str

Purpose: Create new analysis session

Parameters:

  • title: Type: str
  • description: Type: str
  • user_id: Type: str

Returns: Returns str
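
A minimal call sketch, assuming a Config instance named config is already available (the snippets under the following methods continue from this one):

service = StatisticalAnalysisService(config)
session_id = service.create_analysis_session(
    title="Regional revenue analysis",
    description="Compare revenue across sales regions",
    user_id="analyst_1",
)
# session_id is the UUID string generated for the new session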

load_data_for_session(self, session_id, data_source) -> Dict[str, Any]

Purpose: Load data for analysis session

Parameters:

  • session_id: Type: str
  • data_source: Type: DataSource

Returns: Returns Dict[str, Any]
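
Continuing the sketch; the DataSource construction is only indicated here, since its fields are defined elsewhere in the package:

data_source = ...  # build a DataSource (file upload, direct SQL, or SQL workflow)
load_result = service.load_data_for_session(session_id, data_source)
if load_result['success']:
    print(load_result['shape'], load_result['columns'])  # dataframe shape and column names
else:
    print("Load failed:", load_result['error'])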

process_user_query(self, session_id, user_query) -> Dict[str, Any]

Purpose: Process natural language query and suggest analysis

Parameters:

  • session_id: Type: str
  • user_query: Type: str

Returns: Returns Dict[str, Any]
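
On success the returned dict carries the full interpretation plus the suggested configuration as an object (not a dict), as the source notes. Continuing the sketch:

query_result = service.process_user_query(
    session_id, "Is mean revenue different between regions A and B?"
)
if query_result['success']:
    config_obj = query_result['suggested_config']         # AnalysisConfiguration object
    print(query_result['interpretation']['confidence'])   # LLM confidence in the interpretation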

generate_and_execute_analysis(self, session_id, analysis_config, user_query, model, include_previous_context, selected_analyses) -> Dict[str, Any]

Purpose: Generate and execute statistical analysis script with optional previous context

Parameters:

  • session_id: Type: str
  • analysis_config: Type: AnalysisConfiguration
  • user_query: Type: str
  • model: Type: str
  • include_previous_context: Type: bool
  • selected_analyses: Type: List[str]

Returns: Returns Dict[str, Any]
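
Continuing the sketch; passing analysis_config=None falls back to the configuration stored on the session:

analysis = service.generate_and_execute_analysis(
    session_id=session_id,
    analysis_config=config_obj,       # or None to reuse session.analysis_config
    user_query="Compare revenue across regions",
    model='gpt-4o',
    include_previous_context=True,    # folds earlier analyses into the prompt
)
if analysis['success']:
    print(analysis['explanation'])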

debug_and_retry_analysis(self, session_id, failed_step_id) -> Dict[str, Any]

Purpose: Debug failed analysis and retry execution using agent-based approach

Parameters:

  • session_id: Type: str
  • failed_step_id: Type: str

Returns: Returns Dict[str, Any]
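
Continuing the sketch: on a failed execution, a step id can be fed back for an agent-driven retry. Which id identifies the failed step depends on how the failure was recorded; using the generation step id here is an assumption:

failed_id = analysis.get('generation_step_id')  # assumed to be the step that failed
if not analysis['success'] and failed_id:
    retry = service.debug_and_retry_analysis(session_id, failed_id)
    print("Iteration", retry.get('iteration'), "success:", retry['success'])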

_execute_analysis_script(self, session_id, step_id, script, df, user_query) -> Dict[str, Any]

Purpose: Execute analysis script using agent-based executor

Parameters:

  • session_id: Type: str
  • step_id: Type: str
  • script: Type: str
  • df: Type: pd.DataFrame
  • user_query: Type: str

Returns: Returns Dict[str, Any]

_store_execution_results(self, session_id, step_id, execution_result)

Purpose: Store execution results in database

Parameters:

  • session_id: Type: str
  • step_id: Type: str
  • execution_result: Type: Dict[str, Any]

Returns: None

get_session_summary(self, session_id) -> Dict[str, Any]

Purpose: Get comprehensive session summary

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]
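
A sketch reading the summary block the method assembles:

summary = service.get_session_summary(session_id)
if summary['success']:
    s = summary['summary']
    print(f"{s['successful_steps']}/{s['total_steps']} steps ok, "
          f"{s['plots_generated']} plots, status={s['status']}")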

_get_generated_files_info(self, session_id) -> List[Dict[str, Any]]

Purpose: Get information about generated files for a session

Parameters:

  • session_id: Type: str

Returns: Returns List[Dict[str, Any]]

get_analysis_history(self, session_id) -> List[Dict[str, Any]]

Purpose: Get analysis history for a session with file information

Parameters:

  • session_id: Type: str

Returns: Returns List[Dict[str, Any]]
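
A sketch listing past analysis runs; the keys follow the source above:

for entry in service.get_analysis_history(session_id):
    print(entry['analysis_number'], entry['formatted_time'],
          entry['user_query'], f"({entry['file_count']} files)")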

get_analysis_files(self, session_id, analysis_id) -> List[Dict[str, Any]]

Purpose: Get files for a specific analysis

Parameters:

  • session_id: Type: str
  • analysis_id: Type: str

Returns: Returns List[Dict[str, Any]]
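
A sketch fetching the files of one run; the analysis id below is a hypothetical example of the analysis_HASH directory names the executor creates:

for f in service.get_analysis_files(session_id, analysis_id='analysis_ab12cd34'):
    print(f['name'], f['type'], f['description'])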

_collect_previous_analysis_context(self, session_id, selected_analyses) -> Dict[str, Any]

Purpose: Collect context from previous analyses for iterative refinement

Parameters:

  • session_id: Type: str
  • selected_analyses: Type: List[str]

Returns: Returns Dict[str, Any]

_determine_file_type(self, file_path) -> str

Purpose: Determine the type of a file based on its extension and name

Parameters:

  • file_path: Type: Path

Returns: Returns str

_get_file_description(self, file_path, file_type) -> str

Purpose: Get a human-readable description for a file

Parameters:

  • file_path: Type: Path
  • file_type: Type: str

Returns: Returns str

generate_interpretation(self, session_id) -> Dict[str, Any]

Purpose: Generate interpretation of analysis results

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

_store_session_data(self, session_id, df)

Purpose: Store session data temporarily (in production, use proper storage)

Parameters:

  • session_id: Type: str
  • df: Type: pd.DataFrame

Returns: None

_load_session_data(self, session_id) -> Optional[pd.DataFrame]

Purpose: Load session data

Parameters:

  • session_id: Type: str

Returns: Returns Optional[pd.DataFrame]

_get_latest_data_summary(self, session_id) -> Dict[str, Any]

Purpose: Get latest data summary for session

Parameters:

  • session_id: Type: str

Returns: Returns Dict[str, Any]

_get_next_step_number(self, session_id) -> int

Purpose: Get next step number for session

Parameters:

  • session_id: Type: str

Returns: Returns int

get_recent_sessions(self, user_id, limit) -> List[Dict[str, Any]]

Purpose: Get recent sessions for user

Parameters:

  • user_id: Type: str
  • limit: Type: int

Returns: Returns List[Dict[str, Any]]
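
A sketch of the session listing; each entry is a plain dict with ISO-formatted timestamps:

for s in service.get_recent_sessions(user_id="analyst_1", limit=5):
    print(s['session_id'], s['status'], s['title'])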

delete_session(self, session_id) -> bool

Purpose: Delete session and all associated data

Parameters:

  • session_id: Type: str

Returns: Returns bool

cleanup_orphaned_files(self) -> Dict[str, Any]

Purpose: Clean up orphaned files and directories from deleted sessions

Returns: Returns Dict[str, Any]

cleanup_old_sessions(self, days_old) -> int

Purpose: Clean up sessions older than the specified number of days

Parameters:

  • days_old: Type: int (default: 7)

Returns: Returns int (the number of sessions deleted)
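
A maintenance sketch combining the cleanup entry points; return shapes follow the source above:

removed = service.cleanup_old_sessions(days_old=7)   # int: sessions deleted
orphans = service.cleanup_orphaned_files()
if orphans['success']:
    r = orphans['results']
    print(r['orphaned_dirs_removed'], "dirs removed,", r['total_size_freed_mb'], "MB freed")
service.delete_session(session_id)                   # bool: True on success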

Required Imports

import uuid
import json
import logging
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

Plus the project-local types referenced throughout (Config, DatabaseManager, DataProcessor, StatisticalAgent, AgentExecutor, StatisticalSession, AnalysisStatus, AnalysisStep, AnalysisResult, DataSource, DataSourceType, AnalysisConfiguration), whose import paths are defined elsewhere in the package.

Usage Example

# Example usage (assuming `config` is a Config instance):
# service = StatisticalAnalysisService(config)
# session_id = service.create_analysis_session("My analysis", "Optional description")

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class StatisticalAnalysisService 98.0% similar

    Main service for statistical analysis orchestration

    From: /tf/active/vicechatdev/full_smartstat/services.py
  • class SmartStatService 65.9% similar

    Service for running SmartStat analysis sessions in Vice AI

    From: /tf/active/vicechatdev/vice_ai/smartstat_service.py
  • class DataAnalysisService 65.8% similar

    Service class for managing data analysis operations within document sections, integrating with SmartStat components for statistical analysis, dataset processing, and visualization generation.

    From: /tf/active/vicechatdev/vice_ai/data_analysis_service.py
  • class StatisticalAgent_v1 57.0% similar

    LLM-powered statistical analysis agent

    From: /tf/active/vicechatdev/full_smartstat/statistical_agent.py
  • class StatisticalSession 57.0% similar

    A dataclass representing a statistical analysis session that tracks metadata, configuration, and status of data analysis operations.

    From: /tf/active/vicechatdev/vice_ai/smartstat_models.py