class StatisticalAnalysisService_v1
Main service for statistical analysis orchestration
File: /tf/active/vicechatdev/smartstat/services.py
Lines: 28 - 1128
Complexity: moderate
Purpose
Main service for statistical analysis orchestration
Source Code
class StatisticalAnalysisService:
"""Main service for statistical analysis orchestration"""
def __init__(self, config: Config):
self.config = config
self.database_manager = DatabaseManager(config.DATABASE_URL)
self.data_processor = DataProcessor(config)
self.statistical_agent = StatisticalAgent(config)
self.agent_executor = AgentExecutor(config) # New agent-based executor
def create_analysis_session(self, title: str, description: str = "",
user_id: str = "default") -> str:
"""Create new analysis session"""
try:
session_id = str(uuid.uuid4()) # Generate session ID first
logger.info(f"Creating session with ID: {session_id}")
try:
session = StatisticalSession(
session_id=session_id,
title=title,
description=description,
status=AnalysisStatus.PENDING, # Use PENDING instead of CREATED
data_source=None, # Don't create defaults in constructor
analysis_config=None # Don't create defaults in constructor
)
logger.info(f"StatisticalSession object created successfully")
except Exception as session_error:
logger.error(f"Error creating StatisticalSession object: {str(session_error)}")
raise session_error
try:
created_session_id = self.database_manager.create_session(session)
logger.info(f"Created analysis session: {created_session_id}")
return created_session_id
except Exception as db_error:
logger.error(f"Error calling database_manager.create_session: {str(db_error)}")
raise db_error
except Exception as e:
logger.error(f"Error in create_analysis_session: {str(e)}")
logger.error(f"Exception type: {type(e)}")
# Simplified traceback to avoid formatting issues
import traceback
tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
logger.error(f"Traceback: {''.join(tb_lines)}")
raise e
def load_data_for_session(self, session_id: str, data_source: DataSource) -> Dict[str, Any]:
"""Load data for analysis session"""
try:
# Update session with data source
session = self.database_manager.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
session.data_source = data_source
session.status = AnalysisStatus.PROCESSING
session.updated_at = datetime.now()
# Save updated session
self.database_manager.update_session(session)
# Load and process data
if data_source.source_type == DataSourceType.SQL_WORKFLOW:
# New SQL workflow: user query -> LLM generates SQL -> execute
df, metadata = self.data_processor.load_data_from_sql_workflow(
user_query=data_source.user_query,
schema_file=data_source.schema_file,
connection_config=data_source.connection_config,
statistical_agent=self.statistical_agent
)
else:
# Regular workflow (file upload or direct SQL)
df, metadata = self.data_processor.load_data(data_source)
# Generate data summary
data_summary = self.data_processor.get_data_summary(df)
# Create analysis step for data loading
step = AnalysisStep(
step_id=str(uuid.uuid4()),
session_id=session_id,
step_number=1,
step_type="data_load",
input_data={
'data_source': data_source.to_dict(),
'metadata': metadata
},
execution_output=json.dumps(data_summary, default=str),
execution_success=True
)
self.database_manager.add_analysis_step(step)
# Store data summary as result
result = AnalysisResult(
result_id=str(uuid.uuid4()),
session_id=session_id,
step_id=step.step_id,
result_type="data_summary",
result_data=data_summary,
metadata=metadata
)
self.database_manager.add_analysis_result(result)
# Store dataframe temporarily (in production, use proper data storage)
self._store_session_data(session_id, df)
return {
'success': True,
'data_summary': data_summary,
'metadata': metadata,
'step_id': step.step_id,
'shape': df.shape,
'columns': df.columns.tolist()
}
except Exception as e:
logger.error(f"Error loading data for session {session_id}: {str(e)}")
self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
return {
'success': False,
'error': str(e)
}
def process_user_query(self, session_id: str, user_query: str) -> Dict[str, Any]:
"""Process natural language query and suggest analysis"""
try:
# Get session and data
session = self.database_manager.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
df = self._load_session_data(session_id)
if df is None:
raise ValueError(f"No data loaded for session {session_id}")
data_summary = self._get_latest_data_summary(session_id)
# Interpret query using LLM
interpretation_result = self.statistical_agent.interpret_user_query(
user_query, data_summary, df.columns.tolist()
)
if not interpretation_result['success']:
return interpretation_result
# Update session with analysis configuration
session.analysis_config = interpretation_result['suggested_config']
session.updated_at = datetime.now()
# Create analysis step for query interpretation
# Prepare serializable interpretation result for storage
serializable_result = {
'success': interpretation_result['success'],
'analysis_plan': interpretation_result['analysis_plan'],
'suggested_config': interpretation_result['suggested_config'].to_dict(),
'interpretation': interpretation_result['interpretation'],
'confidence': interpretation_result['confidence']
}
step = AnalysisStep(
step_id=str(uuid.uuid4()),
session_id=session_id,
step_number=self._get_next_step_number(session_id),
step_type="query_interpretation",
input_data={'user_query': user_query},
execution_output=json.dumps(serializable_result),
execution_success=True
)
self.database_manager.add_analysis_step(step)
return {
'success': True,
'interpretation': interpretation_result,
'step_id': step.step_id,
'suggested_config': interpretation_result['suggested_config'] # Return the actual object, not dict
}
except Exception as e:
logger.error(f"Error processing query for session {session_id}: {str(e)}")
return {
'success': False,
'error': str(e)
}
def generate_and_execute_analysis(self, session_id: str,
analysis_config: AnalysisConfiguration = None,
user_query: str = "",
model: str = 'gpt-4o',
include_previous_context: bool = True,
selected_analyses: List[str] = None) -> Dict[str, Any]:
"""Generate and execute statistical analysis script with optional previous context"""
try:
# Get session and data
session = self.database_manager.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
if analysis_config is None:
analysis_config = session.analysis_config
df = self._load_session_data(session_id)
if df is None:
raise ValueError(f"No data loaded for session {session_id}")
data_summary = self._get_latest_data_summary(session_id)
# Collect previous analysis context if requested
previous_context = None
if include_previous_context:
previous_context = self._collect_previous_analysis_context(session_id, selected_analyses)
# Validate analysis configuration
validation = self.data_processor.validate_columns_for_analysis(
df, analysis_config.target_variables, analysis_config.grouping_variables
)
if not validation['valid']:
return {
'success': False,
'error': f"Invalid analysis configuration: {'; '.join(validation['errors'])}"
}
# Generate analysis script with previous context
script_result = self.statistical_agent.generate_analysis_script(
analysis_config, data_summary, user_query, model, previous_context
)
if not script_result['success']:
return script_result
# Create step for script generation
generation_step = AnalysisStep(
step_id=str(uuid.uuid4()),
session_id=session_id,
step_number=self._get_next_step_number(session_id),
step_type="script_generation",
input_data={
'analysis_config': analysis_config.to_dict(),
'user_query': user_query
},
generated_script=script_result['script'],
execution_success=True
)
self.database_manager.add_analysis_step(generation_step)
# Execute script
execution_result = self._execute_analysis_script(
session_id, generation_step.step_id, script_result['script'], df, user_query
)
# Clean up old analysis files to prevent accumulation
if self.config.AUTO_CLEANUP_ENABLED:
try:
self.agent_executor.cleanup_old_analyses(session_id, keep_recent=self.config.KEEP_RECENT_ANALYSES)
except Exception as cleanup_error:
logger.warning(f"Cleanup warning: {str(cleanup_error)}")
return {
'success': execution_result['success'],
'script': script_result['script'],
'explanation': script_result['explanation'],
'execution_result': execution_result,
'generation_step_id': generation_step.step_id
}
except Exception as e:
logger.error(f"Error generating analysis for session {session_id}: {str(e)}")
self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
return {
'success': False,
'error': str(e)
}
def debug_and_retry_analysis(self, session_id: str, failed_step_id: str) -> Dict[str, Any]:
"""Debug failed analysis and retry execution using agent-based approach"""
try:
# Get failed step
steps = self.database_manager.get_session_steps(session_id)
failed_step = next((s for s in steps if s.step_id == failed_step_id), None)
if not failed_step:
raise ValueError(f"Step {failed_step_id} not found")
if failed_step.execution_success:
return {
'success': False,
'error': "Step was already successful"
}
# Get session data
df = self._load_session_data(session_id)
session = self.database_manager.get_session(session_id)
data_summary = self._get_latest_data_summary(session_id)
# Count debugging iterations for this script
debug_iteration = len([s for s in steps if s.step_type == "debugging" and
s.input_data.get('original_step_id') == failed_step_id]) + 1
# Try to find the project directory for this session
project_dir = Path(self.config.GENERATED_SCRIPTS_FOLDER) / session_id
if project_dir.exists():
# Retry execution with more iterations for debugging
execution_result = self.agent_executor.execute_analysis_project(
project_dir=str(project_dir),
max_iterations=5 # Allow more debugging iterations
)
else:
# Generate a new project if the old one doesn't exist
logger.info(f"Project directory not found, regenerating for session {session_id}")
project_result = self.agent_executor.generate_analysis_project(
session_id=session_id,
user_query="Retry statistical analysis with debugging",
data_summary=data_summary,
analysis_config=session.analysis_config, # Pass the object
session_data=self._load_session_data(session_id) # Pass the data
)
if not project_result['success']:
return project_result
execution_result = self.agent_executor.execute_analysis_project(
project_dir=project_result['project_dir'],
max_iterations=5
)
# Create debugging step
debug_step = AnalysisStep(
step_id=str(uuid.uuid4()),
session_id=session_id,
step_number=self._get_next_step_number(session_id),
step_type="debugging",
input_data={
'original_step_id': failed_step_id,
'iteration': debug_iteration,
'original_error': failed_step.execution_error
},
generated_script="", # Agent generates files separately
execution_output=execution_result.get('output', '')
)
self.database_manager.add_analysis_step(debug_step)
return {
'success': execution_result['success'],
'debug_explanation': f"Debug iteration {debug_iteration} completed",
'execution_result': execution_result,
'debug_step_id': debug_step.step_id,
'iteration': debug_iteration,
'generated_files': execution_result.get('generated_files', [])
}
except Exception as e:
logger.error(f"Error debugging analysis for session {session_id}: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _execute_analysis_script(self, session_id: str, step_id: str,
script: str, df: pd.DataFrame, user_query: str = "") -> Dict[str, Any]:
"""Execute analysis script using agent-based executor"""
try:
# Update session status
self.database_manager.update_session_status(session_id, AnalysisStatus.PROCESSING)
# Get session data summary for context
session = self.database_manager.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
data_summary = self._get_latest_data_summary(session_id)
analysis_config = session.analysis_config
# Store dataframe for the agent to access
self._store_session_data(session_id, df)
# Use agent executor to generate and run analysis project
project_result = self.agent_executor.generate_analysis_project(
session_id=session_id,
user_query=user_query if user_query else f"Statistical analysis: {analysis_config.analysis_type.value if analysis_config else 'general'}",
data_summary=data_summary,
analysis_config=analysis_config, # Pass the object, not the dict
session_data=df # Pass the dataframe directly
)
if not project_result['success']:
return project_result
# Execute the generated project
execution_result = self.agent_executor.execute_analysis_project(
project_dir=project_result['project_dir'],
max_iterations=3
)
# Update step with execution results
steps = self.database_manager.get_session_steps(session_id)
current_step = next((s for s in steps if s.step_id == step_id), None)
if current_step:
current_step.execution_output = execution_result.get('output', '')
current_step.execution_error = execution_result.get('error', '')
current_step.execution_success = execution_result['success']
# Store generated files info
if execution_result.get('generated_files'):
current_step.metadata = current_step.metadata or {}
current_step.metadata['generated_files'] = execution_result['generated_files']
if execution_result['success']:
# Store successful results
self._store_execution_results(session_id, step_id, execution_result)
self.database_manager.update_session_status(session_id, AnalysisStatus.COMPLETED)
else:
self.database_manager.update_session_status(session_id, AnalysisStatus.DEBUGGING)
return execution_result
except Exception as e:
logger.error(f"Error executing script for session {session_id}: {str(e)}")
self.database_manager.update_session_status(session_id, AnalysisStatus.FAILED)
return {
'success': False,
'error': str(e)
}
def _store_execution_results(self, session_id: str, step_id: str,
execution_result: Dict[str, Any]):
"""Store execution results in database"""
# Store main results
if execution_result.get('results'):
result = AnalysisResult(
result_id=str(uuid.uuid4()),
session_id=session_id,
step_id=step_id,
result_type="analysis_results",
result_data=execution_result['results'],
metadata={
'execution_time': execution_result['execution_time'],
'output': execution_result['output']
}
)
self.database_manager.add_analysis_result(result)
# Store plots
if execution_result.get('plots'):
for plot_path in execution_result['plots']:
plot_result = AnalysisResult(
result_id=str(uuid.uuid4()),
session_id=session_id,
step_id=step_id,
result_type="plot",
result_data={'description': 'Generated plot'},
file_paths=[plot_path],
metadata={'plot_type': 'analysis_plot'}
)
self.database_manager.add_analysis_result(plot_result)
def get_session_summary(self, session_id: str) -> Dict[str, Any]:
"""Get comprehensive session summary"""
try:
session = self.database_manager.get_session(session_id)
if not session:
return {'success': False, 'error': 'Session not found'}
steps = self.database_manager.get_session_steps(session_id)
results = self.database_manager.get_session_results(session_id)
# Get data summary
data_summary = self._get_latest_data_summary(session_id)
# Organize results by type
organized_results = {
'analysis_results': [],
'plots': [],
'data_summary': data_summary
}
for result in results:
if result.result_type == 'plot':
organized_results['plots'].append({
'file_path': result.file_paths[0] if result.file_paths else '',
'metadata': result.metadata
})
elif result.result_type == 'analysis_results':
organized_results['analysis_results'].append(result.result_data)
# Get generated files from agent executor
generated_files = self._get_generated_files_info(session_id)
# Get analysis history
analysis_history = self.get_analysis_history(session_id)
return {
'success': True,
'session': session.to_dict(),
'steps': [step.to_dict() for step in steps],
'results': organized_results,
'generated_files': generated_files,
'analysis_history': analysis_history,
'summary': {
'total_steps': len(steps),
'successful_steps': len([s for s in steps if s.execution_success]),
'plots_generated': len(organized_results['plots']),
'status': session.status.value
}
}
except Exception as e:
logger.error(f"Error getting session summary {session_id}: {str(e)}")
return {'success': False, 'error': str(e)}
def _get_generated_files_info(self, session_id: str) -> List[Dict[str, Any]]:
"""Get information about generated files for a session"""
try:
session_dir = Path(self.config.OUTPUT_DIR) / session_id
if not session_dir.exists():
return []
generated_files = []
# Look for analysis directories (analysis_HASH format) within this session only
analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
if analysis_dirs:
# Use the most recent analysis directory within this session
latest_analysis_dir = max(analysis_dirs, key=lambda d: d.stat().st_mtime)
# Process files in the analysis directory
for file_path in latest_analysis_dir.glob("*"):
if file_path.is_file() and file_path.name != 'venv': # Skip virtual environment
file_type = self._determine_file_type(file_path)
file_info = {
'name': file_path.name,
'type': file_type,
'description': self._get_file_description(file_path, file_type),
'downloadable': True,
'path': str(file_path),
'relative_path': f"{latest_analysis_dir.name}/{file_path.name}" # For URL construction
}
# For text files, include content preview
if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_info['content'] = content[:2000] if len(content) > 2000 else content
file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
except Exception as e:
logger.warning(f"Could not read file {file_path}: {str(e)}")
file_info['content'] = 'Content unavailable'
generated_files.append(file_info)
# Also check for files directly in session directory (legacy support)
for file_path in session_dir.glob("*"):
if file_path.is_file():
file_type = self._determine_file_type(file_path)
file_info = {
'name': file_path.name,
'type': file_type,
'description': self._get_file_description(file_path, file_type),
'downloadable': True,
'path': str(file_path),
'relative_path': file_path.name # For URL construction
}
# For text files, include content preview
if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_info['content'] = content[:2000] if len(content) > 2000 else content
file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
except Exception as e:
logger.warning(f"Could not read file {file_path}: {str(e)}")
file_info['content'] = 'Content unavailable'
generated_files.append(file_info)
return generated_files
except Exception as e:
logger.error(f"Error getting generated files for session {session_id}: {str(e)}")
return []
def get_analysis_history(self, session_id: str) -> List[Dict[str, Any]]:
"""Get analysis history for a session with file information"""
try:
session_dir = Path(self.config.OUTPUT_DIR) / session_id
if not session_dir.exists():
return []
analysis_history = []
# Look for analysis directories
analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
# Sort by modification time (oldest first)
analysis_dirs.sort(key=lambda d: d.stat().st_mtime)
for i, analysis_dir in enumerate(analysis_dirs, 1):
analysis_info = {
'analysis_number': i,
'analysis_id': analysis_dir.name,
'timestamp': analysis_dir.stat().st_mtime,
'formatted_time': datetime.fromtimestamp(analysis_dir.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'file_count': len([f for f in analysis_dir.glob('*') if f.is_file() and f.name != 'venv']),
'has_plots': any(f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg'] for f in analysis_dir.glob('*')),
'has_tables': any(f.suffix.lower() in ['.csv', '.xlsx'] for f in analysis_dir.glob('*')),
'has_conclusions': (analysis_dir / 'conclusions.txt').exists()
}
# Try to extract the user query from the analysis script
analysis_script = analysis_dir / 'analysis.py'
if analysis_script.exists():
try:
with open(analysis_script, 'r', encoding='utf-8') as f:
script_content = f.read()
# Look for the query comment in the script
for line in script_content.split('\n')[:10]: # Check first 10 lines
if 'Query:' in line:
analysis_info['user_query'] = line.split('Query:')[-1].strip()
break
else:
analysis_info['user_query'] = 'Analysis script'
except Exception as e:
logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
analysis_info['user_query'] = 'Unknown query'
else:
analysis_info['user_query'] = 'No script found'
analysis_history.append(analysis_info)
return analysis_history
except Exception as e:
logger.error(f"Error getting analysis history for session {session_id}: {str(e)}")
return []
def get_analysis_files(self, session_id: str, analysis_id: str) -> List[Dict[str, Any]]:
"""Get files for a specific analysis"""
try:
session_dir = Path(self.config.OUTPUT_DIR) / session_id
analysis_dir = session_dir / analysis_id
if not analysis_dir.exists() or not analysis_dir.is_dir():
return []
generated_files = []
for file_path in analysis_dir.glob("*"):
if file_path.is_file() and file_path.name != 'venv':
file_type = self._determine_file_type(file_path)
file_info = {
'name': file_path.name,
'type': file_type,
'description': self._get_file_description(file_path, file_type),
'downloadable': True,
'path': str(file_path),
'relative_path': f"{analysis_dir.name}/{file_path.name}"
}
# For text files, include content preview
if file_path.suffix.lower() in ['.txt', '.log', '.csv', '.json', '.py']:
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_info['content'] = content[:2000] if len(content) > 2000 else content
file_info['content_preview'] = content[:500] + '...' if len(content) > 500 else content
except Exception as e:
logger.warning(f"Could not read file {file_path}: {str(e)}")
file_info['content'] = 'Content unavailable'
generated_files.append(file_info)
return generated_files
except Exception as e:
logger.error(f"Error getting analysis files for session {session_id}, analysis {analysis_id}: {str(e)}")
return []
def _collect_previous_analysis_context(self, session_id: str, selected_analyses: List[str] = None) -> Dict[str, Any]:
"""Collect context from previous analyses for iterative refinement"""
try:
session_dir = Path(self.config.OUTPUT_DIR) / session_id
if not session_dir.exists():
return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
# Get all analysis directories
analysis_dirs = [d for d in session_dir.iterdir() if d.is_dir() and d.name.startswith('analysis_')]
if not analysis_dirs:
return {'analyses': [], 'summary': 'No previous analyses found.', 'count': 0}
# Sort by creation time (oldest first)
analysis_dirs.sort(key=lambda d: d.stat().st_mtime)
# Filter by selected analyses if specified
if selected_analyses:
analysis_dirs = [d for d in analysis_dirs if d.name in selected_analyses]
if not analysis_dirs:
return {'analyses': [], 'summary': 'Selected analyses not found.', 'count': 0}
else:
# If no specific selection, exclude the most recent (current) analysis
analysis_dirs = analysis_dirs[:-1]
previous_analyses = []
for i, analysis_dir in enumerate(analysis_dirs, 1):
analysis_context = {
'analysis_number': i,
'analysis_id': analysis_dir.name,
'timestamp': datetime.fromtimestamp(analysis_dir.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')
}
# Extract user query from analysis script
analysis_script = analysis_dir / 'analysis.py'
if analysis_script.exists():
try:
with open(analysis_script, 'r', encoding='utf-8') as f:
script_content = f.read()
analysis_context['script_content'] = script_content
# Extract user query from script comments
for line in script_content.split('\n')[:15]:
if 'Query:' in line or 'User Query:' in line:
analysis_context['user_query'] = line.split(':')[-1].strip().strip('"\'')
break
else:
analysis_context['user_query'] = f'Analysis {i}'
except Exception as e:
logger.warning(f"Could not read analysis script {analysis_script}: {str(e)}")
analysis_context['script_content'] = 'Script not available'
analysis_context['user_query'] = f'Analysis {i}'
# Extract conclusions
conclusions_file = analysis_dir / 'conclusions.txt'
if conclusions_file.exists():
try:
with open(conclusions_file, 'r', encoding='utf-8') as f:
analysis_context['conclusions'] = f.read()
except Exception as e:
logger.warning(f"Could not read conclusions {conclusions_file}: {str(e)}")
analysis_context['conclusions'] = 'Conclusions not available'
# Extract key results from output files
results_summary = []
for file_path in analysis_dir.glob('*'):
if file_path.is_file() and file_path.suffix.lower() in ['.csv', '.txt', '.json']:
if file_path.name.startswith(('table_', 'results_', 'summary_')):
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()[:1000] # First 1000 chars
results_summary.append({
'filename': file_path.name,
'preview': content
})
except Exception as e:
logger.warning(f"Could not read result file {file_path}: {str(e)}")
analysis_context['results_summary'] = results_summary
# List of generated plots
plots = [f.name for f in analysis_dir.glob('*') if f.suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg']]
analysis_context['plots_generated'] = plots
previous_analyses.append(analysis_context)
# Create summary of analysis progression
summary_parts = []
if selected_analyses:
summary_parts.append(f"Using context from {len(previous_analyses)} selected previous analyses:")
else:
summary_parts.append(f"Session has {len(previous_analyses)} previous analysis iterations:")
for analysis in previous_analyses:
summary_parts.append(f"\n{analysis['analysis_number']}. {analysis['user_query']} ({analysis['timestamp']})")
if analysis.get('conclusions'):
# Include first 200 chars of conclusions
conclusions_preview = analysis['conclusions'][:200] + '...' if len(analysis['conclusions']) > 200 else analysis['conclusions']
summary_parts.append(f" Key findings: {conclusions_preview}")
if analysis.get('plots_generated'):
summary_parts.append(f" Generated plots: {', '.join(analysis['plots_generated'])}")
context_type = "selected" if selected_analyses else "all previous"
return {
'analyses': previous_analyses,
'summary': '\n'.join(summary_parts),
'count': len(previous_analyses),
'type': context_type
}
except Exception as e:
logger.error(f"Error collecting previous analysis context for session {session_id}: {str(e)}")
return {'analyses': [], 'summary': f'Error collecting context: {str(e)}', 'count': 0}
def _determine_file_type(self, file_path: Path) -> str:
"""Determine the type of a file based on its extension and name"""
suffix = file_path.suffix.lower()
name = file_path.name.lower()
if suffix in ['.png', '.jpg', '.jpeg', '.svg', '.gif']:
return 'plot'
elif suffix == '.py':
return 'script'
elif name == 'requirements.txt':
return 'requirements'
elif suffix in ['.csv', '.xlsx', '.xls']:
return 'table'
elif suffix in ['.txt', '.md']:
return 'text'
elif suffix in ['.log']:
return 'log'
elif suffix in ['.json']:
return 'data'
else:
return 'output'
def _get_file_description(self, file_path: Path, file_type: str) -> str:
"""Get a human-readable description for a file"""
name = file_path.name.lower()
if file_type == 'script':
return 'Generated Python analysis script'
elif file_type == 'requirements':
return 'Python package requirements'
elif file_type == 'plot':
return 'Generated visualization'
elif file_type == 'table':
return 'Data table or results'
elif name.startswith('conclusion'):
return 'Analysis conclusions and insights'
elif name.startswith('table_'):
return 'Statistical results table'
elif name.startswith('plot_'):
return 'Analysis visualization'
elif file_type == 'log':
return 'Execution log'
else:
return f'Generated {file_type}'
def generate_interpretation(self, session_id: str) -> Dict[str, Any]:
"""Generate interpretation of analysis results"""
try:
session = self.database_manager.get_session(session_id)
results = self.database_manager.get_session_results(session_id)
# Get latest analysis results
analysis_results = [r for r in results if r.result_type == 'analysis_results']
if not analysis_results:
return {
'success': False,
'error': 'No analysis results found to interpret'
}
latest_results = analysis_results[-1].result_data
# Generate interpretation using LLM
interpretation_result = self.statistical_agent.interpret_results(
latest_results, session.analysis_config, session.description
)
if interpretation_result['success']:
# Store interpretation as result
interp_result = AnalysisResult(
result_id=str(uuid.uuid4()),
session_id=session_id,
step_id="interpretation",
result_type="interpretation",
result_data={
'interpretation': interpretation_result['interpretation'],
'key_findings': interpretation_result.get('key_findings', []),
'recommendations': interpretation_result.get('recommendations', [])
}
)
self.database_manager.add_analysis_result(interp_result)
return interpretation_result
except Exception as e:
logger.error(f"Error generating interpretation for session {session_id}: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _store_session_data(self, session_id: str, df: pd.DataFrame):
"""Store session data temporarily (in production, use proper storage)"""
try:
data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
df.to_pickle(data_path)
except Exception as e:
logger.error(f"Error storing session data: {str(e)}")
def _load_session_data(self, session_id: str) -> Optional[pd.DataFrame]:
"""Load session data"""
try:
data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
if data_path.exists():
return pd.read_pickle(data_path)
except Exception as e:
logger.error(f"Error loading session data: {str(e)}")
return None
def _get_latest_data_summary(self, session_id: str) -> Dict[str, Any]:
"""Get latest data summary for session"""
results = self.database_manager.get_session_results(session_id)
data_summary_results = [r for r in results if r.result_type == 'data_summary']
if data_summary_results:
return data_summary_results[-1].result_data
return {}
def _get_next_step_number(self, session_id: str) -> int:
"""Get next step number for session"""
steps = self.database_manager.get_session_steps(session_id)
return len(steps) + 1
def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent sessions for user"""
try:
sessions = self.database_manager.get_recent_sessions(user_id, limit)
return [
{
'session_id': session.session_id,
'title': session.title,
'description': session.description,
'status': session.status.value,
'created_at': session.created_at.isoformat() if session.created_at else None,
'updated_at': session.updated_at.isoformat() if session.updated_at else None
}
for session in sessions
]
except Exception as e:
logger.error(f"Error getting recent sessions: {str(e)}")
return []
def delete_session(self, session_id: str) -> bool:
"""Delete session and all associated data"""
try:
# Clean up all session files
self.agent_executor.cleanup_session(session_id)
# Delete from database
session = self.database_manager.get_session(session_id)
if session:
# Delete related data
steps = self.database_manager.get_session_steps(session_id)
for step in steps:
self.database_manager.delete_step(step.step_id)
results = self.database_manager.get_session_results(session_id)
for result in results:
self.database_manager.delete_result(result.result_id)
# Delete session
success = self.database_manager.delete_session(session_id)
# Clean up any uploaded data files
try:
import glob
upload_pattern = str(self.config.UPLOAD_FOLDER / f"*{session_id}*")
for file_path in glob.glob(upload_pattern):
os.remove(file_path)
logger.info(f"Removed uploaded file: {file_path}")
except Exception as file_cleanup_error:
logger.warning(f"Could not clean up uploaded files: {str(file_cleanup_error)}")
return success
return False
except Exception as e:
logger.error(f"Error deleting session {session_id}: {str(e)}")
return False
def cleanup_orphaned_files(self) -> Dict[str, Any]:
"""Clean up orphaned files and directories from deleted sessions"""
try:
# Get all active session IDs from database
all_sessions = self.database_manager.get_all_sessions()
active_session_ids = {session.session_id for session in all_sessions}
cleanup_results = {
'orphaned_dirs_removed': 0,
'orphaned_files_removed': 0,
'total_size_freed_mb': 0,
'errors': []
}
# Directories to check for orphaned session data
directories_to_check = [
self.config.OUTPUT_DIR,
self.config.GENERATED_SCRIPTS_FOLDER,
self.config.SANDBOX_FOLDER,
self.config.SESSIONS_FOLDER,
self.config.REPORTS_FOLDER
]
# Clean up orphaned session directories
for directory in directories_to_check:
if not directory.exists():
continue
for item in directory.iterdir():
if item.is_dir():
# Check if this looks like a session ID (UUID format)
if len(item.name) == 36 and item.name.count('-') == 4:
if item.name not in active_session_ids:
try:
# Calculate size before deletion
size = sum(f.stat().st_size for f in item.rglob('*') if f.is_file())
cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
shutil.rmtree(item)
cleanup_results['orphaned_dirs_removed'] += 1
logger.info(f"Removed orphaned directory: {item}")
except Exception as e:
cleanup_results['errors'].append(f"Could not remove {item}: {str(e)}")
# Clean up orphaned upload files
if self.config.UPLOAD_FOLDER.exists():
for file_path in self.config.UPLOAD_FOLDER.iterdir():
if file_path.is_file():
# Check if filename contains any active session ID
file_has_active_session = False
for session_id in active_session_ids:
if session_id in file_path.name:
file_has_active_session = True
break
# If no active session found and file looks like session data
if not file_has_active_session and ('_' in file_path.name or len(file_path.name) > 30):
try:
size = file_path.stat().st_size
cleanup_results['total_size_freed_mb'] += size / (1024 * 1024)
os.remove(file_path)
cleanup_results['orphaned_files_removed'] += 1
logger.info(f"Removed orphaned upload file: {file_path.name}")
except Exception as e:
cleanup_results['errors'].append(f"Could not remove {file_path}: {str(e)}")
cleanup_results['total_size_freed_mb'] = round(cleanup_results['total_size_freed_mb'], 2)
return {
'success': True,
'results': cleanup_results
}
except Exception as e:
logger.error(f"Error during orphaned files cleanup: {str(e)}")
return {
'success': False,
'error': str(e)
}
def cleanup_old_sessions(self, days_old: int = 7):
"""Clean up sessions older than specified days"""
try:
cutoff_date = datetime.now() - timedelta(days=days_old)
# Get all sessions
all_sessions = self.database_manager.get_all_sessions()
cleaned_count = 0
for session in all_sessions:
if session.created_at < cutoff_date:
logger.info(f"Cleaning up old session: {session.session_id} (created: {session.created_at})")
if self.delete_session(session.session_id):
cleaned_count += 1
logger.info(f"Cleaned up {cleaned_count} old sessions")
# Also run orphaned file cleanup as part of regular maintenance
if cleaned_count > 0:
try:
orphaned_result = self.cleanup_orphaned_files()
if orphaned_result['success']:
results = orphaned_result['results']
if results['orphaned_dirs_removed'] > 0 or results['orphaned_files_removed'] > 0:
logger.info(f"Additional orphaned cleanup: {results['orphaned_dirs_removed']} dirs, {results['orphaned_files_removed']} files, {results['total_size_freed_mb']} MB freed")
except Exception as orphaned_error:
logger.warning(f"Orphaned cleanup warning: {str(orphaned_error)}")
return cleaned_count
except Exception as e:
logger.error(f"Error during periodic cleanup: {str(e)}")
return 0
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | Config | - | positional |
Parameter Details
config: Application configuration object supplying the database URL, working folders, and cleanup settings used by the service.
Return Value
Instantiating the class returns a StatisticalAnalysisService instance.
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the service with its configuration and construct the database manager, data processor, statistical agent, and agent-based executor
Parameters:
config: Type: Config
Returns: None
create_analysis_session(self, title, description, user_id) -> str
Purpose: Create new analysis session
Parameters:
title: Type: str
description: Type: str
user_id: Type: str
Returns: Returns str
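A minimal sketch of creating a session, assuming `config` is an application Config instance (the title and description values are illustrative):
service = StatisticalAnalysisService(config)
session_id = service.create_analysis_session(
    title="Treatment outcome analysis",
    description="Compare outcomes across treatment arms",
    user_id="analyst_1"
)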
load_data_for_session(self, session_id, data_source) -> Dict[str, Any]
Purpose: Load data for analysis session
Parameters:
session_id: Type: str
data_source: Type: DataSource
Returns: Returns Dict[str, Any]
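A hedged sketch of loading data, assuming `data_source` is an already-constructed DataSource describing an uploaded file, direct SQL, or SQL workflow:
load_result = service.load_data_for_session(session_id, data_source)
if load_result['success']:
    print(load_result['shape'], load_result['columns'])
else:
    print("Load failed:", load_result['error'])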
process_user_query(self, session_id, user_query) -> Dict[str, Any]
Purpose: Process natural language query and suggest analysis
Parameters:
session_id: Type: str
user_query: Type: str
Returns: Returns Dict[str, Any]
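Sketch of interpreting a natural-language question (the question text is illustrative):
query_result = service.process_user_query(
    session_id,
    "Is there a significant difference between the two groups?"
)
if query_result['success']:
    suggested_config = query_result['suggested_config']  # AnalysisConfiguration object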
generate_and_execute_analysis(self, session_id, analysis_config, user_query, model, include_previous_context, selected_analyses) -> Dict[str, Any]
Purpose: Generate and execute statistical analysis script with optional previous context
Parameters:
session_id: Type: str
analysis_config: Type: AnalysisConfiguration
user_query: Type: str
model: Type: str
include_previous_context: Type: bool
selected_analyses: Type: List[str]
Returns: Returns Dict[str, Any]
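Sketch of running the suggested analysis; if analysis_config is omitted the method falls back to the configuration stored on the session (the query text is illustrative):
analysis = service.generate_and_execute_analysis(
    session_id,
    analysis_config=suggested_config,
    user_query="Compare groups on the primary outcome",
    model='gpt-4o',
    include_previous_context=True
)
print(analysis['success'], analysis.get('generation_step_id'))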
debug_and_retry_analysis(self, session_id, failed_step_id) -> Dict[str, Any]
Purpose: Debug failed analysis and retry execution using agent-based approach
Parameters:
session_id: Type: str
failed_step_id: Type: str
Returns: Returns Dict[str, Any]
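Sketch of retrying a failed run, assuming `failed_step_id` identifies the step whose execution failed:
retry = service.debug_and_retry_analysis(session_id, failed_step_id)
if retry['success']:
    print(retry['debug_explanation'], retry['generated_files'])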
_execute_analysis_script(self, session_id, step_id, script, df, user_query) -> Dict[str, Any]
Purpose: Execute analysis script using agent-based executor
Parameters:
session_id: Type: str
step_id: Type: str
script: Type: str
df: Type: pd.DataFrame
user_query: Type: str
Returns: Returns Dict[str, Any]
_store_execution_results(self, session_id, step_id, execution_result)
Purpose: Store execution results in database
Parameters:
session_id: Type: str
step_id: Type: str
execution_result: Type: Dict[str, Any]
Returns: None
get_session_summary(self, session_id) -> Dict[str, Any]
Purpose: Get comprehensive session summary
Parameters:
session_id: Type: str
Returns: Returns Dict[str, Any]
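Sketch of inspecting a session summary using the keys returned by this method:
summary = service.get_session_summary(session_id)
if summary['success']:
    print(summary['summary']['total_steps'], summary['summary']['status'])
    for plot in summary['results']['plots']:
        print(plot['file_path'])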
_get_generated_files_info(self, session_id) -> List[Dict[str, Any]]
Purpose: Get information about generated files for a session
Parameters:
session_id: Type: str
Returns: Returns List[Dict[str, Any]]
get_analysis_history(self, session_id) -> List[Dict[str, Any]]
Purpose: Get analysis history for a session with file information
Parameters:
session_id: Type: str
Returns: Returns List[Dict[str, Any]]
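Sketch of listing prior analysis runs for a session:
for entry in service.get_analysis_history(session_id):
    print(entry['analysis_number'], entry['formatted_time'], entry.get('user_query', ''))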
get_analysis_files(self, session_id, analysis_id) -> List[Dict[str, Any]]
Purpose: Get files for a specific analysis
Parameters:
session_id: Type: str
analysis_id: Type: str
Returns: Returns List[Dict[str, Any]]
_collect_previous_analysis_context(self, session_id, selected_analyses) -> Dict[str, Any]
Purpose: Collect context from previous analyses for iterative refinement
Parameters:
session_id: Type: str
selected_analyses: Type: List[str]
Returns: Returns Dict[str, Any]
_determine_file_type(self, file_path) -> str
Purpose: Determine the type of a file based on its extension and name
Parameters:
file_path: Type: Path
Returns: Returns str
_get_file_description(self, file_path, file_type) -> str
Purpose: Get a human-readable description for a file
Parameters:
file_path: Type: Path
file_type: Type: str
Returns: Returns str
generate_interpretation(self, session_id) -> Dict[str, Any]
Purpose: Generate interpretation of analysis results
Parameters:
session_id: Type: str
Returns: Returns Dict[str, Any]
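Sketch of generating an LLM interpretation of the latest stored results:
interpretation = service.generate_interpretation(session_id)
if interpretation.get('success'):
    print(interpretation['interpretation'])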
_store_session_data(self, session_id, df)
Purpose: Store session data temporarily (in production, use proper storage)
Parameters:
session_id: Type: str
df: Type: pd.DataFrame
Returns: None
_load_session_data(self, session_id) -> Optional[pd.DataFrame]
Purpose: Load session data
Parameters:
session_id: Type: str
Returns: Returns Optional[pd.DataFrame]
_get_latest_data_summary(self, session_id) -> Dict[str, Any]
Purpose: Get latest data summary for session
Parameters:
session_id: Type: str
Returns: Returns Dict[str, Any]
_get_next_step_number(self, session_id) -> int
Purpose: Get next step number for session
Parameters:
session_id: Type: str
Returns: Returns int
get_recent_sessions(self, user_id, limit) -> List[Dict[str, Any]]
Purpose: Get recent sessions for user
Parameters:
user_id: Type: str
limit: Type: int
Returns: Returns List[Dict[str, Any]]
delete_session(self, session_id) -> bool
Purpose: Delete session and all associated data
Parameters:
session_id: Type: str
Returns: Returns bool
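Sketch of deleting a session and its artifacts:
if service.delete_session(session_id):
    print("Session, steps, results, and files removed")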
cleanup_orphaned_files(self) -> Dict[str, Any]
Purpose: Clean up orphaned files and directories from deleted sessions
Returns: Returns Dict[str, Any]
cleanup_old_sessions(self, days_old)
Purpose: Clean up sessions older than specified days
Parameters:
days_old: Type: int
Returns: Returns int (number of sessions cleaned up; 0 on error)
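Sketch of routine maintenance, combining orphaned-file cleanup with age-based session cleanup:
orphans = service.cleanup_orphaned_files()
if orphans['success']:
    print(orphans['results']['orphaned_dirs_removed'], orphans['results']['total_size_freed_mb'])
removed = service.cleanup_old_sessions(days_old=30)
print(f"Removed {removed} sessions older than 30 days")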
Required Imports
import uuid
import json
import logging
import os
import glob
import shutil
import traceback
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
In addition, the class depends on project-local modules for Config, DatabaseManager, DataProcessor, StatisticalAgent, AgentExecutor, the session/step/result model classes, and the module-level logger.
Usage Example
# Example usage:
# service = StatisticalAnalysisService(config)
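A fuller workflow sketch, assuming an application `Config` instance and a prepared `DataSource`; the query text and title are illustrative:
service = StatisticalAnalysisService(config)
# 1. Create a session and load data
session_id = service.create_analysis_session(title="Demo analysis")
service.load_data_for_session(session_id, data_source)
# 2. Interpret a natural-language question and run the suggested analysis
query = "Does the outcome differ between treatment groups?"
interpreted = service.process_user_query(session_id, query)
if interpreted['success']:
    result = service.generate_and_execute_analysis(
        session_id,
        analysis_config=interpreted['suggested_config'],
        user_query=query
    )
# 3. Summarize and interpret the results
summary = service.get_session_summary(session_id)
interpretation = service.generate_interpretation(session_id)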
Similar Components
AI-powered semantic similarity - components with related functionality:
- class StatisticalAnalysisService (98.0% similar)
- class SmartStatService (65.9% similar)
- class DataAnalysisService (65.8% similar)
- class StatisticalAgent_v1 (57.0% similar)
- class StatisticalSession (57.0% similar)