class SmartStatService
Service for running SmartStat analysis sessions in Vice AI
File: /tf/active/vicechatdev/vice_ai/smartstat_service.py
Lines: 1064-1642
Complexity: moderate
Purpose
Service for running SmartStat analysis sessions in Vice AI: it manages session lifecycle, ingests datasets and information sheets, generates and executes LLM-authored analysis scripts with iterative debugging, interprets results, and compiles final reports.
Source Code
class SmartStatService:
    """Service for running SmartStat analysis sessions in Vice AI"""

    def __init__(self, config):
        self.config = config
        self.sessions = {}  # In-memory session storage (could move to DB)

        # Initialize SmartStat components
        self.statistical_agent = StatisticalAgent(config)
        self.script_executor = ScriptExecutor(config)
        self.agent_executor = AgentExecutor(config)

        # Ensure required directories exist
        os.makedirs(config.UPLOAD_FOLDER, exist_ok=True)
        os.makedirs(config.GENERATED_SCRIPTS_FOLDER, exist_ok=True)

        logger.info("SmartStat Service initialized for Vice AI")
    def create_session(self, data_section_id: str, title: str) -> str:
        """Create a new SmartStat analysis session"""
        session_id = str(uuid.uuid4())
        session = SmartStatSession(session_id, data_section_id, title)
        self.sessions[session_id] = session
        logger.info(f"Created SmartStat session {session_id} for data section {data_section_id}")
        return session_id

    def get_session(self, session_id: str) -> Optional[SmartStatSession]:
        """Get a session by ID"""
        return self.sessions.get(session_id)
    def add_dataset(self, session_id: str, dataset_name: str, dataframe: pd.DataFrame,
                    validation: Optional[Dict] = None, context: Optional[str] = None) -> Dict[str, Any]:
        """Add a named dataset or information sheet to a session

        Args:
            session_id: Session identifier
            dataset_name: Name for this dataset (e.g., sheet name, filename)
            dataframe: The dataframe to add
            validation: Optional format validation results
            context: Optional extracted context for information sheets

        Returns:
            Dict with success status and dataset info
        """
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"Session {session_id} not found")

        try:
            # Check if this is an information sheet
            if validation and validation.get('sheet_type') == 'information_sheet':
                # Store as information sheet with context
                if context:
                    session.info_sheets[dataset_name] = context
                    logger.info(f"Added information sheet '{dataset_name}' to session {session_id} (stored as context)")
                return {
                    'success': True,
                    'type': 'information_sheet',
                    'dataset_name': dataset_name,
                    'context_length': len(context) if context else 0,
                    'validation': validation
                }

            # Add as regular dataset (tabular data)
            session.datasets[dataset_name] = dataframe

            # Also set as primary dataframe if it's the first/only one
            if session.dataframe is None:
                session.dataframe = dataframe

            session.updated_at = datetime.now()

            # Generate dataset info
            missing_values = dataframe.isnull().sum().to_dict()
            missing_values = {k: int(v) for k, v in missing_values.items()}

            dataset_info = {
                'name': dataset_name,
                'rows': len(dataframe),
                'columns': len(dataframe.columns),
                'column_names': dataframe.columns.tolist(),
                'dtypes': {col: str(dtype) for col, dtype in dataframe.dtypes.items()},
                'numeric_columns': dataframe.select_dtypes(include=['number']).columns.tolist(),
                'missing_values': missing_values,
                'validation': validation
            }

            logger.info(f"Added dataset '{dataset_name}' to session {session_id}: {dataset_info['rows']} rows, {dataset_info['columns']} columns")

            return {
                'success': True,
                'type': 'tabular_data',
                'dataset_info': dataset_info,
                'total_datasets': len(session.datasets),
                'total_info_sheets': len(session.info_sheets)
            }

        except Exception as e:
            logger.error(f"Error adding dataset to session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    def upload_data(self, session_id: str, csv_file_path: str, dataframe: Optional[pd.DataFrame] = None) -> Dict[str, Any]:
        """Upload and process data for analysis

        Args:
            session_id: Session identifier
            csv_file_path: Path to data file (used if dataframe not provided)
            dataframe: Optional pre-loaded dataframe (for Excel sheets)
        """
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"Session {session_id} not found")

        try:
            # Use provided dataframe or load from file
            if dataframe is not None:
                df = dataframe
            else:
                # Use smart CSV reader with automatic delimiter detection
                df = smart_read_csv(csv_file_path)

            session.dataframe = df
            session.updated_at = datetime.now()

            # Generate data summary with JSON-serializable values
            missing_values = df.isnull().sum().to_dict()
            missing_values = {k: int(v) for k, v in missing_values.items()}  # Convert numpy ints to int

            # Convert preview data, replacing NaN with empty strings
            preview_data = df.head(20).fillna('').to_dict('records')

            data_summary = {
                'rows': len(df),
                'columns': len(df.columns),
                'column_names': df.columns.tolist(),
                'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                'numeric_columns': df.select_dtypes(include=['number']).columns.tolist(),
                'missing_values': missing_values,
                'preview': preview_data
            }

            logger.info(f"Uploaded data to session {session_id}: {data_summary['rows']} rows, {data_summary['columns']} columns")

            return {
                'success': True,
                'dataset_info': data_summary
            }

        except Exception as e:
            logger.error(f"Error uploading data to session {session_id}: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }
    def run_analysis(self, session_id: str, user_query: str,
                     model: str = 'gpt-4o', include_previous_context: bool = False,
                     interpretation_template_id: str = None) -> Dict[str, Any]:
        """Run SmartStat analysis with iterative debugging using AgentExecutor"""
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"Session {session_id} not found")

        if session.dataframe is None and len(session.datasets) == 0:
            raise ValueError("No data loaded in session")

        try:
            # Load interpretation template if specified
            interpretation_template = None
            if interpretation_template_id:
                try:
                    from pathlib import Path
                    import json
                    templates_file = Path(__file__).parent / "statistical_interpretation_templates.json"
                    if templates_file.exists():
                        with open(templates_file, 'r') as f:
                            templates = json.load(f)
                        if interpretation_template_id in templates:
                            interpretation_template = templates[interpretation_template_id]['template']
                            logger.info(f"Using interpretation template: {templates[interpretation_template_id]['name']}")
                        else:
                            logger.warning(f"Interpretation template '{interpretation_template_id}' not found")
                    else:
                        logger.warning("Interpretation templates file not found")
                except Exception as e:
                    logger.error(f"Error loading interpretation template: {e}")

            # Get data summary - handle both single and multiple datasets
            logger.info(f"Preparing data summary: {len(session.datasets)} datasets, dataframe={'exists' if session.dataframe is not None else 'None'}")
            if len(session.datasets) > 0:
                # Multiple datasets
                data_summary = {
                    'type': 'multi-dataset',
                    'dataset_count': len(session.datasets),
                    'datasets': {}
                }
                logger.info(f"Multi-dataset mode: {len(session.datasets)} datasets")
                for name, df in session.datasets.items():
                    # Calculate the actual filename that will be saved (must match the save logic)
                    safe_name = "".join(c for c in name if c.isalnum() or c in (' ', '-', '_')).strip()
                    safe_name = safe_name.replace(' ', '_')
                    csv_filename = f"{safe_name}.csv"
                    logger.info(f"  Dataset '{name}' → {csv_filename} ({len(df)} rows, {len(df.columns)} cols)")
                    data_summary['datasets'][name] = {
                        'csv_filename': csv_filename,  # CRITICAL: Tell LLM the actual filename to load
                        'rows': len(df),
                        'columns': len(df.columns),
                        'column_names': df.columns.tolist(),
                        'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                        'numeric_columns': df.select_dtypes(include=['number']).columns.tolist()
                    }

                # Add information sheets context if available
                if len(session.info_sheets) > 0:
                    data_summary['info_sheets'] = session.info_sheets
                    logger.info(f"Added {len(session.info_sheets)} info sheets to data_summary")

                logger.info(f"DEBUG: data_summary prepared with type={data_summary.get('type')}, dataset_count={data_summary.get('dataset_count')}")

                # Also include primary dataframe info for backward compatibility
                if session.dataframe is not None:
                    df = session.dataframe
                    data_summary['rows'] = len(df)
                    data_summary['columns'] = len(df.columns)
                    data_summary['column_names'] = df.columns.tolist()
                    data_summary['dtypes'] = {col: str(dtype) for col, dtype in df.dtypes.items()}
                    data_summary['numeric_columns'] = df.select_dtypes(include=['number']).columns.tolist()
            else:
                # Single dataset (original format)
                df = session.dataframe
                data_summary = {
                    'type': 'single-dataset',
                    'rows': len(df),
                    'columns': len(df.columns),
                    'column_names': df.columns.tolist(),
                    'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                    'numeric_columns': df.select_dtypes(include=['number']).columns.tolist()
                }

            # Build previous context if requested
            previous_context = None
            if include_previous_context and len(session.analysis_history) > 0:
                previous_context = {
                    'count': len(session.analysis_history),
                    'type': 'previous',
                    'summary': f"You have performed {len(session.analysis_history)} previous analyses on this dataset.",
                    'analyses': session.analysis_history
                }

            # Use statistical agent to generate Python script
            logger.info(f"Generating analysis script for query: {user_query}")

            # Create simple analysis config
            from smartstat_models import AnalysisConfiguration, AnalysisType
            analysis_config = AnalysisConfiguration(
                analysis_type=AnalysisType.DESCRIPTIVE,
                target_variables=[],
                grouping_variables=[],
                control_variables=[]
            )

            # Generate initial script
            script_result = self.statistical_agent.generate_analysis_script(
                analysis_config=analysis_config,
                data_summary=data_summary,
                user_query=user_query,
                model=model,
                previous_context=previous_context
            )

            if not script_result['success']:
                return {
                    'success': False,
                    'error': script_result.get('error', 'Failed to generate script')
                }

            script = script_result['script']
            logger.info(f"Generated script length: {len(script)} characters")

            # Save script to persistent location for debugging
            from pathlib import Path
            session_scripts_dir = Path(self.config.GENERATED_SCRIPTS_FOLDER) / session_id
            session_scripts_dir.mkdir(parents=True, exist_ok=True)
            script_num = len(session.analysis_history) + 1
            script_file = session_scripts_dir / f"analysis_{script_num}.py"
            script_file.write_text(script)
            logger.info(f"Saved generated script to: {script_file}")

            # Execute with iterative debugging using AgentExecutor
            # Create a project directory with the script
            project_path = session_scripts_dir / f"project_{script_num}"
            project_path.mkdir(exist_ok=True)

            # Save script to project file
            script_file_proj = project_path / "analysis.py"
            script_file_proj.write_text(script)

            # Save data to CSV for script access
            if len(session.datasets) > 0:
                # Save all datasets with their names
                for dataset_name, dataset_df in session.datasets.items():
                    # Sanitize filename
                    safe_name = "".join(c for c in dataset_name if c.isalnum() or c in (' ', '-', '_')).strip()
                    safe_name = safe_name.replace(' ', '_')
                    data_file = project_path / f"{safe_name}.csv"
                    dataset_df.to_csv(data_file, index=False)
                    logger.info(f"Saved dataset '{dataset_name}' to {data_file}")

                # Save information sheets to both JSON and Markdown formats
                if len(session.info_sheets) > 0:
                    import json

                    # Save as JSON (for programmatic access)
                    info_sheets_file = project_path / "info_sheets.json"
                    with open(info_sheets_file, 'w', encoding='utf-8') as f:
                        json.dump(session.info_sheets, f, indent=2, ensure_ascii=False)
                    logger.info(f"Saved {len(session.info_sheets)} info sheets to {info_sheets_file}")

                    # Also save as individual Markdown files (for readability)
                    info_sheets_dir = project_path / "info_sheets"
                    info_sheets_dir.mkdir(exist_ok=True)
                    for sheet_name, sheet_content in session.info_sheets.items():
                        # Sanitize filename
                        safe_name = "".join(c for c in sheet_name if c.isalnum() or c in (' ', '-', '_')).strip()
                        safe_name = safe_name.replace(' ', '_')
                        md_file = info_sheets_dir / f"{safe_name}.md"

                        # Write markdown file
                        with open(md_file, 'w', encoding='utf-8') as f:
                            f.write(sheet_content)
                        logger.info(f"Saved info sheet '{sheet_name}' to markdown: {md_file}")

                    # Create combined markdown with all sheets
                    combined_md = project_path / "ALL_INFO_SHEETS.md"
                    with open(combined_md, 'w', encoding='utf-8') as f:
                        f.write("# All Information Sheets\n\n")
                        f.write(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n")
                        f.write("---\n\n")
                        for sheet_name, sheet_content in session.info_sheets.items():
                            f.write(f"{sheet_content}\n\n")
                            f.write("---\n\n")
                    logger.info(f"Created combined markdown with all info sheets: {combined_md}")

                # Also save primary dataframe as data.csv for backward compatibility
                if session.dataframe is not None:
                    data_file = project_path / "data.csv"
                    session.dataframe.to_csv(data_file, index=False)
            else:
                # Single dataset - save as data.csv
                data_file = project_path / "data.csv"
                df.to_csv(data_file, index=False)

            # Create requirements.txt with common data science packages
            # These are needed for most statistical analyses
            requirements_file = project_path / "requirements.txt"
            requirements_file.write_text("""pandas>=2.0.0
numpy>=1.24.0
matplotlib>=3.7.0
seaborn>=0.12.0
scipy>=1.10.0
scikit-learn>=1.3.0
statsmodels>=0.14.0
""")

            # Use AgentExecutor with retry logic (max 3 iterations)
            logger.info("Executing analysis with AgentExecutor (max 3 retry attempts)")
            execution_result = self.agent_executor.execute_analysis_project(
                project_dir=str(project_path),
                max_iterations=3,
                model=model
            )

            logger.info(f"Execution completed: success={execution_result['success']}, iterations={execution_result.get('iteration', 0)}")

            if not execution_result['success']:
                return {
                    'success': False,
                    'error': execution_result.get('error', 'Script execution failed after retries'),
                    'script': script,
                    'execution_output': execution_result.get('execution_output', ''),
                    'debug_iterations': execution_result.get('debug_iterations', [])
                }

            # Extract results from execution
            results_text = execution_result.get('execution_output', '')
            plot_files = execution_result.get('plots', [])
            analysis_results = execution_result.get('analysis_results')  # Get structured JSON results

            # Convert plot file paths to URLs
            plot_urls = []

            # First, get plots from collected files
            for plot in plot_files:
                if isinstance(plot, dict):
                    # Use rel_path if available (for subdirectories like figures/plot.png)
                    if 'rel_path' in plot:
                        plot_url = f"/api/smartstat/plots/{session_id}/{plot['rel_path']}"
                    elif 'name' in plot:
                        plot_url = f"/api/smartstat/plots/{session_id}/{plot['name']}"
                    else:
                        continue
                    plot_urls.append(plot_url)
                elif isinstance(plot, str):
                    # Legacy format - just a filename
                    import os
                    filename = os.path.basename(plot)
                    plot_url = f"/api/smartstat/plots/{session_id}/{filename}"
                    plot_urls.append(plot_url)

            # Also check if plots are listed in analysis_results.json
            if analysis_results and isinstance(analysis_results, dict) and 'plots' in analysis_results:
                json_plots = analysis_results.get('plots', [])
                for plot_filename in json_plots:
                    if isinstance(plot_filename, str):
                        # Handle both 'plot.png' and 'figures/plot.png' formats
                        plot_url = f"/api/smartstat/plots/{session_id}/{plot_filename}"
                        if plot_url not in plot_urls:  # Avoid duplicates
                            plot_urls.append(plot_url)

            logger.info(f"Found {len(plot_urls)} plots: {plot_urls}")

            # Generate LLM interpretation of results
            interpretation_result = None
            if analysis_results and isinstance(analysis_results, dict):
                try:
                    logger.info(f"Generating LLM interpretation of analysis results (keys: {list(analysis_results.keys())})")
                    interpretation_result = self.statistical_agent.interpret_results(
                        results=analysis_results,
                        analysis_config=analysis_config,
                        user_query=user_query,
                        model=model,
                        info_sheets=session.info_sheets,  # Pass info sheets from session
                        interpretation_template=interpretation_template  # Pass interpretation template
                    )
                    if interpretation_result and interpretation_result.get('success'):
                        # Replace the generic interpretation with the LLM-generated one
                        analysis_results['interpretation'] = interpretation_result.get('interpretation', '')
                        analysis_results['key_findings'] = interpretation_result.get('key_findings', [])
                        analysis_results['recommendations'] = interpretation_result.get('recommendations', [])
                        logger.info("Successfully generated LLM interpretation")
                        # Store interpretation separately - don't append to console output
                        # The interpretation is already in analysis_results and will be displayed by the UI
                    else:
                        logger.warning(f"Failed to generate interpretation: {interpretation_result.get('error') if interpretation_result else 'Unknown error'}")
                except Exception as e:
                    logger.error(f"Error generating interpretation: {str(e)}")
                    import traceback
                    traceback.print_exc()
                    # Keep the basic interpretation that's already there
            else:
                logger.warning(f"Skipping LLM interpretation - analysis_results is {'None' if not analysis_results else 'not a dict (type: ' + type(analysis_results).__name__ + ')'}")

            # Store analysis in history
            analysis_record = {
                'query': user_query,
                'script': script,
                'explanation': script_result.get('explanation', ''),
                'results': analysis_results if analysis_results else results_text,  # Prefer structured JSON
                'plots': plot_urls,  # Use URLs instead of file paths
                'timestamp': datetime.now().isoformat(),
                'iterations': execution_result.get('iteration', 1),
                'debug_log': execution_result.get('execution_log', []),  # Use execution_log for detailed timeline
                'model': model  # Store which LLM model was used
            }
            session.analysis_history.append(analysis_record)

            # Update session plots
            if plot_urls:
                session.generated_plots.extend(plot_urls)

            session.updated_at = datetime.now()
            logger.info(f"Analysis completed successfully for session {session_id} after {execution_result.get('iteration', 1)} iterations")

            # Automatic cleanup if enabled (remove venv after successful analysis)
            if hasattr(self.config, 'AUTO_CLEANUP_ENABLED') and self.config.AUTO_CLEANUP_ENABLED:
                try:
                    logger.info(f"Auto-cleanup enabled - removing venv for session {session_id}")
                    cleanup_result = self.agent_executor.cleanup_venv_directories(session_id)
                    if cleanup_result['success']:
                        logger.info(f"Cleaned up {cleanup_result['cleaned_count']} venv(s), freed {cleanup_result['space_freed_mb']:.2f} MB")
                except Exception as e:
                    logger.warning(f"Auto-cleanup failed (non-critical): {str(e)}")

            return {
                'success': True,
                'response': results_text,
                'results': analysis_results if analysis_results else results_text,  # Include structured results
                'script': script,
                'explanation': script_result.get('explanation', ''),
                'plots': plot_urls,
                'execution_output': results_text,
                'iterations': execution_result.get('iteration', 1),
                'debug_log': execution_result.get('execution_log', [])  # Use execution_log for detailed timeline
            }

        except Exception as e:
            logger.error(f"Error running analysis for session {session_id}: {str(e)}")
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'error': str(e)
            }
    def generate_final_report(self, session_id: str) -> Dict[str, Any]:
        """Generate final report from all analyses in session"""
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"Session {session_id} not found")

        # Compile all analysis results
        report_sections = []
        for i, analysis in enumerate(session.analysis_history, 1):
            # Extract interpretation from results if it's a dict
            results_text = analysis['results']
            if isinstance(results_text, dict):
                # If results is a structured dict with interpretation, use that
                if 'interpretation' in results_text:
                    results_text = results_text['interpretation']
                else:
                    # Otherwise format the dict as JSON for display
                    import json
                    results_text = json.dumps(results_text, indent=2, default=str)

            section = {
                'number': i,
                'query': analysis['query'],
                'results': results_text,  # Now guaranteed to be a string
                'plots': analysis['plots'],
                'timestamp': analysis['timestamp']
            }
            report_sections.append(section)

        final_report = {
            'title': session.title,
            'created_at': session.created_at.isoformat(),
            'completed_at': datetime.now().isoformat(),
            'total_analyses': len(session.analysis_history),
            'sections': report_sections,
            'all_plots': session.generated_plots
        }

        session.final_report = final_report

        return {
            'success': True,
            'report': final_report
        }
    def close_session(self, session_id):
        """Close and clean up a session"""
        if session_id in self.sessions:
            # Cleanup venvs before closing
            try:
                logger.info(f"Cleaning up session {session_id} before closing")
                cleanup_result = self.agent_executor.cleanup_venv_directories(session_id)
                if cleanup_result['success']:
                    logger.info(f"Cleaned up {cleanup_result['cleaned_count']} venv(s), freed {cleanup_result['space_freed_mb']:.2f} MB")
            except Exception as e:
                logger.warning(f"Session cleanup failed (non-critical): {str(e)}")

            # Could save to database here
            del self.sessions[session_id]
            logger.info(f"Closed SmartStat session {session_id}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | - | - | positional |
Parameter Details
config: Application configuration object. It must expose UPLOAD_FOLDER and GENERATED_SCRIPTS_FOLDER; AUTO_CLEANUP_ENABLED is read if present.
Return Value
The constructor returns a SmartStatService instance; per-method return types are listed under Class Interface below.
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the service: store the config, set up in-memory session storage, create the StatisticalAgent, ScriptExecutor, and AgentExecutor components, and ensure the upload and script directories exist
Parameters:
config: Application configuration object
Returns: None
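A minimal construction sketch. The config carrier below is hypothetical; it only assumes the attributes this class actually reads (UPLOAD_FOLDER and GENERATED_SCRIPTS_FOLDER in the constructor, AUTO_CLEANUP_ENABLED later in run_analysis):

from dataclasses import dataclass

@dataclass
class DemoConfig:  # hypothetical stand-in for the real Vice AI config
    UPLOAD_FOLDER: str = "/tmp/smartstat/uploads"
    GENERATED_SCRIPTS_FOLDER: str = "/tmp/smartstat/scripts"
    AUTO_CLEANUP_ENABLED: bool = True  # checked with hasattr() before use

service = SmartStatService(DemoConfig())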
create_session(self, data_section_id, title) -> str
Purpose: Create a new SmartStat analysis session
Parameters:
data_section_id: Type: str
title: Type: str
Returns: Returns str
get_session(self, session_id) -> Optional[SmartStatSession]
Purpose: Get a session by ID
Parameters:
session_id: Type: str
Returns: Returns Optional[SmartStatSession]
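A lifecycle sketch for these two methods; the data_section_id value is a hypothetical identifier supplied by the caller:

# Sessions are keyed by a generated UUID string.
session_id = service.create_session(
    data_section_id="section-123",  # hypothetical ID from the host document
    title="Exploratory analysis"
)

# Lookup returns the live session object, or None for unknown IDs.
session = service.get_session(session_id)
assert session is not None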
add_dataset(self, session_id, dataset_name, dataframe, validation, context) -> Dict[str, Any]
Purpose: Add a named dataset or information sheet to a session; returns a dict with success status and dataset info
Parameters:
session_id: Type: str (session identifier)
dataset_name: Type: str (name for this dataset, e.g., sheet name or filename)
dataframe: Type: pd.DataFrame (the dataframe to add)
validation: Type: Optional[Dict] (optional format validation results)
context: Type: Optional[str] (optional extracted context for information sheets)
Returns: Returns Dict[str, Any]
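Two sketched calls, one per input kind. The validation dict shape is taken from the source above (a sheet_type of 'information_sheet' routes the input to info-sheet handling); the data itself is made up:

import pandas as pd

# Tabular data: stored under its name; the first dataset added also
# becomes the session's primary dataframe.
df = pd.DataFrame({"dose": [1, 2, 4], "response": [0.2, 0.5, 0.9]})
result = service.add_dataset(session_id, "DoseResponse", df)
print(result["dataset_info"]["numeric_columns"])  # ['dose', 'response']

# Information sheet: routed by validation['sheet_type'] and stored as
# free-text context instead of a dataframe.
result = service.add_dataset(
    session_id, "StudyNotes", pd.DataFrame(),
    validation={"sheet_type": "information_sheet"},
    context="Doses are in mg/kg; responses are normalized AUCs."
)
print(result["type"])  # 'information_sheet'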
upload_data(self, session_id, csv_file_path, dataframe) -> Dict[str, Any]
Purpose: Upload and process data for analysis
Parameters:
session_id: Type: str (session identifier)
csv_file_path: Type: str (path to the data file; used only if dataframe is not provided)
dataframe: Type: Optional[pd.DataFrame] (optional pre-loaded dataframe, e.g., for Excel sheets)
Returns: Returns Dict[str, Any]
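A sketch of both call styles; the CSV path is hypothetical:

# From a file on disk (delimiter auto-detected by smart_read_csv) ...
result = service.upload_data(session_id, "/tmp/measurements.csv")

# ... or from an already-loaded dataframe (e.g., one Excel sheet);
# csv_file_path is ignored when a dataframe is supplied.
result = service.upload_data(session_id, csv_file_path="", dataframe=df)
if result["success"]:
    print(result["dataset_info"]["rows"], "rows loaded")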
run_analysis(self, session_id, user_query, model, include_previous_context, interpretation_template_id) -> Dict[str, Any]
Purpose: Run SmartStat analysis with iterative debugging using AgentExecutor
Parameters:
session_id: Type: str
user_query: Type: str
model: Type: str (default: 'gpt-4o')
include_previous_context: Type: bool (default: False)
interpretation_template_id: Type: str (default: None)
Returns: Returns Dict[str, Any]
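A sketched call showing the result keys returned on both paths (the keys match the return dicts in the source above):

result = service.run_analysis(
    session_id,
    user_query="Is there a dose-response trend? Fit a linear model.",
    model="gpt-4o",
    include_previous_context=False
)
if result["success"]:
    print(result["explanation"])  # LLM's description of the generated script
    print(result["plots"])        # URLs under /api/smartstat/plots/<session_id>/...
    print(result["iterations"])   # debugging iterations used (capped at 3)
else:
    print(result["error"])
    print(result.get("debug_iterations", []))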
generate_final_report(self, session_id) -> Dict[str, Any]
Purpose: Generate final report from all analyses in session
Parameters:
session_id: Type: str
Returns: Returns Dict[str, Any]
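A sketch of consuming the report; each section's 'results' field is always a string at this point, either the LLM interpretation or the structured results serialized as JSON:

report = service.generate_final_report(session_id)["report"]
print(report["title"], "-", report["total_analyses"], "analyses")
for section in report["sections"]:
    print(section["number"], section["query"])
    print(section["results"][:200])  # interpretation text or JSON string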
close_session(self, session_id)
Purpose: Close and clean up a session
Parameters:
session_id: Type: str
Returns: None
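Closing is terminal for the in-memory store, as sketched below:

# Best-effort venv cleanup, then the session is dropped from memory.
service.close_session(session_id)
assert service.get_session(session_id) is None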
Required Imports
import os
import uuid
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional
import pandas as pd
The class also depends on project-internal names (StatisticalAgent, ScriptExecutor, AgentExecutor, SmartStatSession, smart_read_csv, logger, and smartstat_models.AnalysisConfiguration/AnalysisType) whose import paths are specific to the Vice AI codebase.
Usage Example
# Example usage:
# service = SmartStatService(config)
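An end-to-end sketch under the same assumptions as the per-method examples above (hypothetical config object and file path):

service = SmartStatService(DemoConfig())
sid = service.create_session("section-123", "Demo analysis")
service.upload_data(sid, "/tmp/measurements.csv")
out = service.run_analysis(sid, "Summarize the data and test for group differences")
if out["success"]:
    report = service.generate_final_report(sid)["report"]
service.close_session(sid)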
Similar Components
AI-powered semantic similarity - components with related functionality:
- class SmartStatConfig (67.3% similar)
- class StatisticalAnalysisService_v1 (65.9% similar)
- class DataAnalysisService (64.9% similar)
- class StatisticalAnalysisService (64.6% similar)
- class DatabaseManager_v1 (62.6% similar)