class AgentExecutor_v2
Agent-based script executor that generates standalone Python files, manages dependencies, and provides iterative debugging capabilities
/tf/active/vicechatdev/smartstat/agent_executor.py
Lines: 25 - 697
Complexity: moderate
Purpose
Agent-based script executor that generates standalone Python files, manages dependencies, and provides iterative debugging capabilities
Source Code
class AgentExecutor:
"""
Agent-based script executor that generates standalone Python files,
manages dependencies, and provides iterative debugging capabilities
"""
def __init__(self, config: Config):
self.config = config
self.statistical_agent = StatisticalAgent(config)
self.scripts_dir = Path(config.GENERATED_SCRIPTS_FOLDER)
self.sandbox_dir = Path(config.SANDBOX_FOLDER)
self.output_dir = Path(config.OUTPUT_DIR)
# Ensure directories exist
self.scripts_dir.mkdir(exist_ok=True)
self.sandbox_dir.mkdir(exist_ok=True)
self.output_dir.mkdir(exist_ok=True)
def cleanup_old_analyses(self, session_id: str, keep_recent: int = 5):
"""Clean up old analysis directories for a session"""
try:
session_dir = self.output_dir / session_id
if not session_dir.exists():
return
# Get all analysis directories
analysis_dirs = [d for d in session_dir.iterdir()
if d.is_dir() and d.name.startswith('analysis_')]
# Sort by modification time (newest first)
analysis_dirs.sort(key=lambda x: x.stat().st_mtime, reverse=True)
# Remove old analysis directories
for old_dir in analysis_dirs[keep_recent:]:
logger.info(f"Cleaning up old analysis directory: {old_dir}")
shutil.rmtree(old_dir)
except Exception as e:
logger.warning(f"Error cleaning up old analyses for session {session_id}: {str(e)}")
def cleanup_session(self, session_id: str):
"""Clean up all files for a session"""
try:
# Clean up output directory
session_dir = self.output_dir / session_id
if session_dir.exists():
logger.info(f"Cleaning up session directory: {session_dir}")
shutil.rmtree(session_dir)
# Clean up scripts directory
scripts_session_dir = self.scripts_dir / session_id
if scripts_session_dir.exists():
logger.info(f"Cleaning up scripts directory: {scripts_session_dir}")
shutil.rmtree(scripts_session_dir)
# Clean up sandbox directory
sandbox_session_dir = self.sandbox_dir / session_id
if sandbox_session_dir.exists():
logger.info(f"Cleaning up sandbox directory: {sandbox_session_dir}")
shutil.rmtree(sandbox_session_dir)
# Clean up sessions folder (data files)
sessions_folder = Path(self.config.SESSIONS_FOLDER)
if sessions_folder.exists():
for file_path in sessions_folder.glob(f"*{session_id}*"):
if file_path.is_file():
logger.info(f"Cleaning up session data file: {file_path}")
file_path.unlink()
except Exception as e:
logger.warning(f"Error cleaning up session {session_id}: {str(e)}")
def generate_analysis_project(self, session_id: str, user_query: str,
data_summary: Dict[str, Any], analysis_config: Any,
session_data: Any = None) -> Dict[str, Any]:
"""
Generate complete analysis project with Python script, requirements.txt, and data
"""
try:
# Create session directory
session_dir = self.output_dir / session_id
session_dir.mkdir(exist_ok=True)
# Clean up old analyses to prevent accumulation
if self.config.AUTO_CLEANUP_ENABLED:
self.cleanup_old_analyses(session_id, keep_recent=self.config.KEEP_RECENT_ANALYSES)
# Generate unique project ID
project_id = str(uuid.uuid4())[:8]
project_dir = session_dir / f"analysis_{project_id}"
project_dir.mkdir(exist_ok=True)
# Generate Python script
script_result = self._generate_analysis_script(
user_query, data_summary, analysis_config, project_dir
)
if not script_result['success']:
return script_result
# Generate requirements.txt
requirements_result = self._generate_requirements(
script_result['script'], project_dir
)
if not requirements_result['success']:
return requirements_result
# Create data input file
data_input_result = self._create_data_input(
session_id, project_dir, session_data
)
if not data_input_result['success']:
return data_input_result
return {
'success': True,
'project_id': project_id,
'project_dir': str(project_dir),
'script_path': str(project_dir / 'analysis.py'),
'requirements_path': str(project_dir / 'requirements.txt'),
'data_path': str(project_dir / 'input_data.csv'),
'script_content': script_result['script'],
'requirements': requirements_result['requirements']
}
except Exception as e:
logger.error(f"Error generating analysis project: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _generate_analysis_script(self, user_query: str, data_summary: Dict[str, Any],
analysis_config: Any, project_dir: Path) -> Dict[str, Any]:
"""Generate standalone Python analysis script"""
prompt = f"""Generate a complete, standalone Python analysis script that addresses this request:
USER QUERY: "{user_query}"
ANALYSIS CONFIGURATION:
- Type: {analysis_config.analysis_type.value if analysis_config and hasattr(analysis_config, 'analysis_type') else 'descriptive'}
- Target variables: {getattr(analysis_config, 'target_variables', []) if analysis_config else []}
- Grouping variables: {getattr(analysis_config, 'grouping_variables', []) if analysis_config else []}
- Significance level: {getattr(analysis_config, 'significance_level', 0.05) if analysis_config else 0.05}
DATA INFORMATION:
- Shape: {data_summary.get('shape', 'Unknown')}
- Columns: {list(data_summary.get('column_info', {}).keys())}
- Column types: {json.dumps(data_summary.get('column_info', {}), indent=2)}
REQUIREMENTS:
1. Create a standalone Python script that reads data from 'input_data.csv'
2. Include all necessary imports at the top
3. Perform comprehensive statistical analysis based on the user query
4. Generate visualizations and save them as PNG files with descriptive names
5. Create summary tables and save them as CSV files
6. Write textual conclusions and interpretations to a 'conclusions.txt' file
7. Handle errors gracefully with informative error messages
8. Use professional statistical practices and proper data validation
OUTPUT STRUCTURE:
- Save plots as: plot_01_description.png, plot_02_description.png, etc.
- Save tables as: table_01_description.csv, table_02_description.csv, etc.
- Save conclusions as: conclusions.txt
- Print progress messages to console
SCRIPT TEMPLATE:
```python
#!/usr/bin/env python3
\"\"\"
Statistical Analysis Script
Generated by SmartStat Agent
Query: {user_query}
Generated: {datetime.now().isoformat()}
\"\"\"
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
def main():
print("Starting statistical analysis...")
print(f"Query: {user_query}")
# Load data
try:
df = pd.read_csv('input_data.csv')
print(f"Data loaded successfully: {{df.shape}}")
except Exception as e:
print(f"Error loading data: {{e}}")
return
# Your analysis code here...
print("Analysis completed successfully!")
if __name__ == "__main__":
main()
```
Generate the complete Python script following these guidelines."""
try:
response = self.statistical_agent._call_llm(prompt, model="gpt-4o", max_tokens=3000)
# Extract Python code from response
if '```python' in response:
script_start = response.find('```python') + 9
script_end = response.find('```', script_start)
if script_end == -1:
script_code = response[script_start:].strip()
else:
script_code = response[script_start:script_end].strip()
else:
script_code = response.strip()
# Save script to file
script_path = project_dir / 'analysis.py'
with open(script_path, 'w') as f:
f.write(script_code)
return {
'success': True,
'script': script_code,
'script_path': str(script_path)
}
except Exception as e:
logger.error(f"Error generating analysis script: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _generate_requirements(self, script_content: str, project_dir: Path) -> Dict[str, Any]:
"""Generate requirements.txt based on script imports"""
prompt = f"""Analyze this Python script and generate a requirements.txt file with the exact package versions needed:
PYTHON SCRIPT:
```python
{script_content}
```
Generate a requirements.txt file that includes:
1. All imported packages with specific versions for reproducibility
2. Common data science packages (pandas, numpy, matplotlib, seaborn, scipy, etc.)
3. Any specialized statistical packages that might be needed
4. Use recent stable versions
Format as a clean requirements.txt file with one package per line in format: package==version
Example:
pandas==2.0.3
numpy==1.24.3
matplotlib==3.7.2
seaborn==0.12.2
scipy==1.11.1
Provide only the requirements.txt content, no explanations."""
try:
response = self.statistical_agent._call_llm(prompt, model="gpt-4o", max_tokens=1000)
# Clean up the response
requirements_content = response.strip()
if '```' in requirements_content:
start = requirements_content.find('```')
end = requirements_content.rfind('```')
if start != -1 and end != -1 and end > start:
requirements_content = requirements_content[start+3:end].strip()
# Remove any language specifiers
if requirements_content.startswith(('txt', 'text', 'requirements', 'pip-requirements', 'plaintext')):
lines = requirements_content.split('\n')
# Find first line that looks like a package requirement
start_idx = 0
for i, line in enumerate(lines):
if '==' in line or '>=' in line or '<=' in line or '>' in line or '<' in line:
start_idx = i
break
requirements_content = '\n'.join(lines[start_idx:])
# Ensure clean format - only package requirements
lines = requirements_content.split('\n')
clean_lines = []
for line in lines:
line = line.strip()
if line and not line.startswith('#') and ('==' in line or '>=' in line):
clean_lines.append(line)
requirements_content = '\n'.join(clean_lines)
# Save requirements.txt
requirements_path = project_dir / 'requirements.txt'
with open(requirements_path, 'w') as f:
f.write(requirements_content)
# Parse requirements for return
requirements_list = [line.strip() for line in requirements_content.split('\n')
if line.strip() and not line.strip().startswith('#')]
return {
'success': True,
'requirements': requirements_list,
'requirements_path': str(requirements_path),
'requirements_content': requirements_content
}
except Exception as e:
logger.error(f"Error generating requirements: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _create_data_input(self, session_id: str, project_dir: Path, session_data: Any = None) -> Dict[str, Any]:
"""Copy session data to project directory as input_data.csv"""
try:
df = None
# Try to use provided session data first
if session_data is not None:
df = session_data
else:
# Fallback to loading from session storage
try:
from services import StatisticalAnalysisService
service = StatisticalAnalysisService(self.config)
df = service._load_session_data(session_id)
except ImportError:
# Handle circular import by loading data directly
import pandas as pd
data_path = self.config.SESSIONS_FOLDER / f"{session_id}_data.pkl"
if data_path.exists():
df = pd.read_pickle(data_path)
if df is None:
return {
'success': False,
'error': 'No data found for session'
}
# Save to project directory
data_path = project_dir / 'input_data.csv'
df.to_csv(data_path, index=False)
return {
'success': True,
'data_path': str(data_path),
'shape': df.shape
}
except Exception as e:
logger.error(f"Error creating data input: {str(e)}")
return {
'success': False,
'error': str(e)
}
def execute_analysis_project(self, project_dir: str, max_iterations: int = 3) -> Dict[str, Any]:
"""
Execute the analysis project with iterative debugging
"""
project_path = Path(project_dir)
iteration = 0
while iteration < max_iterations:
iteration += 1
logger.info(f"Execution attempt {iteration}/{max_iterations}")
# Install requirements
install_result = self._install_requirements(project_path)
if not install_result['success']:
return install_result
# Execute script
execution_result = self._execute_script(project_path)
if execution_result['success']:
# Collect output files
output_result = self._collect_output_files(project_path)
return {
'success': True,
'iteration': iteration,
'execution_output': execution_result['output'],
'execution_error': execution_result['error'],
'files': output_result['files'],
'project_dir': str(project_path)
}
else:
# Debug and retry if not last iteration
if iteration < max_iterations:
debug_result = self._debug_script(
project_path, execution_result['error'], iteration
)
if not debug_result['success']:
return debug_result
else:
return {
'success': False,
'error': execution_result['error'],
'iteration': iteration,
'project_dir': str(project_path)
}
return {
'success': False,
'error': f'Max iterations ({max_iterations}) exceeded',
'iteration': iteration,
'project_dir': str(project_path)
}
def _install_requirements(self, project_path: Path) -> Dict[str, Any]:
"""Install requirements for the project"""
try:
requirements_path = project_path / 'requirements.txt'
if not requirements_path.exists():
return {
'success': False,
'error': 'requirements.txt not found'
}
# Create virtual environment for this project
venv_path = project_path / 'venv'
if not venv_path.exists():
subprocess.run([
sys.executable, '-m', 'venv', str(venv_path)
], check=True, capture_output=True, text=True)
# Install requirements in virtual environment
if os.name == 'nt': # Windows
pip_executable = venv_path / 'Scripts' / 'pip'
else: # Unix/Linux/Mac
pip_executable = venv_path / 'bin' / 'pip'
result = subprocess.run([
str(pip_executable), 'install', '-r', str(requirements_path)
], cwd=str(project_path), capture_output=True, text=True, timeout=300)
if result.returncode == 0:
return {
'success': True,
'output': result.stdout,
'venv_path': str(venv_path)
}
else:
return {
'success': False,
'error': f"Package installation failed: {result.stderr}"
}
except subprocess.TimeoutExpired:
return {
'success': False,
'error': 'Package installation timed out'
}
except Exception as e:
logger.error(f"Error installing requirements: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _execute_script(self, project_path: Path) -> Dict[str, Any]:
"""Execute the analysis script"""
try:
script_path = project_path / 'analysis.py'
if not script_path.exists():
return {
'success': False,
'error': 'analysis.py not found'
}
# Use virtual environment Python
venv_path = project_path / 'venv'
if os.name == 'nt': # Windows
python_executable = venv_path / 'Scripts' / 'python'
else: # Unix/Linux/Mac
python_executable = venv_path / 'bin' / 'python'
# Execute script
result = subprocess.run([
str(python_executable), 'analysis.py'
], cwd=str(project_path), capture_output=True, text=True, timeout=300)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode
}
except subprocess.TimeoutExpired:
return {
'success': False,
'output': '',
'error': 'Script execution timed out'
}
except Exception as e:
logger.error(f"Error executing script: {str(e)}")
return {
'success': False,
'output': '',
'error': str(e)
}
def _debug_script(self, project_path: Path, error_message: str, iteration: int) -> Dict[str, Any]:
"""Debug and fix the script using LLM"""
try:
script_path = project_path / 'analysis.py'
with open(script_path, 'r') as f:
current_script = f.read()
# Get data info for debugging context
data_path = project_path / 'input_data.csv'
data_info = ""
if data_path.exists():
try:
df = pd.read_csv(data_path)
data_info = f"Data shape: {df.shape}, Columns: {list(df.columns)}"
except:
data_info = "Could not read data file"
debug_prompt = f"""Fix this Python analysis script that failed to execute:
CURRENT SCRIPT:
```python
{current_script}
```
ERROR MESSAGE:
{error_message}
DATA INFO:
{data_info}
DEBUGGING ITERATION: {iteration}
Please provide a corrected version of the complete script that:
1. Fixes the specific error mentioned
2. Includes proper error handling
3. Validates data before processing
4. Uses try-catch blocks for file operations
5. Provides informative print statements for debugging
Return only the corrected Python script, no explanations."""
response = self.statistical_agent._call_llm(debug_prompt, model="gpt-4o", max_tokens=3000)
# Extract corrected code
if '```python' in response:
script_start = response.find('```python') + 9
script_end = response.find('```', script_start)
if script_end == -1:
corrected_script = response[script_start:].strip()
else:
corrected_script = response[script_start:script_end].strip()
else:
corrected_script = response.strip()
# Save corrected script
with open(script_path, 'w') as f:
f.write(corrected_script)
return {
'success': True,
'corrected_script': corrected_script
}
except Exception as e:
logger.error(f"Error debugging script: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _collect_output_files(self, project_path: Path) -> Dict[str, Any]:
"""Collect all output files generated by the analysis"""
try:
output_files = {
'plots': [],
'tables': [],
'conclusions': None,
'other': []
}
# Scan project directory for output files
for file_path in project_path.iterdir():
if file_path.is_file():
filename = file_path.name.lower()
if filename.endswith('.png') or filename.endswith('.jpg') or filename.endswith('.svg'):
output_files['plots'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
elif filename.endswith('.csv') and not filename == 'input_data.csv':
output_files['tables'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
elif filename == 'conclusions.txt':
with open(file_path, 'r') as f:
output_files['conclusions'] = {
'name': file_path.name,
'path': str(file_path),
'content': f.read()
}
elif filename not in ['analysis.py', 'requirements.txt', 'input_data.csv']:
output_files['other'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
return {
'success': True,
'files': output_files
}
except Exception as e:
logger.error(f"Error collecting output files: {str(e)}")
return {
'success': False,
'error': str(e),
'files': {}
}
def get_project_status(self, project_dir: str) -> Dict[str, Any]:
"""Get current status of an analysis project"""
try:
project_path = Path(project_dir)
if not project_path.exists():
return {
'success': False,
'error': 'Project directory not found'
}
status = {
'project_exists': True,
'has_script': (project_path / 'analysis.py').exists(),
'has_requirements': (project_path / 'requirements.txt').exists(),
'has_data': (project_path / 'input_data.csv').exists(),
'has_venv': (project_path / 'venv').exists(),
'output_files': {}
}
# Get output files
output_result = self._collect_output_files(project_path)
if output_result['success']:
status['output_files'] = output_result['files']
return {
'success': True,
'status': status
}
except Exception as e:
logger.error(f"Error getting project status: {str(e)}")
return {
'success': False,
'error': str(e)
}
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | Config | - | positional |
Parameter Details
config: Application configuration object. The constructor and the cleanup/generation methods read GENERATED_SCRIPTS_FOLDER, SANDBOX_FOLDER, OUTPUT_DIR, SESSIONS_FOLDER, AUTO_CLEANUP_ENABLED, and KEEP_RECENT_ANALYSES from it.
Return Value
The constructor returns an AgentExecutor instance; the public methods return Dict[str, Any] payloads that always carry a 'success' flag (see Class Interface below).
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the executor with the application configuration, construct the internal StatisticalAgent, and ensure the scripts, sandbox, and output directories exist
Parameters:
config: Type: Config
Returns: None
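A minimal construction sketch. The import path and the stand-in Config object are assumptions; the attribute names are the ones the constructor and the cleanup/generation methods read, and the real StatisticalAgent(config) built inside __init__ may require additional settings (e.g. LLM credentials) not shown here.
from types import SimpleNamespace
from agent_executor import AgentExecutor  # assumed module path

# Hypothetical stand-in for the real Config; attribute names match those used in the source above
config = SimpleNamespace(
    GENERATED_SCRIPTS_FOLDER="./generated_scripts",
    SANDBOX_FOLDER="./sandbox",
    OUTPUT_DIR="./output",
    SESSIONS_FOLDER="./sessions",
    AUTO_CLEANUP_ENABLED=True,
    KEEP_RECENT_ANALYSES=5,
)
executor = AgentExecutor(config)  # creates the scripts, sandbox, and output directories if missing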
cleanup_old_analyses(self, session_id, keep_recent)
Purpose: Clean up old analysis directories for a session
Parameters:
session_id: Type: str
keep_recent: Type: int (default: 5)
Returns: None
cleanup_session(self, session_id)
Purpose: Clean up all files for a session
Parameters:
session_id: Type: str
Returns: None
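A hedged sketch of the two cleanup entry points, reusing the executor from the construction sketch above; the session id is a placeholder.
# Keep only the most recent analysis_<id> directories for this session
executor.cleanup_old_analyses("my_session", keep_recent=5)

# Remove the session's output, scripts, sandbox, and stored data files entirely
executor.cleanup_session("my_session")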
generate_analysis_project(self, session_id, user_query, data_summary, analysis_config, session_data) -> Dict[str, Any]
Purpose: Generate complete analysis project with Python script, requirements.txt, and data
Parameters:
session_id: Type: str
user_query: Type: str
data_summary: Type: Dict[str, Any]
analysis_config: Type: Any
session_data: Type: Any (default: None)
Returns: Dict[str, Any] with 'success' and, on success, 'project_id', 'project_dir', 'script_path', 'requirements_path', 'data_path', 'script_content', and 'requirements'
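A hedged usage sketch for project generation; the session id, query, data file, and data_summary contents are illustrative, and executor is the instance from the construction sketch above.
import pandas as pd

df = pd.read_csv("my_data.csv")  # hypothetical session data

project = executor.generate_analysis_project(
    session_id="my_session",
    user_query="Summarise score by group and test for differences",
    data_summary={
        "shape": df.shape,
        "column_info": {col: str(dtype) for col, dtype in df.dtypes.items()},
    },
    analysis_config=None,  # descriptive defaults are used when no config object is passed
    session_data=df,       # optional; otherwise the data is reloaded from session storage
)

if project["success"]:
    print(project["project_dir"])  # .../<session_id>/analysis_<8-char id>
    print(project["script_path"])  # LLM-generated analysis.py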
_generate_analysis_script(self, user_query, data_summary, analysis_config, project_dir) -> Dict[str, Any]
Purpose: Generate standalone Python analysis script
Parameters:
user_query: Type: str
data_summary: Type: Dict[str, Any]
analysis_config: Type: Any
project_dir: Type: Path
Returns: Dict[str, Any]
_generate_requirements(self, script_content, project_dir) -> Dict[str, Any]
Purpose: Generate requirements.txt based on script imports
Parameters:
script_content: Type: str
project_dir: Type: Path
Returns: Dict[str, Any]
_create_data_input(self, session_id, project_dir, session_data) -> Dict[str, Any]
Purpose: Copy session data to project directory as input_data.csv
Parameters:
session_id: Type: str
project_dir: Type: Path
session_data: Type: Any (default: None)
Returns: Dict[str, Any]
execute_analysis_project(self, project_dir, max_iterations) -> Dict[str, Any]
Purpose: Execute the analysis project with iterative debugging
Parameters:
project_dir: Type: str
max_iterations: Type: int (default: 3)
Returns: Dict[str, Any] with 'success' plus, on success, the 'execution_output' and collected 'files', or an 'error' message on failure
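A hedged sketch of driving the install/run/debug loop, continuing the project dict from the generation sketch above.
result = executor.execute_analysis_project(project["project_dir"], max_iterations=3)

if result["success"]:
    files = result["files"]
    print("Plots:", [p["name"] for p in files["plots"]])
    print("Tables:", [t["name"] for t in files["tables"]])
    if files["conclusions"]:
        print(files["conclusions"]["content"])
else:
    # Some failure paths (e.g. dependency installation) do not report an iteration number
    print("Failed on iteration", result.get("iteration"), "-", result["error"])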
_install_requirements(self, project_path) -> Dict[str, Any]
Purpose: Install requirements for the project
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
_execute_script(self, project_path) -> Dict[str, Any]
Purpose: Execute the analysis script
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
_debug_script(self, project_path, error_message, iteration) -> Dict[str, Any]
Purpose: Debug and fix the script using LLM
Parameters:
project_path: Type: Path
error_message: Type: str
iteration: Type: int
Returns: Dict[str, Any]
_collect_output_files(self, project_path) -> Dict[str, Any]
Purpose: Collect all output files generated by the analysis
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
get_project_status(self, project_dir) -> Dict[str, Any]
Purpose: Get current status of an analysis project
Parameters:
project_dir: Type: str
Returns: Dict[str, Any]
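A hedged sketch for checking a project's on-disk state after (or between) runs; project and executor come from the earlier sketches.
status = executor.get_project_status(project["project_dir"])
if status["success"]:
    s = status["status"]
    print("script:", s["has_script"], "| requirements:", s["has_requirements"], "| venv:", s["has_venv"])
    print("plots so far:", len(s["output_files"].get("plots", [])))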
Required Imports
import os
import sys
import subprocess
import tempfile
import json
import shutil
import uuid
from pathlib import Path
from datetime import datetime
import pandas as pd
Usage Example
# Example usage:
# executor = AgentExecutor(config)
# project = executor.generate_analysis_project(session_id, user_query, data_summary, analysis_config)
# result = executor.execute_analysis_project(project['project_dir'])
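After a successful run, the project directory typically contains the artifacts listed below; the filenames follow the conventions the generation prompt asks for, so exact names depend on the LLM output. A small sketch for inspecting it:
from pathlib import Path

project_dir = Path(project["project_dir"])  # from generate_analysis_project
for path in sorted(project_dir.iterdir()):
    print(path.name)

# Typical contents:
#   analysis.py          generated script
#   requirements.txt     inferred dependencies
#   input_data.csv       copy of the session data
#   venv/                per-project virtual environment
#   plot_01_*.png, ...   figures saved by the script
#   table_01_*.csv, ...  summary tables
#   conclusions.txt      textual interpretation of the results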
Similar Components
AI-powered semantic similarity - components with related functionality:
- class AgentExecutor_v1 (99.0% similar)
- class AgentExecutor (98.4% similar)
- class ScriptExecutor (59.8% similar)
- function test_agent_executor (58.9% similar)
- class ScriptExecutor_v1 (58.4% similar)