class AgentExecutor_v1
Agent-based script executor that generates standalone Python files, manages dependencies, and provides iterative debugging capabilities
File: /tf/active/vicechatdev/full_smartstat/agent_executor.py
Lines: 25 - 938
Complexity: moderate
Purpose
Agent-based script executor that generates standalone Python files, manages dependencies, and provides iterative debugging capabilities
Source Code
class AgentExecutor:
"""
Agent-based script executor that generates standalone Python files,
manages dependencies, and provides iterative debugging capabilities
"""
def __init__(self, config: Config):
self.config = config
self.statistical_agent = StatisticalAgent(config)
self.scripts_dir = Path(config.GENERATED_SCRIPTS_FOLDER)
self.sandbox_dir = Path(config.SANDBOX_FOLDER)
self.output_dir = Path(config.OUTPUT_DIR)
self.default_model = 'gpt-4o' # Default model, can be overridden
# Ensure directories exist
self.scripts_dir.mkdir(parents=True, exist_ok=True)
self.sandbox_dir.mkdir(parents=True, exist_ok=True)
self.output_dir.mkdir(parents=True, exist_ok=True)
def cleanup_old_analyses(self, session_id: str, keep_recent: int = 5):
"""Clean up old analysis directories for a session"""
try:
session_dir = self.output_dir / session_id
if not session_dir.exists():
return
# Get all analysis directories
analysis_dirs = [d for d in session_dir.iterdir()
if d.is_dir() and d.name.startswith('analysis_')]
# Sort by modification time (newest first)
analysis_dirs.sort(key=lambda x: x.stat().st_mtime, reverse=True)
# Remove old analysis directories
for old_dir in analysis_dirs[keep_recent:]:
logger.info(f"Cleaning up old analysis directory: {old_dir}")
shutil.rmtree(old_dir)
except Exception as e:
logger.warning(f"Error cleaning up old analyses for session {session_id}: {str(e)}")
def cleanup_session(self, session_id: str):
"""Clean up all files for a session"""
try:
# Clean up output directory
session_dir = self.output_dir / session_id
if session_dir.exists():
logger.info(f"Cleaning up session directory: {session_dir}")
shutil.rmtree(session_dir)
# Clean up scripts directory
scripts_session_dir = self.scripts_dir / session_id
if scripts_session_dir.exists():
logger.info(f"Cleaning up scripts directory: {scripts_session_dir}")
shutil.rmtree(scripts_session_dir)
# Clean up sandbox directory
sandbox_session_dir = self.sandbox_dir / session_id
if sandbox_session_dir.exists():
logger.info(f"Cleaning up sandbox directory: {sandbox_session_dir}")
shutil.rmtree(sandbox_session_dir)
# Clean up sessions folder (data files)
sessions_folder = Path(self.config.SESSIONS_FOLDER)
if sessions_folder.exists():
for file_path in sessions_folder.glob(f"*{session_id}*"):
if file_path.is_file():
logger.info(f"Cleaning up session data file: {file_path}")
file_path.unlink()
except Exception as e:
logger.warning(f"Error cleaning up session {session_id}: {str(e)}")
def generate_analysis_project(self, session_id: str, user_query: str,
data_summary: Dict[str, Any], analysis_config: Any,
session_data: Any = None, model: str = None) -> Dict[str, Any]:
"""
Generate complete analysis project with Python script, requirements.txt, and data
"""
if model is None:
model = self.default_model
try:
# Create session directory
session_dir = self.output_dir / session_id
session_dir.mkdir(exist_ok=True)
# Clean up old analyses to prevent accumulation
if self.config.AUTO_CLEANUP_ENABLED:
self.cleanup_old_analyses(session_id, keep_recent=self.config.KEEP_RECENT_ANALYSES)
# Generate unique project ID
project_id = str(uuid.uuid4())[:8]
project_dir = session_dir / f"analysis_{project_id}"
project_dir.mkdir(exist_ok=True)
# Generate Python script
script_result = self._generate_analysis_script(
user_query, data_summary, analysis_config, project_dir, model
)
if not script_result['success']:
return script_result
# Generate requirements.txt
requirements_result = self._generate_requirements(
script_result['script'], project_dir, model
)
if not requirements_result['success']:
return requirements_result
# Create data input file
data_input_result = self._create_data_input(
session_id, project_dir, session_data
)
if not data_input_result['success']:
return data_input_result
return {
'success': True,
'project_id': project_id,
'project_dir': str(project_dir),
'script_path': str(project_dir / 'analysis.py'),
'requirements_path': str(project_dir / 'requirements.txt'),
'data_path': str(project_dir / 'input_data.csv'),
'script_content': script_result['script'],
'requirements': requirements_result['requirements']
}
except Exception as e:
logger.error(f"Error generating analysis project: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _generate_analysis_script(self, user_query: str, data_summary: Dict[str, Any],
analysis_config: Any, project_dir: Path, model: str = None) -> Dict[str, Any]:
"""Generate standalone Python analysis script"""
if model is None:
model = self.default_model
prompt = f"""Generate a complete, standalone Python analysis script that addresses this request:
USER QUERY: "{user_query}"
ANALYSIS CONFIGURATION:
- Type: {analysis_config.analysis_type.value if analysis_config and hasattr(analysis_config, 'analysis_type') else 'descriptive'}
- Target variables: {getattr(analysis_config, 'target_variables', []) if analysis_config else []}
- Grouping variables: {getattr(analysis_config, 'grouping_variables', []) if analysis_config else []}
- Significance level: {getattr(analysis_config, 'significance_level', 0.05) if analysis_config else 0.05}
DATA INFORMATION:
- Shape: {data_summary.get('shape', 'Unknown')}
- Columns: {list(data_summary.get('column_info', {}).keys())}
- Column types: {json.dumps(data_summary.get('column_info', {}), indent=2)}
REQUIREMENTS:
1. Create a standalone Python script that reads data from 'input_data.csv'
2. Include all necessary imports at the top
3. Perform comprehensive statistical analysis based on the user query
4. Generate visualizations and save them as PNG files with descriptive names
5. Create summary tables and save them as CSV files
6. Write textual conclusions and interpretations to a 'conclusions.txt' file
7. Handle errors gracefully with informative error messages
8. Use professional statistical practices and proper data validation
OUTPUT STRUCTURE:
- Save plots as: plot_01_description.png, plot_02_description.png, etc.
- Save tables as: table_01_description.csv, table_02_description.csv, etc.
- Save conclusions as: conclusions.txt
- Print progress messages to console
SCRIPT TEMPLATE:
```python
#!/usr/bin/env python3
\"\"\"
Statistical Analysis Script
Generated by SmartStat Agent
Query: {user_query}
Generated: {datetime.now().isoformat()}
\"\"\"
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
def main():
print("Starting statistical analysis...")
print(f"Query: {user_query}")
# Load data
try:
df = pd.read_csv('input_data.csv')
print(f"Data loaded successfully: {{df.shape}}")
except Exception as e:
print(f"Error loading data: {{e}}")
return
# Your analysis code here...
print("Analysis completed successfully!")
if __name__ == "__main__":
main()
```
Generate the complete Python script following these guidelines."""
try:
# Generate script using LLM
response = self.statistical_agent._call_llm(prompt, model=model, max_tokens=12000)
# Extract Python code from response
if '```python' in response:
script_start = response.find('```python') + 9
script_end = response.find('```', script_start)
if script_end == -1:
script_code = response[script_start:].strip()
else:
script_code = response[script_start:script_end].strip()
else:
script_code = response.strip()
# Basic syntax validation
try:
compile(script_code, '<string>', 'exec')
except SyntaxError as e:
logger.warning(f"Generated script has syntax error: {e}")
# Don't fail here, let the execution loop handle it
# Check if script appears complete
if not self._is_script_complete(script_code):
logger.warning("Generated script appears to be incomplete (possibly truncated)")
# Don't fail here, let the execution loop handle it
# Save script to file
script_path = project_dir / 'analysis.py'
with open(script_path, 'w') as f:
f.write(script_code)
return {
'success': True,
'script': script_code,
'script_path': str(script_path)
}
except Exception as e:
logger.error(f"Error generating analysis script: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _generate_requirements(self, script_content: str, project_dir: Path, model: str = None) -> Dict[str, Any]:
"""Generate requirements.txt based on script imports"""
if model is None:
model = self.default_model
prompt = f"""Analyze this Python script and generate a requirements.txt file with the exact package versions needed:
PYTHON SCRIPT:
```python
{script_content}
```
Generate a requirements.txt file that includes:
1. All imported packages with specific versions for reproducibility
2. Common data science packages (pandas, numpy, matplotlib, seaborn, scipy, etc.)
3. Any specialized statistical packages that might be needed
4. Use recent stable versions
Format as a clean requirements.txt file with one package per line in format: package==version
Example:
pandas==2.0.3
numpy==1.24.3
matplotlib==3.7.2
seaborn==0.12.2
scipy==1.11.1
Provide only the requirements.txt content, no explanations."""
try:
response = self.statistical_agent._call_llm(prompt, model=model, max_tokens=1000)
# Clean up the response
requirements_content = response.strip()
if '```' in requirements_content:
start = requirements_content.find('```')
end = requirements_content.rfind('```')
if start != -1 and end != -1 and end > start:
requirements_content = requirements_content[start+3:end].strip()
# Remove any language specifiers
if requirements_content.startswith(('txt', 'text', 'requirements', 'pip-requirements', 'plaintext')):
lines = requirements_content.split('\n')
# Find first line that looks like a package requirement
start_idx = 0
for i, line in enumerate(lines):
if '==' in line or '>=' in line or '<=' in line or '>' in line or '<' in line:
start_idx = i
break
requirements_content = '\n'.join(lines[start_idx:])
# Ensure clean format - only package requirements
lines = requirements_content.split('\n')
clean_lines = []
for line in lines:
line = line.strip()
if line and not line.startswith('#') and any(op in line for op in ('==', '>=', '<=')):
clean_lines.append(line)
requirements_content = '\n'.join(clean_lines)
# Save requirements.txt
requirements_path = project_dir / 'requirements.txt'
with open(requirements_path, 'w') as f:
f.write(requirements_content)
# Parse requirements for return
requirements_list = [line.strip() for line in requirements_content.split('\n')
if line.strip() and not line.strip().startswith('#')]
return {
'success': True,
'requirements': requirements_list,
'requirements_path': str(requirements_path),
'requirements_content': requirements_content
}
except Exception as e:
logger.error(f"Error generating requirements: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _create_data_input(self, session_id: str, project_dir: Path, session_data: Any = None) -> Dict[str, Any]:
"""Copy session data to project directory as input_data.csv"""
try:
df = None
# Try to use provided session data first
if session_data is not None:
df = session_data
else:
# Fallback to loading from session storage
try:
from services import StatisticalAnalysisService
service = StatisticalAnalysisService(self.config)
df = service._load_session_data(session_id)
except ImportError:
# Handle circular import by loading data directly
import pandas as pd
data_path = Path(self.config.SESSIONS_FOLDER) / f"{session_id}_data.pkl"
if data_path.exists():
df = pd.read_pickle(data_path)
if df is None:
return {
'success': False,
'error': 'No data found for session'
}
# Save to project directory
data_path = project_dir / 'input_data.csv'
df.to_csv(data_path, index=False)
return {
'success': True,
'data_path': str(data_path),
'shape': df.shape
}
except Exception as e:
logger.error(f"Error creating data input: {str(e)}")
return {
'success': False,
'error': str(e)
}
def execute_analysis_project(self, project_dir: str, max_iterations: int = 3, model: str = None) -> Dict[str, Any]:
"""
Execute the analysis project with iterative debugging and comprehensive logging
"""
if model is None:
model = self.default_model
project_path = Path(project_dir)
iteration = 0
# Enhanced execution tracking
execution_log = []
debug_iterations = []
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'execution_started',
'message': f'Starting analysis execution with max {max_iterations} iterations',
'model': model
})
while iteration < max_iterations:
iteration += 1
logger.info(f"Execution attempt {iteration}/{max_iterations}")
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'iteration_started',
'iteration': iteration,
'message': f'Starting execution attempt {iteration}/{max_iterations}'
})
# Install requirements
install_result = self._install_requirements(project_path)
if not install_result['success']:
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'install_failed',
'iteration': iteration,
'error': install_result['error']
})
return {
**install_result,
'execution_log': execution_log,
'debug_iterations': debug_iterations
}
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'requirements_installed',
'iteration': iteration,
'message': 'Python packages installed successfully'
})
# Execute script
execution_result = self._execute_script(project_path)
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'script_executed',
'iteration': iteration,
'success': execution_result['success'],
'output_length': len(execution_result.get('output', '')),
'error_present': bool(execution_result.get('error'))
})
if execution_result['success']:
# Collect output files
output_result = self._collect_output_files(project_path)
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'execution_completed',
'iteration': iteration,
'files_generated': sum(len(v) for v in output_result.get('files', {}).values() if isinstance(v, list)),
'plots': len(output_result.get('files', {}).get('plots', [])),
'tables': len(output_result.get('files', {}).get('tables', [])),
'message': f'Analysis completed successfully on iteration {iteration}'
})
return {
'success': True,
'iteration': iteration,
'execution_output': execution_result['output'],
'execution_error': execution_result['error'],
'files': output_result['files'],
'generated_files': output_result.get('files', []),
'plots': output_result.get('files', {}).get('plots', []),
'tables': output_result.get('files', {}).get('tables', []),
'project_dir': str(project_path),
'execution_log': execution_log,
'debug_iterations': debug_iterations,
'execution_time': self._calculate_execution_time(execution_log)
}
else:
# Debug and retry if not last iteration
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'execution_failed',
'iteration': iteration,
'error': execution_result['error'][:500] + '...' if len(execution_result['error']) > 500 else execution_result['error']
})
if iteration < max_iterations:
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'debug_started',
'iteration': iteration,
'message': f'Starting LLM-based debugging for iteration {iteration}'
})
debug_result = self._debug_script(
project_path, execution_result['error'], iteration, model
)
debug_iterations.append({
'iteration': iteration,
'original_error': execution_result['error'],
'debug_success': debug_result['success'],
'debug_timestamp': datetime.now().isoformat()
})
if not debug_result['success']:
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'debug_failed',
'iteration': iteration,
'error': debug_result['error']
})
return {
**debug_result,
'execution_log': execution_log,
'debug_iterations': debug_iterations
}
else:
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'debug_completed',
'iteration': iteration,
'message': 'Script debugged and corrected by LLM'
})
else:
execution_log.append({
'timestamp': datetime.now().isoformat(),
'event': 'max_iterations_reached',
'iteration': iteration,
'message': f'Max iterations ({max_iterations}) reached, execution failed'
})
return {
'success': False,
'error': execution_result['error'],
'iteration': iteration,
'project_dir': str(project_path),
'execution_log': execution_log,
'debug_iterations': debug_iterations
}
return {
'success': False,
'error': f'Max iterations ({max_iterations}) exceeded',
'iteration': iteration,
'project_dir': str(project_path),
'execution_log': execution_log,
'debug_iterations': debug_iterations
}
def _calculate_execution_time(self, execution_log: List[Dict]) -> Dict[str, Any]:
"""Calculate execution timing from logs"""
if not execution_log:
return {}
start_time = None
end_time = None
for log_entry in execution_log:
if log_entry['event'] == 'execution_started':
start_time = datetime.fromisoformat(log_entry['timestamp'])
elif log_entry['event'] in ['execution_completed', 'max_iterations_reached']:
end_time = datetime.fromisoformat(log_entry['timestamp'])
if start_time and end_time:
duration = end_time - start_time
return {
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'duration_seconds': duration.total_seconds(),
'duration_formatted': str(duration)
}
return {}
def _install_requirements(self, project_path: Path) -> Dict[str, Any]:
"""Install requirements for the project"""
try:
requirements_path = project_path / 'requirements.txt'
if not requirements_path.exists():
return {
'success': False,
'error': 'requirements.txt not found'
}
# Create virtual environment for this project
venv_path = project_path / 'venv'
if not venv_path.exists():
subprocess.run([
sys.executable, '-m', 'venv', str(venv_path)
], check=True, capture_output=True, text=True)
# Install requirements in virtual environment
if os.name == 'nt': # Windows
pip_executable = venv_path / 'Scripts' / 'pip'
else: # Unix/Linux/Mac
pip_executable = venv_path / 'bin' / 'pip'
result = subprocess.run([
str(pip_executable), 'install', '-r', str(requirements_path)
], cwd=str(project_path), capture_output=True, text=True, timeout=300)
if result.returncode == 0:
return {
'success': True,
'output': result.stdout,
'venv_path': str(venv_path)
}
else:
return {
'success': False,
'error': f"Package installation failed: {result.stderr}"
}
except subprocess.TimeoutExpired:
return {
'success': False,
'error': 'Package installation timed out'
}
except Exception as e:
logger.error(f"Error installing requirements: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _execute_script(self, project_path: Path) -> Dict[str, Any]:
"""Execute the analysis script"""
try:
script_path = project_path / 'analysis.py'
if not script_path.exists():
return {
'success': False,
'error': 'analysis.py not found'
}
# Use virtual environment Python
venv_path = project_path / 'venv'
if os.name == 'nt': # Windows
python_executable = venv_path / 'Scripts' / 'python'
else: # Unix/Linux/Mac
python_executable = venv_path / 'bin' / 'python'
# Execute script
result = subprocess.run([
str(python_executable), 'analysis.py'
], cwd=str(project_path), capture_output=True, text=True, timeout=300)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode
}
except subprocess.TimeoutExpired:
return {
'success': False,
'output': '',
'error': 'Script execution timed out'
}
except Exception as e:
logger.error(f"Error executing script: {str(e)}")
return {
'success': False,
'output': '',
'error': str(e)
}
def _debug_script(self, project_path: Path, error_message: str, iteration: int, model: str = None) -> Dict[str, Any]:
"""Debug and fix the script using LLM"""
if model is None:
model = self.default_model
try:
script_path = project_path / 'analysis.py'
with open(script_path, 'r') as f:
current_script = f.read()
# Get data info for debugging context
data_path = project_path / 'input_data.csv'
data_info = ""
if data_path.exists():
try:
df = pd.read_csv(data_path)
data_info = f"Data shape: {df.shape}, Columns: {list(df.columns)}"
except Exception:
data_info = "Could not read data file"
debug_prompt = f"""Fix this Python analysis script that failed to execute:
CURRENT SCRIPT:
```python
{current_script}
```
ERROR MESSAGE:
{error_message}
DATA INFO:
{data_info}
DEBUGGING ITERATION: {iteration}
⚠️ CRITICAL: If the script appears incomplete or ends abruptly (like ending with 'stats.' or incomplete function calls),
this means the original script was truncated. You MUST complete the truncated code AND ensure the entire script is
self-contained and complete. Check if the script ends mid-statement and complete any unfinished code blocks.
CRITICAL DEBUGGING INSTRUCTIONS:
If you see "SyntaxError: unterminated string literal", this means there are incomplete string quotes in the code.
You MUST:
1. Carefully check ALL string literals (f-strings, print statements, etc.)
2. Ensure every opening quote has a matching closing quote
3. Fix any line breaks within strings using proper escape sequences
4. Pay special attention to f-strings and multi-line strings
OTHER COMMON FIXES:
- Import errors: Add missing imports at the top
- File not found: Check if input_data.csv exists and handle gracefully
- Column name errors: Use df.columns to check available columns first
- Data type issues: Convert data types explicitly (pd.to_numeric, pd.to_datetime)
- Empty dataframe issues: Check if df.empty before processing
- Memory issues: Use chunking for large datasets
- Plotting errors: Ensure matplotlib backend is set correctly
Please provide a corrected version of the complete script that:
1. FIRST AND FOREMOST: Fixes all syntax errors (especially unterminated strings)
2. Includes robust error handling with try-except blocks
3. Validates data existence and structure before processing
4. Uses defensive programming (check if columns exist, handle empty data)
5. Adds informative print statements showing progress and debug info
6. Handles common pandas/matplotlib issues gracefully
7. Ensures all file operations are wrapped in try-except blocks
8. Uses proper matplotlib configuration for headless environments
CRITICAL: Every string must be properly terminated. Check every single quote mark.
Return only the corrected Python script, no explanations."""
response = self.statistical_agent._call_llm(debug_prompt, model=model, max_tokens=12000)
# Extract corrected code
if '```python' in response:
script_start = response.find('```python') + 9
script_end = response.find('```', script_start)
if script_end == -1:
corrected_script = response[script_start:].strip()
else:
corrected_script = response[script_start:script_end].strip()
else:
corrected_script = response.strip()
# Validate corrected script syntax
try:
compile(corrected_script, '<string>', 'exec')
logger.info(f"Debug iteration {iteration}: Syntax validation passed")
except SyntaxError as e:
logger.warning(f"Debug iteration {iteration}: Generated script still has syntax error: {e}")
# Still save it, the next iteration might fix it
# Check if corrected script appears complete
if not self._is_script_complete(corrected_script):
logger.warning(f"Debug iteration {iteration}: Script appears incomplete (possibly truncated)")
# Still save it, the next iteration might fix it
# Save corrected script
with open(script_path, 'w') as f:
f.write(corrected_script)
return {
'success': True,
'corrected_script': corrected_script
}
except Exception as e:
logger.error(f"Error debugging script: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _collect_output_files(self, project_path: Path) -> Dict[str, Any]:
"""Collect all output files generated by the analysis"""
try:
output_files = {
'plots': [],
'tables': [],
'conclusions': None,
'other': []
}
# Scan project directory for output files
for file_path in project_path.iterdir():
if file_path.is_file():
filename = file_path.name.lower()
if filename.endswith('.png') or filename.endswith('.jpg') or filename.endswith('.svg'):
output_files['plots'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
elif filename.endswith('.csv') and not filename == 'input_data.csv':
output_files['tables'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
elif filename == 'conclusions.txt':
with open(file_path, 'r') as f:
output_files['conclusions'] = {
'name': file_path.name,
'path': str(file_path),
'content': f.read()
}
elif filename not in ['analysis.py', 'requirements.txt', 'input_data.csv']:
output_files['other'].append({
'name': file_path.name,
'path': str(file_path),
'size': file_path.stat().st_size
})
return {
'success': True,
'files': output_files
}
except Exception as e:
logger.error(f"Error collecting output files: {str(e)}")
return {
'success': False,
'error': str(e),
'files': {}
}
def get_project_status(self, project_dir: str) -> Dict[str, Any]:
"""Get current status of an analysis project"""
try:
project_path = Path(project_dir)
if not project_path.exists():
return {
'success': False,
'error': 'Project directory not found'
}
status = {
'project_exists': True,
'has_script': (project_path / 'analysis.py').exists(),
'has_requirements': (project_path / 'requirements.txt').exists(),
'has_data': (project_path / 'input_data.csv').exists(),
'has_venv': (project_path / 'venv').exists(),
'output_files': {}
}
# Get output files
output_result = self._collect_output_files(project_path)
if output_result['success']:
status['output_files'] = output_result['files']
return {
'success': True,
'status': status
}
except Exception as e:
logger.error(f"Error getting project status: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _is_script_complete(self, script_content: str) -> bool:
"""Check if the script appears to be complete (not truncated)"""
try:
# Look for common signs of incomplete scripts
incomplete_indicators = [
'stats.', # Incomplete function call like 'stats.'
'print("', # Incomplete print statement
'f"', # Incomplete f-string
'= ', # Assignment without value
'import ', # Incomplete import
'.append(', # Incomplete method call
'for ', # Incomplete for loop
'if ', # Incomplete if statement
'def ', # Incomplete function definition
]
lines = script_content.strip().split('\n')
if not lines:
return False
last_line = lines[-1].strip()
# Check if last line is incomplete
for indicator in incomplete_indicators:
if last_line.endswith(indicator):
return False
# Check for unmatched quotes or parentheses in last few lines
for line in lines[-5:]: # Check last 5 lines
if line.count('"') % 2 != 0 or line.count("'") % 2 != 0:
return False
if line.count('(') != line.count(')'):
return False
if line.count('[') != line.count(']'):
return False
if line.count('{') != line.count('}'):
return False
return True
except Exception:
return False
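Both `_generate_analysis_script` and `_debug_script` recover code from the LLM response with the same string-slicing pattern. A minimal standalone sketch of that extraction logic; `extract_python_block` is a hypothetical helper name used for illustration, not part of the class:

```python
def extract_python_block(response: str) -> str:
    """Mirror the fenced-code extraction used above (hypothetical helper).

    Returns the body of the first ```python fence; a missing closing fence
    (truncated LLM output) yields everything after the opening fence.
    """
    if '```python' in response:
        start = response.find('```python') + len('```python')
        end = response.find('```', start)
        if end == -1:
            # No closing fence: the response was likely truncated mid-script.
            return response[start:].strip()
        return response[start:end].strip()
    # No fence at all: treat the whole response as code.
    return response.strip()

assert extract_python_block("```python\nprint('hi')\n```") == "print('hi')"
assert extract_python_block("print('raw')") == "print('raw')"
```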
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | Config | - | positional |
Parameter Details
config: Application configuration object; this class reads GENERATED_SCRIPTS_FOLDER, SANDBOX_FOLDER, OUTPUT_DIR, SESSIONS_FOLDER, AUTO_CLEANUP_ENABLED, and KEEP_RECENT_ANALYSES from it.
Return Value
__init__ returns None; the documented methods return result dictionaries (or bool for _is_script_complete) as listed under Class Interface.
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the executor with its configuration, create the StatisticalAgent, and ensure the scripts, sandbox, and output directories exist
Parameters:
config: Type: Config
Returns: None
cleanup_old_analyses(self, session_id, keep_recent)
Purpose: Clean up old analysis directories for a session
Parameters:
session_id: Type: str
keep_recent: Type: int
Returns: None
cleanup_session(self, session_id)
Purpose: Clean up all files for a session
Parameters:
session_id: Type: str
Returns: None
generate_analysis_project(self, session_id, user_query, data_summary, analysis_config, session_data, model) -> Dict[str, Any]
Purpose: Generate complete analysis project with Python script, requirements.txt, and data
Parameters:
session_id: Type: str
user_query: Type: str
data_summary: Type: Dict[str, Any]
analysis_config: Type: Any
session_data: Type: Any
model: Type: str
Returns: Dict[str, Any]
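A hedged call sketch (assumes an existing `executor` instance; the session values are illustrative). The keys below match the success branch of the method:

```python
project = executor.generate_analysis_project(
    session_id='demo-session',
    user_query='Summarize the numeric columns',
    data_summary={'shape': (100, 3), 'column_info': {'value': 'float64'}},
    analysis_config=None,  # method falls back to descriptive defaults
)
if project['success']:
    print(project['project_id'])    # short uuid fragment, e.g. 'a1b2c3d4'
    print(project['script_path'])   # <project_dir>/analysis.py
    print(project['requirements'])  # pinned packages from requirements.txt
else:
    print(project['error'])
```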
_generate_analysis_script(self, user_query, data_summary, analysis_config, project_dir, model) -> Dict[str, Any]
Purpose: Generate standalone Python analysis script
Parameters:
user_query: Type: str
data_summary: Type: Dict[str, Any]
analysis_config: Type: Any
project_dir: Type: Path
model: Type: str
Returns: Dict[str, Any]
_generate_requirements(self, script_content, project_dir, model) -> Dict[str, Any]
Purpose: Generate requirements.txt based on script imports
Parameters:
script_content: Type: str
project_dir: Type: Path
model: Type: str
Returns: Dict[str, Any]
_create_data_input(self, session_id, project_dir, session_data) -> Dict[str, Any]
Purpose: Copy session data to project directory as input_data.csv
Parameters:
session_id: Type: str
project_dir: Type: Path
session_data: Type: Any
Returns: Dict[str, Any]
execute_analysis_project(self, project_dir, max_iterations, model) -> Dict[str, Any]
Purpose: Execute the analysis project with iterative debugging and comprehensive logging
Parameters:
project_dir: Type: str
max_iterations: Type: int
model: Type: str
Returns: Dict[str, Any]
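A sketch of driving the retry loop and inspecting the structured log; field names follow the return dictionaries in the source (assumes `project` from the previous sketch):

```python
result = executor.execute_analysis_project(project['project_dir'], max_iterations=3)
for entry in result['execution_log']:
    print(entry['timestamp'], entry['event'])
if result['success']:
    timing = result.get('execution_time', {})
    print('Finished in', timing.get('duration_formatted', 'n/a'),
          'after', result['iteration'], 'attempt(s)')
else:
    print('LLM debug attempts:', len(result['debug_iterations']))
```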
_calculate_execution_time(self, execution_log) -> Dict[str, Any]
Purpose: Calculate execution timing from logs
Parameters:
execution_log: Type: List[Dict]
Returns: Dict[str, Any]
_install_requirements(self, project_path) -> Dict[str, Any]
Purpose: Install requirements for the project
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
_execute_script(self, project_path) -> Dict[str, Any]
Purpose: Execute the analysis script
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
_debug_script(self, project_path, error_message, iteration, model) -> Dict[str, Any]
Purpose: Debug and fix the script using LLM
Parameters:
project_path: Type: Path
error_message: Type: str
iteration: Type: int
model: Type: str
Returns: Dict[str, Any]
_collect_output_files(self, project_path) -> Dict[str, Any]
Purpose: Collect all output files generated by the analysis
Parameters:
project_path: Type: Path
Returns: Dict[str, Any]
get_project_status(self, project_dir) -> Dict[str, Any]
Purpose: Get current status of an analysis project
Parameters:
project_dir: Type: str
Returns: Dict[str, Any]
_is_script_complete(self, script_content) -> bool
Purpose: Check if the script appears to be complete (not truncated)
Parameters:
script_content: Type: str
Returns: bool
Required Imports
import os
import sys
import subprocess
import tempfile
import json
import shutil
import uuid
import logging
import pandas as pd
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
Usage Example
# Example usage:
# executor = AgentExecutor(config)
# project = executor.generate_analysis_project(session_id, user_query, data_summary, analysis_config)
# if project['success']:
#     result = executor.execute_analysis_project(project['project_dir'])
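A fuller runnable sketch, assuming a minimal config object exposing the attributes this class reads and a StatisticalAgent that accepts the same config; the folder names, session id, and query below are illustrative only:

```python
from types import SimpleNamespace

import pandas as pd

# Hypothetical stand-in for the project's Config class.
config = SimpleNamespace(
    GENERATED_SCRIPTS_FOLDER='generated_scripts',
    SANDBOX_FOLDER='sandbox',
    OUTPUT_DIR='output',
    SESSIONS_FOLDER='sessions',
    AUTO_CLEANUP_ENABLED=True,
    KEEP_RECENT_ANALYSES=5,
)
executor = AgentExecutor(config)

# Passing session_data directly avoids the session-storage fallback.
df = pd.DataFrame({'group': ['a', 'b'] * 50, 'value': range(100)})
project = executor.generate_analysis_project(
    session_id='demo-session',
    user_query='Compare the mean value between groups',
    data_summary={'shape': df.shape,
                  'column_info': {'group': 'object', 'value': 'int64'}},
    analysis_config=None,
    session_data=df,
)
if project['success']:
    result = executor.execute_analysis_project(project['project_dir'], max_iterations=3)
    if result['success']:
        print('Plots:', [p['name'] for p in result['plots']])
        print('Tables:', [t['name'] for t in result['tables']])
    else:
        print('Failed after', result['iteration'], 'iteration(s):', result['error'])
```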
Similar Components
AI-powered semantic similarity - components with related functionality:
- class AgentExecutor_v2 (99.0% similar)
- class AgentExecutor (98.9% similar)
- class ScriptExecutor (59.8% similar)
- function test_agent_executor (59.1% similar)
- class ScriptExecutor_v1 (58.9% similar)