class EnhancedSQLWorkflow
Enhanced SQL workflow with iterative optimization
File: /tf/active/vicechatdev/full_smartstat/enhanced_sql_workflow.py
Lines: 58 - 2004
Complexity: moderate
Purpose
Orchestrates SQL-driven data retrieval with iterative optimization: it parses a combined data-and-analysis request, generates SQL either by enhancing an existing session query or via a two-pass workflow, executes the query, scores the returned data against a user-defined quality threshold, and falls back to a simple query when generation fails.
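A minimal usage sketch follows (illustrative only): the import paths and constructor arguments for `Config` and `StatisticalAgent` are assumptions, not taken from this file; the parameters and result keys mirror `execute_integrated_workflow` as shown in the source below.

```python
# Minimal usage sketch -- import paths and constructor arguments for Config and
# StatisticalAgent are assumed; parameters and result keys follow the source below.
from config import Config                         # assumed location
from statistical_agent import StatisticalAgent    # assumed location
from enhanced_sql_workflow import EnhancedSQLWorkflow

config = Config()
agent = StatisticalAgent(config)
workflow = EnhancedSQLWorkflow(config, agent)

result = workflow.execute_integrated_workflow(
    combined_request="Monthly positive serology rate per analysis type over the last year",
    session_id="demo-session",
    max_rows=5000,
    quality_threshold=75,
)

if result['success']:
    df = result['dataframe']        # final dataset (pandas DataFrame)
    print(result['final_query'])    # SQL query that produced the data
else:
    print("Workflow failed:", result['error'])
```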
Source Code
class EnhancedSQLWorkflow:
"""Enhanced SQL workflow with iterative optimization"""
def __init__(self, config: Config, statistical_agent: StatisticalAgent, discovered_schema=None):
self.config = config
self.statistical_agent = statistical_agent
self.data_processor = DataProcessor(config)
self.max_iterations = 3
self.discovered_schema = discovered_schema # Use dynamic schema if available
def execute_integrated_workflow(self,
combined_request: str,
session_id: str = None,
schema_file: str = 'database_schema_20251003_120434.json',
connection_config: str = 'sql_config.py',
max_rows: Optional[int] = 10000,
model: str = 'gpt-4o',
quality_threshold: int = 70,
include_incomplete: bool = True,
optimize_for_analysis: bool = True,
include_dataset_context: bool = False,
target_tables: List[str] = None,
specific_columns: List[str] = None,
specific_relationships: List[str] = None) -> Dict[str, Any]:
"""
Execute the enhanced workflow that combines data selection and analysis
Args:
combined_request: User's combined data and analysis request
session_id: Session identifier used to retrieve previous dataset/SQL context (optional)
schema_file: Database schema file
connection_config: Database connection configuration
max_rows: Maximum number of rows to retrieve
model: AI model to use for LLM calls
quality_threshold: Minimum quality score (60-90)
include_incomplete: Whether to include records with missing data
optimize_for_analysis: Whether to optimize data structure for analysis
include_dataset_context: Whether to include previous dataset and SQL context
target_tables: Specific tables to focus on (optional)
specific_columns: Specific columns to include (optional)
specific_relationships: Specific relationships to include in JOINs (optional)
Returns:
Dict containing the final dataset, SQL queries, and analysis preparation
"""
try:
logger.info(f"Starting enhanced SQL workflow for: {combined_request}")
# Step 1: Parse the combined request
analysis_request = self._parse_combined_request(combined_request, max_rows, model)
# Step 1.5: Check for SQL enhancement scenario (check BEFORE parsing to preserve keywords)
if include_dataset_context and session_id:
logger.info(f"Checking for SQL enhancement - include_dataset_context: {include_dataset_context}, session_id: {session_id}")
existing_sql, sql_context = self._get_existing_sql_for_enhancement(session_id)
logger.info(f"Retrieved existing SQL: {len(existing_sql) if existing_sql else 0} characters")
if existing_sql:
# Check if user is asking to enhance/add to existing data using ORIGINAL request
enrichment_keywords = ['add', 'enrich', 'augment', 'include', 'extend', 'enhance', 'combine with', 'more information', 'additional']
is_enrichment_request = any(keyword in combined_request.lower() for keyword in enrichment_keywords)
logger.info(f"Enhancement detection - original request: '{combined_request}', contains enrichment keywords: {is_enrichment_request}")
if is_enrichment_request:
logger.info(f"Detected SQL enhancement request - building upon existing SQL query")
# Use SQL enhancement workflow instead of regular workflow
workflow_results = self._enhance_existing_sql(
existing_sql,
combined_request,
connection_config,
max_rows
)
# Process the enhanced SQL results
iterations = workflow_results.get('iterations', [])
final_dataset = workflow_results.get('dataframe', pd.DataFrame())
# Enhanced success detection: consider enhancement successful if we got good data
enhancement_produced_data = not final_dataset.empty
formal_success = workflow_results.get('final_success', False)
# Calculate success based on data quality, not just formal success flag
data_quality_good = (len(final_dataset) > 100 and len(final_dataset.columns) > 5) if enhancement_produced_data else False
enhancement_actually_successful = formal_success or data_quality_good
if enhancement_actually_successful and enhancement_produced_data:
logger.info(f"SQL enhancement successful: {len(final_dataset)} rows, {len(final_dataset.columns)} columns (formal_success: {formal_success}, data_quality_good: {data_quality_good})")
# Skip the regular two-pass workflow since we used enhancement
skip_regular_workflow = True
else:
logger.warning(f"SQL enhancement insufficient: formal_success={formal_success}, data_rows={len(final_dataset) if enhancement_produced_data else 0}, data_cols={len(final_dataset.columns) if enhancement_produced_data else 0}")
# Even if enhancement failed, we still want to use the enhanced context
# Don't skip workflow, but ensure context is preserved
skip_regular_workflow = False
# If enhancement produced decent data despite formal failure, consider using it
if enhancement_produced_data and len(final_dataset.columns) >= 5:
logger.info(f"Enhancement produced reasonable results despite formal failure: {len(final_dataset)} rows, {len(final_dataset.columns)} columns - using as primary result")
# Use the enhancement results even if marked as "failed"
iterations = workflow_results.get('iterations', [])
skip_regular_workflow = True
else:
skip_regular_workflow = False
# Include comprehensive context for regular workflow
previous_context = self._get_previous_dataset_context(session_id)
if previous_context:
# Enhanced context with explicit instructions to preserve/extend existing dataset
enhanced_data_request = f"""DATASET ENHANCEMENT REQUEST:
{analysis_request.data_description}
{previous_context}
CRITICAL INSTRUCTIONS FOR SQL GENERATION:
1. This is a request to ENHANCE/EXTEND the existing dataset, not replace it
2. The existing SQL query should be used as the foundation
3. Add JOINs to include the requested additional information
4. Preserve ALL existing columns and rows from the current dataset
5. Use LEFT JOINs to ensure no data loss
6. Focus on adding the specific new information requested while maintaining existing structure
7. Target result should have the same or more rows as the existing dataset"""
analysis_request.data_description = enhanced_data_request
else:
skip_regular_workflow = False
else:
skip_regular_workflow = False
# Step 3: Execute SQL workflow (enhancement or regular)
if not skip_regular_workflow:
# Step 2: Load database schema (prefer dynamic schema if available)
if self.discovered_schema:
schema = self._convert_discovered_schema_to_database_schema(self.discovered_schema)
logger.info(f"Using dynamic schema with {schema.system_architecture.get('total_tables', 0)} tables")
else:
schema = DatabaseSchema.from_json(schema_file)
logger.info(f"Using static schema file: {schema_file}")
sql_generator = SQLQueryGenerator(schema, self.statistical_agent)
# Use Two-Pass SQL Workflow for robust SQL generation
from two_pass_sql_workflow import TwoPassSqlWorkflow
# Get schema discovery instance - use dynamic schema if available
schema_discovery_instance = None
if self.discovered_schema:
# Create a mock schema discovery that returns our discovered schema
class MockSchemaDiscovery:
def __init__(self, discovered_schema):
self.discovered_schema = discovered_schema
def discover_schema(self):
return self.discovered_schema
schema_discovery_instance = MockSchemaDiscovery(self.discovered_schema)
two_pass_workflow = TwoPassSqlWorkflow(
schema_discovery=schema_discovery_instance,
data_processor=self.data_processor,
statistical_agent=self.statistical_agent
)
# Enhance user request with preferences
enhanced_request = analysis_request.data_description
if target_tables:
enhanced_request += f" (Focus on these tables: {', '.join(target_tables)})"
if specific_columns:
enhanced_request += f" (Include these columns: {', '.join(specific_columns)})"
if specific_relationships:
# Parse relationship strings and add to request
relationship_descriptions = []
for rel_str in specific_relationships:
# Format: "FromTable.FromColumn->ToTable.ToColumn"
if '->' in rel_str:
from_part, to_part = rel_str.split('->', 1)
if '.' in from_part and '.' in to_part:
from_table, from_col = from_part.split('.', 1)
to_table, to_col = to_part.split('.', 1)
relationship_descriptions.append(f"{from_table}.{from_col} → {to_table}.{to_col}")
if relationship_descriptions:
enhanced_request += f" (Use these specific relationships for JOINs: {', '.join(relationship_descriptions)})"
# Run the two-pass workflow
workflow_results = two_pass_workflow.generate_sql_with_iterations(
user_request=enhanced_request,
max_rows=max_rows
)
# Extract results from regular workflow
raw_iterations = workflow_results.get('iterations', [])
final_dataset = workflow_results.get('dataframe', pd.DataFrame())
# Convert workflow results to enhanced workflow format
iterations = []
# Process iterations based on workflow type
if skip_regular_workflow:
# Enhancement workflow - iterations are already processed
iterations = workflow_results.get('iterations', [])
if not final_dataset.empty:
logger.info(f"Enhancement workflow completed with {len(final_dataset)} rows")
else:
# Regular two-pass workflow - convert iteration format
for iteration_result in raw_iterations:
# Get data sample for this iteration
iteration_data = pd.DataFrame()
if iteration_result.execution_success and workflow_results.get('final_data') is not None:
iteration_data = workflow_results['final_data'].head(100)
# Assess data quality with user preferences
data_assessment = {"execution_success": iteration_result.execution_success}
if iteration_result.execution_success and not iteration_data.empty:
quality_assessment = self._assess_data_quality_with_threshold(
iteration_data, quality_threshold, include_incomplete, optimize_for_analysis
)
data_assessment.update(quality_assessment)
data_assessment["message"] = f"Query executed successfully - Quality: {quality_assessment['quality_score']}%"
else:
data_assessment.update({
"message": f"Query failed: {iteration_result.error_message}",
"quality_score": 0,
"meets_threshold": False,
"quality_issues": [iteration_result.error_message or "Query execution failed"]
})
# Convert two-pass iteration to enhanced workflow iteration
query_iteration = QueryIteration(
iteration_number=iteration_result.iteration_number,
sql_query=iteration_result.sql_generation.sql_query,
query_explanation=iteration_result.sql_generation.explanation,
data_sample=iteration_data,
data_assessment=data_assessment,
suggested_improvements=data_assessment.get('recommendations', []),
is_satisfactory=data_assessment.get('meets_threshold', False) and iteration_result.execution_success,
improvement_reason=iteration_result.error_message if not iteration_result.execution_success else None
)
iterations.append(query_iteration)
if iteration_result.execution_success:
final_dataset = workflow_results.get('final_data')
break
# If two-pass workflow didn't succeed, try fallback
if not workflow_results['final_success'] and (final_dataset is None or final_dataset.empty):
logger.info("Two-pass workflow failed, attempting fallback simple query...")
fallback_result = self._try_fallback_query(connection_config, max_rows)
if not fallback_result['dataframe'].empty:
final_dataset = fallback_result['dataframe']
logger.info(f"Fallback query successful: {len(final_dataset)} rows retrieved")
# Add fallback iteration
fallback_iteration = QueryIteration(
iteration_number=len(iterations) + 1,
sql_query=fallback_result.get('sql_query', 'Fallback query'),
query_explanation="Fallback query when two-pass workflow failed",
data_sample=final_dataset.head(100),
data_assessment={
"execution_success": True,
"message": "Fallback data retrieved",
"data_quality_score": 50,
"quality_issues": ["Using fallback query due to two-pass workflow failure"]
},
suggested_improvements=["Use more specific query when two-pass workflow is fixed"],
is_satisfactory=True,
improvement_reason=None
)
iterations.append(fallback_iteration)
# Step 4: Prepare analysis configuration
analysis_config = self._prepare_analysis_configuration(
final_dataset, analysis_request, iterations[-1]
)
# Step 5: Generate comprehensive metadata
# Perform final quality assessment
final_quality_assessment = self._assess_data_quality_with_threshold(
final_dataset, quality_threshold, include_incomplete, optimize_for_analysis
)
metadata = self._generate_workflow_metadata(
analysis_request, iterations, final_dataset, analysis_config, final_quality_assessment
)
# Ensure we have a valid dataframe
if final_dataset is None:
final_dataset = pd.DataFrame()
logger.warning("No valid dataset obtained, returning empty DataFrame")
return {
'success': True,
'dataframe': final_dataset,
'metadata': metadata,
'iterations': [iter.to_dict() for iter in iterations],
'analysis_config': analysis_config,
'final_query': iterations[-1].sql_query if iterations else '',
'query_explanation': iterations[-1].query_explanation if iterations else '',
'analysis_request': analysis_request.to_dict()
}
except Exception as e:
logger.error(f"Enhanced SQL workflow failed: {str(e)}")
return {
'success': False,
'error': str(e),
'dataframe': pd.DataFrame(), # Always return empty DataFrame on error
'metadata': {},
'iterations': []
}
def _analyze_data_structure(self, data_sample: pd.DataFrame, analysis_request: AnalysisRequest) -> Dict[str, Any]:
"""Analyze what's wrong with the current data structure and what improvements are needed"""
improvements = []
if data_sample.empty:
improvements.extend([
"Query returned no data - check filters and table joins",
"Verify table names and column names exist in schema",
"Use LEFT JOINs instead of INNER JOINs to avoid data loss"
])
return {'required_improvements': improvements}
# Analyze for specific analysis types
if analysis_request.expected_result_type == 'descriptive_statistics':
# Check if data is raw (individual records) vs aggregated (summaries)
has_aggregated_columns = any(keyword in col.lower() for col in data_sample.columns
for keyword in ['count', 'sum', 'avg', 'total', 'percentage'])
if not has_aggregated_columns and len(data_sample) > 500:
improvements.extend([
"Data contains individual records instead of statistical summaries",
"Add GROUP BY clauses with COUNT(*), AVG(), SUM() functions",
"Group by categorical variables (AnalysisType, Department, etc.)",
"Calculate percentages and distributions rather than raw data"
])
# Check for categorical grouping columns
categorical_cols = data_sample.select_dtypes(include=['object', 'category']).columns
if len(categorical_cols) == 0:
improvements.append("Add categorical columns for grouping (AnalysisType, AnalysisGroup)")
elif 'distribution' in analysis_request.analysis_description.lower():
# Distribution analysis needs frequency data
if 'count' not in ' '.join(data_sample.columns).lower():
improvements.extend([
"Distribution analysis needs COUNT(*) GROUP BY for frequencies",
"Include percentage calculations: COUNT(*) * 100.0 / SUM(COUNT(*)) OVER()",
"Group by categories to show distribution patterns"
])
# Check for numerical columns if numerical analysis is requested
if 'numerical' in analysis_request.analysis_description.lower():
numeric_cols = data_sample.select_dtypes(include=[np.number]).columns
if len(numeric_cols) < 2:
improvements.append("Include more numerical columns for statistical analysis")
# Check for time-based analysis
if any(keyword in analysis_request.analysis_description.lower() for keyword in ['time', 'temporal', 'trend']):
date_cols = [col for col in data_sample.columns if 'date' in col.lower() or 'time' in col.lower()]
if not date_cols:
improvements.append("Add date/time columns for temporal analysis")
return {'required_improvements': improvements}
def _analyze_iteration_progression(self, iterations: List[QueryIteration]) -> str:
"""Analyze the progression pattern across iterations"""
if len(iterations) < 2:
return "Not enough iterations to analyze progression"
scores = [iter.data_assessment.get('data_quality_score', 0) for iter in iterations]
row_counts = [len(iter.data_sample) for iter in iterations]
# Analyze score progression
if scores[-1] > scores[0]:
score_trend = "improving"
elif scores[-1] < scores[0]:
score_trend = "declining"
else:
score_trend = "stable"
# Analyze data volume progression
if row_counts[-1] < row_counts[0] * 0.5:
volume_trend = "drastically reducing data volume"
elif row_counts[-1] < row_counts[0]:
volume_trend = "reducing data volume"
else:
volume_trend = "maintaining data volume"
return f"Quality scores {score_trend} ({scores[0]}ā{scores[-1]}), {volume_trend} ({row_counts[0]}ā{row_counts[-1]} rows)"
def _generate_target_data_examples(self, analysis_request: AnalysisRequest) -> str:
"""Generate concrete examples of what the target data structure should look like"""
if analysis_request.expected_result_type == 'descriptive_statistics':
return """
TARGET DATA STRUCTURE FOR DESCRIPTIVE STATISTICS:
Instead of individual records like:
SampleID | AnalysisType | Result | Date
12345 | Serologie | 1 | 2023-01-01
12346 | Serologie | 0 | 2023-01-01
The query should return aggregated statistics like:
AnalysisType | TotalSamples | PositiveCount | PositivePercentage | AvgValue
Serologie | 1500 | 450 | 30.0 | 0.3
Bacterio | 800 | 240 | 30.0 | 0.3
Use queries like:
SELECT AnalysisType,
COUNT(*) as TotalSamples,
SUM(CASE WHEN Result = 1 THEN 1 ELSE 0 END) as PositiveCount,
(SUM(CASE WHEN Result = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) as PositivePercentage
FROM tblResults
GROUP BY AnalysisType
"""
elif 'numerical' in analysis_request.analysis_description.lower():
return """
TARGET DATA STRUCTURE FOR NUMERICAL ANALYSIS:
Should include multiple numerical columns for statistical analysis:
AnalysisType | Temperature | pH | Concentration | Count
Serologie | 37.5 | 7.2 | 15.6 | 100
Bacterio | 36.8 | 6.9 | 12.3 | 85
Focus on numerical result columns from tblSerologieResult, tblBactResult tables.
"""
elif 'distribution' in analysis_request.analysis_description.lower():
return """
TARGET DATA STRUCTURE FOR DISTRIBUTION ANALYSIS:
Should show frequency distributions, not individual records:
Category | Count | Percentage
Positive | 450 | 45.0
Negative | 550 | 55.0
Or time-based distributions:
Year_Month | AnalysisType | Count | Percentage
2023-01 | Serologie | 120 | 35.2
2023-02 | Serologie | 98 | 28.7
"""
else:
return """
TARGET DATA STRUCTURE:
Ensure the query returns data suitable for the requested analysis type.
Include relevant numerical and categorical columns.
Apply appropriate aggregations if summary statistics are needed.
"""
def _parse_combined_request(self, combined_request: str, max_rows: Optional[int] = 10000, model: str = 'gpt-4o') -> AnalysisRequest:
"""Parse combined data and analysis request using LLM"""
prompt = f"""
Parse this combined data and analysis request into structured components. PRESERVE USER INTENT AND SPECIFIC INSTRUCTIONS.
USER REQUEST: "{combined_request}"
Please extract while maintaining all user-specified requirements:
1. Data Description: What data they want to retrieve (preserve specific table/field mentions)
2. Analysis Description: What analysis they want to perform (keep exact user words for specific analysis types)
3. Expected Result Type: Choose the most appropriate from: descriptive_statistics, distribution_analysis, trend_analysis, comparison, correlation, prediction, summary
4. Specific Requirements: Any specific columns, filters, visualizations, or constraints mentioned by user
ANALYSIS TYPE MAPPING (preserve user's specific requests):
- "descriptive statistics" ā "descriptive_statistics"
- "distribution" ā "distribution_analysis"
- "over time" or "temporal" ā "trend_analysis"
- "compare" or "comparison" ā "comparison"
- "single plot" or "one plot" ā preserve this specific requirement
- "table showing" ā preserve table output requirement
CRITICAL: If user asks for "single plot" or "one plot", include this in analysis_description verbatim.
Return as JSON:
{{
"data_description": "description of what data is needed",
"analysis_description": "EXACT analysis requested by user with all specific requirements",
"expected_result_type": "type of analysis expected",
"specific_requirements": ["list", "of", "all", "user", "requirements", "including", "visualization", "specs"],
"max_rows": {max_rows}
}}
"""
try:
response = self.statistical_agent.query_llm(prompt, model=model)
# Extract JSON from response
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
parsed = json.loads(json_match.group())
return AnalysisRequest(
data_description=parsed.get('data_description', ''),
analysis_description=parsed.get('analysis_description', ''),
max_rows=parsed.get('max_rows', max_rows),
expected_result_type=parsed.get('expected_result_type', 'summary'),
specific_columns=parsed.get('specific_requirements', [])
)
except Exception as e:
logger.warning(f"Failed to parse request with LLM: {e}")
# Fallback: create basic request
return AnalysisRequest(
data_description=combined_request,
analysis_description=combined_request,
max_rows=max_rows
)
def _generate_contextual_sql(self,
sql_generator: SQLQueryGenerator,
analysis_request: AnalysisRequest,
previous_iterations: List[QueryIteration],
model: str = 'gpt-4o') -> Dict[str, str]:
"""Generate SQL query with context from previous iterations - IMPROVED LEARNING LOGIC"""
# Build comprehensive context from previous iterations
context = ""
specific_requirements = []
column_error_schema_info = ""
if previous_iterations:
# Analyze the most recent iteration in detail
last_iteration = previous_iterations[-1]
# Check if there were column errors in the last iteration
if hasattr(last_iteration, 'column_errors'):
logger.info(f"Processing column errors from iteration {last_iteration.iteration_number}: {last_iteration.column_errors}")
column_error_schema_info = self._generate_targeted_schema_info(
sql_generator.schema, last_iteration.column_errors
)
logger.info(f"Generated targeted schema info: {column_error_schema_info[:200]}...")
else:
logger.info(f"No column errors found in iteration {last_iteration.iteration_number}")
# Get actual data sample structure for analysis
data_sample = last_iteration.data_sample
data_analysis = self._analyze_data_structure(data_sample, analysis_request)
context = f"\n\nPREVIOUS ITERATION ANALYSIS:\n"
context += f"=== ITERATION {last_iteration.iteration_number} RESULTS ===\n"
context += f"SQL Query Generated:\n{last_iteration.sql_query}\n\n"
# Add column error specific feedback if present
if column_error_schema_info:
context += column_error_schema_info + "\n\n"
context += f"Data Retrieved: {len(data_sample)} rows, {len(data_sample.columns) if not data_sample.empty else 0} columns\n"
if not data_sample.empty:
context += f"Column Structure:\n"
for col in data_sample.columns:
sample_values = data_sample[col].dropna().head(3).tolist()
context += f" - {col}: {data_sample[col].dtype} (examples: {sample_values})\n"
context += f"\nData Quality Issues Found:\n"
for issue in last_iteration.data_assessment.get('quality_issues', []):
context += f" ā {issue}\n"
context += f"\nWhy This Data Was Insufficient:\n"
context += f" - Quality Score: {last_iteration.data_assessment.get('data_quality_score', 0)}/100\n"
context += f" - Reason: {last_iteration.improvement_reason}\n"
# Build specific requirements based on actual data analysis
specific_requirements = [
"CRITICAL: Learn from previous iteration and improve the query:",
f"- Previous query returned {len(data_sample)} rows with {len(data_sample.columns) if not data_sample.empty else 0} columns",
f"- Previous issues: {', '.join(last_iteration.data_assessment.get('quality_issues', [])[:3])}"
]
# Add specific improvements needed based on data analysis
specific_requirements.extend(data_analysis['required_improvements'])
# Show what all previous iterations tried (learning from history)
if len(previous_iterations) > 1:
context += f"\n=== LEARNING FROM ALL {len(previous_iterations)} ITERATIONS ===\n"
for i, iter in enumerate(previous_iterations, 1):
context += f"Iteration {i}: {len(iter.data_sample)} rows ā {iter.data_assessment.get('data_quality_score', 0)}/100 score\n"
context += f"\nProgression shows: {self._analyze_iteration_progression(previous_iterations)}\n"
# Add analysis-type specific requirements
if 'descriptive' in analysis_request.analysis_description.lower():
specific_requirements.extend([
"DESCRIPTIVE STATISTICS REQUIREMENTS:",
"- Use GROUP BY with categorical columns (AnalysisType, AnalysisGroup)",
"- Include COUNT(*) for frequency distribution",
"- Add percentage calculations for relative frequencies",
"- Use aggregation functions: COUNT, AVG, SUM, MIN, MAX",
"- Return summary statistics, NOT individual records"
])
elif 'distribution' in analysis_request.analysis_description.lower():
specific_requirements.extend([
"DISTRIBUTION ANALYSIS REQUIREMENTS:",
"- Focus on frequency counts and percentages",
"- Group by categories to show distribution patterns",
"- Include time-based grouping for temporal distribution",
"- Return aggregated counts, NOT raw data rows"
])
requirements_text = "\n".join(specific_requirements) if specific_requirements else ""
# Add concrete target examples
target_examples = self._generate_target_data_examples(analysis_request)
# Put column error corrections at the very top for maximum visibility
error_corrections = ""
if column_error_schema_info:
error_corrections = f"""
🚨🚨🚨 CRITICAL SQL ERROR CORRECTION REQUIRED! 🚨🚨🚨
THE PREVIOUS QUERY FAILED DUE TO INVALID COLUMN NAMES!
{column_error_schema_info}
🔥 MANDATORY CORRECTIONS:
- DO NOT USE any column names that caused errors in the previous query
- REPLACE invalid column names with the suggested alternatives provided above
- VERIFY all column names exist in the schema before using them
- If unsure about a column name, omit it entirely rather than guess
❌ FORBIDDEN: DO NOT repeat the same column errors from the previous query!
✅ REQUIRED: Use only the available columns listed above!
"""
enhanced_prompt = f"""
{error_corrections}Generate an optimized SQL query for this combined data and analysis request:
DATA NEEDED: {analysis_request.data_description}
ANALYSIS TO PERFORM: {analysis_request.analysis_description}
EXPECTED RESULT TYPE: {analysis_request.expected_result_type}
MAX ROWS: {analysis_request.max_rows}
{context}
{requirements_text}
{target_examples}
CRITICAL ITERATION INSTRUCTIONS:
1. If this is iteration 2+, you MUST learn from the previous query and data
2. Don't simplify - IMPROVE and address the specific issues identified
3. The previous query structure shows you what approach was tried - build on it
4. Look at the actual data columns returned - include better/more columns
5. Address the specific quality issues found in the previous data
GENERAL REQUIREMENTS:
1. The data should be suitable for {analysis_request.expected_result_type} analysis
2. Include relevant columns for the intended analysis
3. Apply appropriate filters and joins (use LEFT JOIN for Companies table)
4. Ensure data quality (avoid excessive nulls, ensure proper data types)
5. Generate a BETTER query than the previous iteration, not a simpler one
Generate a SQL query that will provide optimal data for the requested analysis.
"""
# Use the enhanced SQL prompt with numerical result guidance
enhanced_user_query = self._generate_enhanced_sql_prompt(
combined_request=analysis_request.data_description,
improvements=[req for req in specific_requirements if req and not req.endswith(':')]
)
# Use the SQL generator with enhanced context
sql_query, metadata = sql_generator.generate_sql_query(enhanced_user_query, analysis_request.max_rows, model)
# Log the generated SQL query for debugging
logger.info(f"Generated SQL Query: {sql_query}")
return {
'sql_query': sql_query,
'explanation': metadata.get('explanation', 'Generated SQL query for analysis'),
'metadata': metadata
}
def _execute_and_sample_query(self, sql_query: str, connection_config: str) -> Dict[str, Any]:
"""Execute SQL query and return full dataset plus sample"""
try:
# Load connection configuration using ConnectionConfig class
from sql_query_generator import ConnectionConfig
config_path = Path(connection_config)
if config_path.exists():
conn_config = ConnectionConfig.from_config_file(str(config_path))
else:
# Use default configuration
from sql_query_generator import get_default_connection_config
conn_config = get_default_connection_config()
# Execute query using data processor
from models import DataSource, DataSourceType
data_source = DataSource(
source_type=DataSourceType.SQL_QUERY,
sql_connection=conn_config.to_connection_string(),
sql_query=sql_query
)
df, metadata = self.data_processor.load_data(data_source)
# Create a representative sample (first 100 rows)
sample_size = min(100, len(df))
sample_df = df.head(sample_size) if len(df) > 0 else df
return {
'dataframe': df,
'sample': sample_df,
'metadata': metadata,
'row_count': len(df),
'column_count': len(df.columns) if not df.empty else 0
}
except Exception as e:
logger.error(f"Failed to execute SQL query: {str(e)}")
return {
'dataframe': pd.DataFrame(),
'sample': pd.DataFrame(),
'metadata': {'error': str(e)},
'row_count': 0,
'column_count': 0
}
def _extract_column_error_info(self, error_msg: str, sql_query: str) -> Dict[str, Any]:
"""Extract column error information to provide targeted schema feedback"""
import re
column_errors = []
# Extract invalid column name from error message
# Example: "Invalid column name 'Code'"
column_match = re.search(r"Invalid column name '([^']+)'", error_msg)
if column_match:
invalid_column = column_match.group(1)
# Try to find which table this column was intended for by analyzing the SQL
table_aliases = self._extract_table_aliases(sql_query)
# Find the table context for this column
for alias, table_name in table_aliases.items():
if f"{alias}.{invalid_column}" in sql_query or f"{table_name}.{invalid_column}" in sql_query:
column_errors.append({
'invalid_column': invalid_column,
'table_name': table_name,
'alias': alias,
'context': f"{alias}.{invalid_column} in query"
})
break
else:
# Column mentioned without explicit table reference
column_errors.append({
'invalid_column': invalid_column,
'table_name': 'unknown',
'alias': '',
'context': f"Column '{invalid_column}' used without table prefix"
})
return {
'error_message': error_msg,
'column_errors': column_errors,
'sql_query': sql_query
}
def _extract_table_aliases(self, sql_query: str) -> Dict[str, str]:
"""Extract table aliases from SQL query"""
import re
aliases = {}
# Pattern to match table aliases: "FROM TableName alias" or "JOIN TableName alias"
patterns = [
r'FROM\s+(\w+)\s+(\w+)',
r'JOIN\s+(\w+)\s+(\w+)',
r'INNER\s+JOIN\s+(\w+)\s+(\w+)',
r'LEFT\s+JOIN\s+(\w+)\s+(\w+)',
r'RIGHT\s+JOIN\s+(\w+)\s+(\w+)'
]
for pattern in patterns:
matches = re.findall(pattern, sql_query, re.IGNORECASE)
for table_name, alias in matches:
aliases[alias] = table_name
return aliases
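# Illustrative example (not in the original source):
#   _extract_table_aliases("FROM Requests r LEFT JOIN Samples s ON s.Sample_Request = r.Id")
#   returns {'r': 'Requests', 's': 'Samples'}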
def _generate_targeted_schema_info(self, schema, column_errors: Dict[str, Any]) -> str:
"""Generate targeted schema information for columns that caused errors"""
schema_info = ""
for error_info in column_errors.get('column_errors', []):
invalid_column = error_info['invalid_column']
table_name = error_info['table_name']
alias = error_info.get('alias', '')
# Get actual columns for this table
if table_name != 'unknown':
available_columns = schema.get_available_columns(table_name)
if available_columns:
# Check if the "invalid" column actually exists in our schema
if invalid_column in available_columns:
# This is a schema mismatch - database error says column doesn't exist but schema says it does
schema_info += f"\nļæ½ DATABASE CONNECTION ISSUE: Column '{invalid_column}' exists in schema but database reports it doesn't exist!\n"
schema_info += f"š” POSSIBLE CAUSES:\n"
schema_info += f" - Database connection pointing to wrong database instance\n"
schema_info += f" - Outdated schema file (but this seems unlikely)\n"
schema_info += f" - Permission issues with database user\n"
schema_info += f" - Database structure has changed since schema was generated\n"
schema_info += f"\nš§ IMMEDIATE ACTION: Try using a completely different column or table approach.\n"
schema_info += f" Avoid using table '{table_name}' entirely if possible.\n"
# Show alternative tables that might work
schema_info += f"\nā
Consider using these alternative tables for analysis:\n"
# Get some core tables that are likely to exist
core_tables = ['Results', 'Requests', 'Samples']
for core_table in core_tables:
if core_table != table_name:
core_cols = schema.get_available_columns(core_table)
if core_cols:
schema_info += f" - {core_table}: {', '.join(core_cols[:5])}\n"
else:
# This is a genuine column error
schema_info += f"\nš„ COLUMN ERROR: '{invalid_column}' does not exist in table '{table_name}'\n"
# Try to suggest similar column names
suggestions = self._find_similar_columns(invalid_column, available_columns)
if suggestions:
schema_info += f"šÆ IMMEDIATE REPLACEMENT for '{alias}.{invalid_column}' ā Use '{alias}.{suggestions[0]}'\n"
if len(suggestions) > 1:
schema_info += f" Other options: {', '.join([f'{alias}.{s}' for s in suggestions[1:3]])}\n"
schema_info += f"\nā
ALL AVAILABLE COLUMNS in table '{table_name}' (alias '{alias}'):\n"
# Show all columns for the problematic table
for i, col in enumerate(available_columns[:15]): # Limit to first 15 columns
schema_info += f" {alias}.{col}\n"
if len(available_columns) > 15:
schema_info += f" ... and {len(available_columns) - 15} more columns\n"
else:
schema_info += f"ā ļø Could not retrieve column information for table '{table_name}'\n"
else:
schema_info += f"ā ļø Could not determine the table for column '{invalid_column}'\n"
return schema_info
def _find_similar_columns(self, target_column: str, available_columns: List[str]) -> List[str]:
"""Find columns with similar names using difflib similarity matching"""
import difflib
# Use difflib to find similar column names with a reasonable cutoff
suggestions = difflib.get_close_matches(
target_column.lower(),
[col.lower() for col in available_columns],
n=5,
cutoff=0.3
)
# Map back to original case
result = []
for suggestion in suggestions:
for col in available_columns:
if col.lower() == suggestion:
result.append(col)
break
# Also check for partial matches (contains)
for col in available_columns:
if (target_column.lower() in col.lower() or col.lower() in target_column.lower()) and col not in result:
result.append(col)
if len(result) >= 5: # Limit total suggestions
break
return result
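# Illustrative example (not in the original source): given available columns
#   ['RefCode', 'CompanyCode', 'Name'], looking up 'Code' yields the close difflib
#   matches ['RefCode', 'CompanyCode'] (highest similarity first); any remaining
#   partial "contains" matches would be appended afterwards.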
def _try_fallback_query(self, connection_config: str, max_rows: Optional[int]) -> Dict[str, Any]:
"""Try a very simple fallback query when complex queries return no data"""
# Simple query that should almost always return some data
fallback_query = f"""
SELECT TOP {max_rows}
r.Id AS RequestID,
r.DateCreated AS RequestDate,
s.SampleNr AS SampleNumber,
s.DateSampling AS SamplingDate
FROM Requests r
LEFT JOIN Samples s ON s.Sample_Request = r.Id
WHERE r.DateCreated >= DATEADD(MONTH, -12, GETDATE())
ORDER BY r.DateCreated DESC
"""
logger.info(f"Fallback query: {fallback_query}")
return self._execute_and_sample_query(fallback_query, connection_config)
def _detect_analysis_type(self, request_text: str) -> str:
"""Detect the type of analysis requested based on keywords and patterns"""
analysis_types = {
'descriptive_statistics': {
'keywords': ['descriptive', 'statistics', 'distribution', 'frequency', 'count', 'percentage', 'summary', 'overview'],
'required_elements': ['COUNT', 'frequency', 'percentage', 'GROUP BY'],
'data_requirements': {'min_rows': 100, 'requires_grouping': True}
},
'time_series': {
'keywords': ['time', 'temporal', 'trend', 'over time', 'period', 'monthly', 'yearly', 'historical'],
'required_elements': ['date', 'time', 'ORDER BY'],
'data_requirements': {'min_rows': 50, 'requires_time_column': True}
},
'correlation_analysis': {
'keywords': ['correlation', 'relationship', 'association', 'compare', 'versus'],
'required_elements': ['numeric columns', 'correlation'],
'data_requirements': {'min_rows': 30, 'requires_numeric': True}
},
'comparison_analysis': {
'keywords': ['compare', 'comparison', 'versus', 'vs', 'between', 'difference'],
'required_elements': ['GROUP BY', 'comparison groups'],
'data_requirements': {'min_rows': 50, 'requires_grouping': True}
}
}
request_lower = request_text.lower()
best_match = 'general'
max_score = 0
for analysis_type, config in analysis_types.items():
score = sum(1 for keyword in config['keywords'] if keyword in request_lower)
if score > max_score:
max_score = score
best_match = analysis_type
return best_match
def _assess_data_quality_with_threshold(self, dataframe: pd.DataFrame, quality_threshold: int = 70,
include_incomplete: bool = True, optimize_for_analysis: bool = True) -> Dict[str, Any]:
"""
Assess data quality using user-specified threshold and preferences
Args:
dataframe: The data to assess
quality_threshold: Minimum quality score required (60-90)
include_incomplete: Whether to include records with missing data
optimize_for_analysis: Whether to optimize data structure for analysis
Returns:
Dict with quality assessment and recommendations
"""
if dataframe is None or dataframe.empty:
return {
'quality_score': 0,
'meets_threshold': False,
'issues': ['No data returned'],
'recommendations': ['Modify query to return data'],
'action_required': 'regenerate_query'
}
# Calculate quality metrics
total_rows = len(dataframe)
total_cols = len(dataframe.columns)
# Completeness metrics
missing_cells = dataframe.isnull().sum().sum()
total_cells = total_rows * total_cols
completeness_ratio = (total_cells - missing_cells) / total_cells if total_cells > 0 else 0
# Calculate base quality score
quality_score = 0
# Data volume score (0-30 points)
if total_rows >= 1000:
quality_score += 30
elif total_rows >= 100:
quality_score += 20
elif total_rows >= 10:
quality_score += 10
# Completeness score (0-40 points)
quality_score += int(completeness_ratio * 40)
# Column diversity score (0-20 points)
if total_cols >= 10:
quality_score += 20
elif total_cols >= 5:
quality_score += 15
elif total_cols >= 3:
quality_score += 10
# Data type diversity score (0-10 points)
numeric_cols = dataframe.select_dtypes(include=[np.number]).columns.tolist()
date_cols = dataframe.select_dtypes(include=['datetime', 'datetimetz']).columns.tolist()
if len(numeric_cols) > 0 and len(date_cols) > 0:
quality_score += 10
elif len(numeric_cols) > 0 or len(date_cols) > 0:
quality_score += 5
# Apply user preferences
issues = []
recommendations = []
# Handle incomplete data based on user preference
if not include_incomplete and completeness_ratio < 0.9:
issues.append(f'High missing data rate: {(1-completeness_ratio)*100:.1f}%')
recommendations.append('Filter out incomplete records')
quality_score -= 10 # Penalty for not accepting incomplete data
# Check against user threshold
meets_threshold = quality_score >= quality_threshold
if not meets_threshold:
if quality_threshold >= 90:
recommendations.append('Very strict threshold - consider relaxing to 80%')
elif quality_threshold >= 80:
recommendations.append('Strict threshold - may need query refinement')
if total_rows < 100:
recommendations.append('Increase data volume by expanding date range or filters')
if completeness_ratio < 0.7:
recommendations.append('Improve data completeness by selecting different columns')
# Optimization recommendations
if optimize_for_analysis:
if len(numeric_cols) == 0:
recommendations.append('Consider including numeric columns for analysis')
if len(date_cols) == 0 and 'time' in str(dataframe.columns).lower():
recommendations.append('Consider including date/time columns for temporal analysis')
# Determine action required
action_required = 'none'
if quality_score < quality_threshold - 20:
action_required = 'regenerate_query'
elif quality_score < quality_threshold:
action_required = 'optimize_query'
return {
'quality_score': min(quality_score, 100), # Cap at 100
'meets_threshold': meets_threshold,
'threshold_used': quality_threshold,
'completeness_ratio': completeness_ratio,
'total_rows': total_rows,
'total_columns': total_cols,
'numeric_columns': len(numeric_cols),
'date_columns': len(date_cols),
'issues': issues,
'recommendations': recommendations,
'action_required': action_required,
'user_preferences': {
'include_incomplete': include_incomplete,
'optimize_for_analysis': optimize_for_analysis
}
}
def _detect_required_data_types(self, combined_request: str) -> Dict[str, bool]:
"""
Detect what types of data are likely needed based on the user request
Returns dict indicating if numerical values, dates, categorical data, etc. are needed
"""
request_text = combined_request.lower()
return {
'numerical_results': any(word in request_text for word in [
'count', 'value', 'measurement', 'result', 'concentration', 'level',
'amount', 'quantity', 'titer', 'dilution', 'positive', 'negative',
'cfu', 'colony', 'bacterial', 'pathogen', 'organism', 'number'
]),
'temporal_data': any(word in request_text for word in [
'date', 'time', 'when', 'period', 'month', 'year', 'day',
'trend', 'over time', 'timeline', 'evolution', 'history'
]),
'categorical_data': any(word in request_text for word in [
'type', 'category', 'group', 'species', 'breed', 'sample type',
'analysis', 'test', 'method', 'classification'
]),
'customer_data': any(word in request_text for word in [
'customer', 'client', 'company', 'veterinarian', 'farm', 'owner'
]),
'sample_data': any(word in request_text for word in [
'sample', 'specimen', 'material', 'flock', 'animal', 'herd'
])
}
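# Illustrative example (not in the original source): for the request
#   "monthly average titer per analysis type", this returns
#   numerical_results=True ('titer'), temporal_data=True ('month' is a substring
#   of 'monthly'), categorical_data=True ('type', 'analysis'),
#   customer_data=False, sample_data=False.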
def _get_available_schema_fields(self) -> Dict[str, List[str]]:
"""
Extract available fields from database schema for user guidance
Returns categorized field suggestions based on schema analysis
"""
schema_fields = {
'numerical_fields': [
# From Results and specialized result tables
'Results.Id', 'Results.SampleAnalyseGroupId',
# From serology results
'tblSerologieResult.Titer', 'tblSerologieResult.Dilution',
'tblSerologieLaboResults.Value', 'tblSerologieLaboResults.Result',
# From bacteriology results
'tblBactResult.ColonyCount', 'tblBactResult.CFU',
'tblBactResultLine.Quantity', 'tblBactResultLine.Concentration',
# From PCR results
'tblPCRResult.CtValue', 'tblPCRResult.Quantity',
# From hygienogram results
'HygienogramResults.TotalCount', 'HygienogramResults.Score'
],
'date_fields': [
'Requests.DateCreated', 'Samples.DateCreated', 'Results.DateCreated',
'Samples.DateSampling', 'Results.DateTechValidated', 'Results.DateBioValidated',
'tblSerologieResult.DateAnalysis', 'tblBactResult.DateAnalysis'
],
'categorical_fields': [
'Analyses.Name', 'AnalysisGroups.Name', 'SampleTypes.Name',
'Germs.Name', 'PosNegs.Name', 'Companies.Name',
'tblSerologieResult.Result', 'tblBactResult.ResultInterpretation',
'Samples.Identification', 'Breeds.Name'
],
'identification_fields': [
'Requests.RequestNr', 'Samples.SampleNr', 'Samples.Identification',
'Companies.Name', 'Requests.RefCustomer'
]
}
return schema_fields
def _assess_data_quality_enhanced(self, df, request_text: str) -> List[str]:
"""Enhanced data quality assessment based on analysis type and expected data"""
issues = []
analysis_type = self._detect_analysis_type(request_text)
data_requirements = self._detect_required_data_types(request_text)
if df.empty:
issues.append("Query returned no data")
return issues
# General data quality checks
if len(df) < 10:
issues.append("Very low row count - may not be sufficient for analysis")
null_percentage = df.isnull().sum().sum() / (len(df) * len(df.columns)) * 100
if null_percentage > 50:
issues.append("High percentage of null values")
# Check for expected data types based on user request
if data_requirements['numerical_results']:
# Check for actual numerical result values
result_keywords = ['value', 'titer', 'dilution', 'count', 'cfu', 'concentration', 'result', 'quantity']
numerical_result_cols = [col for col in df.columns if any(keyword in col.lower() for keyword in result_keywords)]
if not numerical_result_cols:
issues.append("Missing numerical result values - no columns with actual measurements detected")
# Check if we have only ID/metadata columns but no actual result data
id_like_cols = [col for col in df.columns if any(id_word in col.lower() for id_word in ['id', 'nr', 'number', 'code'])]
if len(id_like_cols) >= len(df.columns) * 0.7:
issues.append("Query returns mostly ID/metadata columns but lacks actual result measurements")
if data_requirements['temporal_data']:
time_cols = [col for col in df.columns if any(time_word in col.lower()
for time_word in ['date', 'time', 'year', 'month', 'day'])]
if not time_cols:
issues.append("Missing time-based columns for temporal analysis")
if data_requirements['categorical_data']:
categorical_cols = df.select_dtypes(include=['object']).columns
if len(categorical_cols) == 0:
issues.append("No categorical columns for grouping analysis")
# Analysis-specific checks
if analysis_type == 'descriptive_statistics':
if not any('count' in col.lower() for col in df.columns):
issues.append("Missing COUNT aggregations for frequency analysis")
if not any('percent' in col.lower() or '%' in str(df.dtypes) for col in df.columns):
issues.append("No percentage calculations for distribution analysis")
elif analysis_type == 'time_series':
if not any('count' in col.lower() for col in df.columns):
issues.append("Missing aggregated measures for time series")
elif analysis_type == 'correlation_analysis':
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) < 2:
issues.append("Insufficient numeric columns for correlation analysis")
return issues
def _suggest_specific_improvements(self, issues: List[str], analysis_type: str, df, request_text: str) -> List[str]:
"""Generate specific improvements based on detected issues and analysis type"""
improvements = []
data_requirements = self._detect_required_data_types(request_text)
# Address specific issues
if "Query returned no data" in issues:
improvements.extend([
"Use LEFT JOIN instead of INNER JOIN for optional relationships",
"Expand date range to 6-12 months",
"Remove restrictive WHERE conditions",
"Start with core tables only (Requests, Samples, Results)"
])
if "Missing COUNT aggregations" in issues:
improvements.append("Add COUNT(*) with GROUP BY for frequency distributions")
if "No percentage calculations" in issues:
improvements.append("Include percentage calculations using COUNT(*) * 100.0 / SUM(COUNT(*)) OVER()")
if "Missing time-based columns" in issues:
improvements.append("Add YEAR(), MONTH(), or date grouping for temporal analysis")
if "Insufficient numeric columns" in issues:
improvements.append("Include numeric measures or calculated fields for analysis")
# Smart suggestions based on detected data requirements
if data_requirements['numerical_results']:
improvements.extend([
"JOIN with specialized result tables: tblSerologieResult for serology values, tblBactResult for bacteriology counts",
"Include numerical result fields: Titer, Dilution, ColonyCount, CFU, CtValue, Concentration",
"Add tblSerologieLaboResults.Value and tblSerologieLaboResults.Result for actual numerical measurements",
"Consider tblBactResultLine for detailed bacterial analysis results with quantities"
])
if data_requirements['temporal_data']:
improvements.extend([
"Include DateSampling, DateTechValidated, DateBioValidated for temporal analysis",
"Add YEAR(DateCreated), MONTH(DateCreated) for time-based grouping"
])
if data_requirements['categorical_data']:
improvements.extend([
"JOIN with Analyses table for analysis type names",
"Include Germs.Name for pathogen identification",
"Add SampleTypes.Name for sample classification"
])
# Analysis-type specific improvements with focus on numerical data
if analysis_type == 'descriptive_statistics':
improvements.extend([
"GROUP BY analysis type and include COUNT of positive/negative results",
"Calculate percentages of different result categories",
"Include statistical measures like AVG, MIN, MAX for numerical results"
])
elif analysis_type == 'time_series':
improvements.extend([
"Group by time periods (monthly/yearly) with SUM/AVG of numerical results",
"Order by date/time for proper chronological sequence",
"Include trend analysis with numerical measurements over time"
])
elif analysis_type == 'correlation_analysis':
improvements.extend([
"Include multiple numeric result columns for correlation matrix",
"Add categorical variables that might influence numerical results"
])
return improvements
def _generate_enhanced_sql_prompt(self, combined_request: str, issues: List[str] = None, improvements: List[str] = None) -> str:
"""
Generate enhanced SQL prompt with specific guidance about numerical result tables
"""
data_requirements = self._detect_required_data_types(combined_request)
base_prompt = f"""Generate SQL query for: {combined_request}
DATABASE CONTEXT (Poulpharm LIMS):
Core workflow: Requests → Samples → Results (main result table)
CRITICAL REQUIREMENTS:
1. ONLY use actual table names: Requests, Samples, Results, Analyses, AnalysisGroups, Companies
2. DO NOT create fictional table names like "RecentLabData" or "LabResults"
3. Use the exact proven JOINs: s.Sample_Request = r.Id, res.Result_Sample = s.Id
PROVEN WORKING QUERY STRUCTURE:
"""
if data_requirements['numerical_results']:
base_prompt += """
PROVEN WORKING QUERY PATTERNS for laboratory results:
MAIN LABORATORY RESULTS (contains most actual measurements):
```sql
SELECT TOP 10
r.Id AS RequestID,
s.SampleNr AS SampleNumber,
a.Name_nl AS AnalysisName,
res.Result_Value AS NumericValue, -- Float values
res.Result_Text AS TextResult, -- Text results like "3+", "negative"
res.Result_PosNeg AS PosNegResult, -- Coded results (1=pos, 2=neg, 3=resistant, 5=sensitive)
res.DateCreated AS ResultDate
FROM Requests r
JOIN Samples s ON s.Sample_Request = r.Id
JOIN Results res ON res.Result_Sample = s.Id
JOIN Analyses a ON a.Id = res.Result_Analysis
WHERE res.TechValidated = 1 AND res.BioValidated = 1
AND (res.Result_Value IS NOT NULL OR res.Result_Text IS NOT NULL OR res.Result_PosNeg IS NOT NULL)
ORDER BY res.DateCreated DESC
```
SEROLOGY AGGREGATE RESULTS:
```sql
LEFT JOIN tblSerologieResult sr ON sr.RequestID = r.Id -- Links to REQUEST, not sample
-- Columns: sr.AvgTiter, sr.STDev, sr.POSSamples, sr.NEGSamples
```
BACTERIOLOGY WORKFLOW RESULTS:
```sql
LEFT JOIN tblBactResult br ON br.SampleID = s.Id -- Case sensitive: SampleID
-- Columns: br.BactResultID, br.Bar_DatumInzet
```
CRITICAL CORRECTIONS:
- tblSerologieResult.RequestID = r.Id (NOT SampleId)
- tblBactResult.SampleID = s.Id (case sensitive, NOT SampleId)
- Most numerical results are in Results.Result_Value, Result_Text, Result_PosNeg
"""
if data_requirements['temporal_data']:
base_prompt += """
TEMPORAL COLUMNS:
- Requests.DateCreated, Samples.DateCreated, Results.DateCreated
- Samples.DateSampling (actual sampling date)
- Results.DateTechValidated, Results.DateBioValidated
"""
if data_requirements['categorical_data']:
base_prompt += """
CATEGORICAL REFERENCE TABLES:
- Analyses: JOIN for analysis type names
- Germs: JOIN for pathogen identification
- SampleTypes: JOIN for sample classification
- Companies: JOIN for customer information
"""
# Add specific improvements if provided
if improvements:
base_prompt += f"""
SPECIFIC IMPROVEMENTS NEEDED:
{chr(10).join(f'- {imp}' for imp in improvements)}
"""
# Add query requirements
base_prompt += """
QUERY REQUIREMENTS:
1. Always use LEFT JOIN for optional relationships (especially Companies)
2. Include actual numerical values (Titer, Value, ColonyCount, etc.) when requested
3. Use proper aggregation (COUNT, SUM, AVG) for statistical analysis
4. Include meaningful column aliases for clarity
5. Filter for validated results: TechValidated = 1 AND BioValidated = 1
CRITICAL COLUMN NAMES (use exactly as specified):
- Requests to Samples: s.Sample_Request = r.Id (NOT s.RequestId)
- Samples to SampleAnalyseGroups: sag.SampleAnalyseGroup_Sample = s.Id
- Results to Samples: res.Result_Sample = s.Id (NOT res.SampleAnalyseGroupId)
- Results to Analyses: res.Result_Analysis = a.Id
"""
return base_prompt
def get_field_suggestions(self, user_request: str) -> Dict[str, Any]:
"""
Get field suggestions based on user request and database schema
Returns suggestions for fields the user might want to include
"""
data_requirements = self._detect_required_data_types(user_request)
schema_fields = self._get_available_schema_fields()
suggestions = {
'detected_requirements': data_requirements,
'suggested_fields': {},
'example_queries': []
}
# Suggest fields based on detected requirements
if data_requirements['numerical_results']:
suggestions['suggested_fields']['numerical_values'] = [
'tblSerologieResult.Titer - Serology test titer values',
'tblSerologieLaboResults.Value - Numerical measurement results',
'tblBactResult.ColonyCount - Bacterial colony counts',
'tblBactResultLine.Concentration - Bacterial concentrations',
'tblPCRResult.CtValue - PCR cycle threshold values'
]
if data_requirements['temporal_data']:
suggestions['suggested_fields']['date_fields'] = [
'Samples.DateSampling - When sample was taken',
'Results.DateTechValidated - Technical validation date',
'Results.DateBioValidated - Biological validation date'
]
if data_requirements['categorical_data']:
suggestions['suggested_fields']['categories'] = [
'Analyses.Name - Type of analysis performed',
'Germs.Name - Identified pathogen/organism',
'SampleTypes.Name - Type of sample (feces, blood, etc.)',
'Companies.Name - Customer/farm name'
]
# Add example queries based on requirements
if data_requirements['numerical_results'] and data_requirements['temporal_data']:
suggestions['example_queries'].append(
"Monthly average serology titer values by analysis type"
)
if data_requirements['numerical_results'] and data_requirements['categorical_data']:
suggestions['example_queries'].append(
"Bacterial colony counts grouped by sample type and pathogen"
)
return suggestions
def _is_data_satisfactory_enhanced(self, df, request_text: str) -> bool:
"""Enhanced satisfactory check based on analysis requirements"""
if df.empty:
return False
analysis_type = self._detect_analysis_type(request_text)
issues = self._assess_data_quality_enhanced(df, request_text)
# Critical issues that make data unsatisfactory
critical_issues = [
"Query returned no data",
"Missing COUNT aggregations for frequency analysis",
"Missing time-based columns for temporal analysis",
"Insufficient numeric columns for correlation analysis"
]
has_critical_issues = any(issue in issues for issue in critical_issues)
# More reasonable minimum row requirements based on analysis type
min_rows = {
'descriptive_statistics': 10, # Reduced from 50
'time_series': 15, # Reduced from 30
'correlation_analysis': 10, # Reduced from 20
'comparison_analysis': 15, # Reduced from 40
'general': 5 # Reduced from 10
}.get(analysis_type, 5)
meets_row_requirement = len(df) >= min_rows
# Accept data if it has sufficient rows and columns, even with some minor issues
has_good_structure = len(df) >= min_rows and len(df.columns) >= 3
# Only reject if there are truly critical issues AND poor structure
return (not has_critical_issues and meets_row_requirement) or (has_good_structure and len(issues) <= 5)
def _assess_data_for_analysis(self, df, analysis_request: AnalysisRequest) -> Dict[str, Any]:
"""Enhanced assessment of retrieved data suitability for intended analysis"""
if df.empty:
return {
'is_satisfactory': False,
'improvement_reason': 'Query returned no data - try simpler query with broader date range',
'improvements': [
'Remove complex joins and use LEFT JOIN instead',
'Expand date range to 6 months or 1 year',
'Start with core tables only (Requests, Samples, Results)',
'Remove restrictive WHERE conditions'
],
'data_quality_score': 0,
'zero_rows': True # Special flag for zero row handling
}
# Use enhanced assessment
request_text = f"{analysis_request.data_description} {analysis_request.analysis_description}"
analysis_type = self._detect_analysis_type(request_text)
issues = self._assess_data_quality_enhanced(df, request_text)
is_satisfactory = self._is_data_satisfactory_enhanced(df, request_text)
improvements = self._suggest_specific_improvements(issues, analysis_type, df, request_text)
# Calculate quality score
max_issues = 10 # Maximum expected issues
quality_score = max(0, 100 - (len(issues) * 100 / max_issues))
assessment = {
'row_count': len(df),
'column_count': len(df.columns),
'null_percentage': df.isnull().sum().sum() / (len(df) * len(df.columns)) * 100,
'numeric_columns': len(df.select_dtypes(include=[np.number]).columns),
'categorical_columns': len(df.select_dtypes(include=['object', 'category']).columns),
'unique_values_per_column': {},
'improvements': improvements,
'is_satisfactory': is_satisfactory,
'data_quality_score': quality_score,
'analysis_type': analysis_type,
'quality_issues': issues,
'improvement_reason': f"Analysis type: {analysis_type}. Issues: {', '.join(issues[:3])}" if issues else "Data meets analysis requirements"
}
# Calculate unique values per column
for col in df.columns:
assessment['unique_values_per_column'][col] = df[col].nunique()
# Assess data quality issues
issues = []
score_deductions = 0
# Check for excessive nulls
if assessment['null_percentage'] > 50:
issues.append('More than 50% of data is missing')
assessment['improvements'].append('Use LEFT JOINs instead of INNER JOINs')
assessment['improvements'].append('Check for required vs optional relationships')
score_deductions += 30
# Check for insufficient data
if len(df) < 10:
issues.append('Very small dataset (less than 10 rows)')
assessment['improvements'].append('Expand date ranges or remove restrictive filters')
score_deductions += 20
# Enhanced analysis-specific requirements with more reasonable scoring
# Give bonus points for good datasets before applying deductions
bonus_points = 0
if len(df) >= 1000: # Large dataset bonus
bonus_points += 10
if assessment['column_count'] >= 10: # Rich dataset bonus
bonus_points += 10
if assessment['numeric_columns'] >= 3: # Multiple numeric columns bonus
bonus_points += 5
if analysis_request.expected_result_type in ['comparison', 'correlation']:
if assessment['numeric_columns'] < 2:
issues.append(f'Insufficient numeric columns for {analysis_request.expected_result_type} analysis')
assessment['improvements'].append('Include more numeric columns in SELECT clause')
assessment['improvements'].append('Add COUNT, SUM, AVG aggregations for numerical analysis')
score_deductions += 15 # Reduced from 25
elif analysis_request.expected_result_type == 'trend':
# Look for date/time columns
date_columns = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
if not date_columns:
issues.append('No date/time columns found for trend analysis')
assessment['improvements'].append('Include date/time columns for temporal analysis')
score_deductions += 20 # Reduced from 30
elif (analysis_request.expected_result_type == 'descriptive_statistics' or
'descriptive' in analysis_request.analysis_description.lower() or
'statistics' in analysis_request.analysis_description.lower()):
# Be more lenient - raw data can be good for descriptive statistics
has_aggregations = any(keyword in col.lower() for col in df.columns
for keyword in ['count', 'sum', 'avg', 'total', 'percentage', 'distribution', 'frequency'])
if not has_aggregations and len(df) < 100: # Only penalize small raw datasets
issues.append('Small raw dataset - consider aggregated metrics for better statistics')
assessment['improvements'].append('Add COUNT(*) GROUP BY for distribution analysis')
assessment['improvements'].append('Include aggregation functions: COUNT, AVG, SUM, MIN, MAX, PERCENTAGE')
assessment['improvements'].append('Group by analysis types/categories for statistical distribution')
score_deductions += 15 # Greatly reduced from 40
elif not has_aggregations:
# Large raw datasets are often valuable for statistical analysis
assessment['improvements'].append('Raw data is good - can compute statistics during analysis')
# Check if we have categorical data for grouping
if assessment['categorical_columns'] == 0 and len(df) < 100:
issues.append('Small dataset needs categorical columns for grouping')
assessment['improvements'].append('Include categorical columns (AnalysisType, AnalysisGroup) for grouping')
score_deductions += 10 # Reduced from 20
# For time-based descriptive statistics, check for temporal grouping
if 'time' in analysis_request.analysis_description.lower() or 'temporal' in analysis_request.analysis_description.lower():
has_time_grouping = any(keyword in col.lower() for col in df.columns
for keyword in ['month', 'year', 'quarter', 'week', 'period'])
if not has_time_grouping:
issues.append('Temporal descriptive statistics need time-based grouping')
assessment['improvements'].append('Add time-based grouping (YEAR, MONTH, QUARTER)')
assessment['improvements'].append('Group counts by time periods for temporal distribution')
score_deductions += 15 # Reduced from 25
elif (analysis_request.expected_result_type == 'distribution_analysis' or
'distribution' in analysis_request.analysis_description.lower()):
# Distribution analysis needs frequency counts and categories
has_frequency_data = any(keyword in col.lower() for col in df.columns
for keyword in ['count', 'frequency', 'total', 'number', 'percentage'])
if not has_frequency_data and len(df) < 100: # Only penalize small datasets
issues.append('Small dataset needs frequency counts for distribution analysis')
assessment['improvements'].append('Use COUNT(*) GROUP BY to show distribution frequencies')
assessment['improvements'].append('Include percentage calculations for relative distribution')
score_deductions += 20 # Reduced from 35
elif not has_frequency_data:
# Large raw datasets can be analyzed for distribution
assessment['improvements'].append('Raw data available - can calculate distributions during analysis')
# Check for constant columns (no variation) - but be more lenient
constant_columns = [col for col in df.columns if df[col].nunique() <= 1]
if constant_columns and len(constant_columns) > len(df.columns) / 2: # Only penalize if majority are constant
issues.append(f'Many constant columns found: {constant_columns[:3]}{"..." if len(constant_columns) > 3 else ""}')
assessment['improvements'].append('Remove constant columns or add more diverse data')
score_deductions += 5 # Reduced from 10
# Calculate final score with bonus points and satisfactory status
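# Illustrative arithmetic (hypothetical dataset): 5,000 rows, 12 columns and 4 numeric
# columns earn bonus_points = 10 + 10 + 5 = 25; deductions of 30 (excessive nulls) and
# 20 (no date/time columns for trend analysis) give raw_score = 100 - 50 + 25 = 75,
# which clears the >= 60 satisfactory threshold applied further down.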
raw_score = 100 - score_deductions + bonus_points
assessment['data_quality_score'] = min(100, max(0, raw_score)) # Cap at 100
# Much more reasonable satisfactory logic:
# - Score >= 60: satisfactory (was 85)
# - Score >= 40 with good data size: satisfactory for large datasets
# - Rich datasets (many columns/rows) get benefit of doubt
rich_dataset = len(df) >= 1000 and assessment['column_count'] >= 10
if assessment['data_quality_score'] >= 60:
assessment['is_satisfactory'] = True
elif assessment['data_quality_score'] >= 40 and rich_dataset:
assessment['is_satisfactory'] = True
logger.info(f"Accepting rich dataset with {len(df)} rows and {assessment['column_count']} columns despite lower score")
else:
assessment['is_satisfactory'] = False
assessment['improvement_reason'] = '; '.join(issues) if issues else None
logger.info(f"Data assessment: {assessment['data_quality_score']}/100, Satisfactory: {assessment['is_satisfactory']}")
return assessment
def _refine_analysis_request(self,
original_request: AnalysisRequest,
assessment: Dict[str, Any]) -> AnalysisRequest:
"""Enhanced refinement of analysis request based on detailed assessment"""
refined_request = AnalysisRequest(
data_description=original_request.data_description,
analysis_description=original_request.analysis_description,
max_rows=original_request.max_rows,
specific_columns=original_request.specific_columns,
expected_result_type=original_request.expected_result_type
)
# Get analysis type and improvements
analysis_type = assessment.get('analysis_type', 'general')
improvements = assessment.get('improvements', [])
issues = assessment.get('quality_issues', [])
# Special handling for zero rows - be more aggressive
if assessment.get('zero_rows', False):
refined_request.data_description = f"Basic laboratory data with broader date range (6-12 months) - {original_request.data_description}"
refined_request.analysis_description = f"Simplified analysis focusing on core data - {original_request.analysis_description}"
else:
# Analysis-type specific refinements
if analysis_type == 'descriptive_statistics':
if "Missing COUNT aggregations" in issues:
refined_request.analysis_description += " - Focus on frequency counts and distributions with COUNT(*) and GROUP BY"
if "No percentage calculations" in issues:
refined_request.analysis_description += " - Include percentage calculations for relative distributions"
elif analysis_type == 'time_series':
if "Missing time-based columns" in issues:
refined_request.analysis_description += " - Include temporal grouping by year, month, or date periods"
refined_request.analysis_description += " - Ensure chronological ordering for trend analysis"
elif analysis_type == 'correlation_analysis':
if "Insufficient numeric columns" in issues:
refined_request.analysis_description += " - Include multiple numeric variables for correlation analysis"
# Generic improvements
improvement_mappings = {
'LEFT JOIN': 'using inclusive LEFT JOINs to avoid data loss',
'date range': 'with extended date range for more comprehensive data',
'numeric': 'including additional numeric measures and calculations',
'GROUP BY': 'with proper grouping for aggregated analysis',
'COUNT': 'including frequency counts and aggregations',
'percentage': 'with percentage calculations for relative analysis'
}
for improvement in improvements:
for key, description in improvement_mappings.items():
if key.lower() in improvement.lower():
if description not in refined_request.data_description:
refined_request.data_description += f" {description}"
break
return refined_request
def _prepare_analysis_configuration(self,
df: pd.DataFrame,
analysis_request: AnalysisRequest,
final_iteration: QueryIteration) -> Dict[str, Any]:
"""Prepare analysis configuration based on final dataset"""
if df.empty:
return {'error': 'No data available for analysis configuration'}
numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_columns = df.select_dtypes(include=['object', 'category']).columns.tolist()
date_columns = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
config = {
'analysis_type': analysis_request.expected_result_type,
'dataset_shape': df.shape,
'available_columns': {
'numeric': numeric_columns,
'categorical': categorical_columns,
'datetime': date_columns,
'all': df.columns.tolist()
},
'suggested_analyses': [],
'recommended_visualizations': [],
'data_quality_notes': final_iteration.data_assessment
}
# Suggest specific analyses based on data structure
if len(numeric_columns) >= 2:
config['suggested_analyses'].extend([
'correlation_analysis',
'descriptive_statistics',
'distribution_analysis'
])
config['recommended_visualizations'].extend([
'correlation_heatmap',
'scatter_plots',
'box_plots'
])
if len(categorical_columns) >= 1 and len(numeric_columns) >= 1:
config['suggested_analyses'].append('group_comparison')
config['recommended_visualizations'].extend([
'grouped_bar_charts',
'violin_plots'
])
if date_columns:
config['suggested_analyses'].append('time_series_analysis')
config['recommended_visualizations'].extend([
'time_series_plots',
'trend_analysis'
])
return config
def _generate_workflow_metadata(self,
analysis_request: AnalysisRequest,
iterations: List[QueryIteration],
final_dataset: pd.DataFrame,
analysis_config: Dict[str, Any],
final_quality_assessment: Dict[str, Any] = None) -> Dict[str, Any]:
"""Generate comprehensive metadata for the workflow"""
# Use final quality assessment if available, otherwise use last iteration
quality_score = 0
if final_quality_assessment:
quality_score = final_quality_assessment['quality_score']
elif iterations:
quality_score = iterations[-1].data_assessment.get('quality_score', 0)
return {
'workflow_type': 'enhanced_sql_workflow',
'timestamp': datetime.now().isoformat(),
'original_request': {
'data_description': analysis_request.data_description,
'analysis_description': analysis_request.analysis_description,
'expected_result_type': analysis_request.expected_result_type
},
'iterations_count': len(iterations),
'final_query': iterations[-1].sql_query if iterations else None,
'query_explanation': iterations[-1].query_explanation if iterations else None,
'final_dataset_shape': final_dataset.shape if not final_dataset.empty else (0, 0),
'final_quality_score': quality_score,
'final_quality_assessment': final_quality_assessment,
'optimization_achieved': len(iterations) < 3, # Achieved optimization if didn't need max iterations
'analysis_readiness': analysis_config,
'iteration_summary': [
{
'iteration': iter.iteration_number,
'satisfactory': iter.is_satisfactory,
'improvements_needed': len(iter.suggested_improvements),
'data_rows': len(iter.data_sample)
} for iter in iterations
]
}
def _convert_discovered_schema_to_database_schema(self, discovered_schema) -> DatabaseSchema:
"""Convert dynamic discovered schema to DatabaseSchema format"""
# Create table categories based on table names and patterns
table_categories = {
'core_tables': [],
'result_tables': [],
'reference_tables': [],
'lookup_tables': []
}
# Complete table list
complete_table_list = [table.name for table in discovered_schema.tables]
# Categorize tables based on common patterns
for table in discovered_schema.tables:
table_name = table.name.lower()
if any(keyword in table_name for keyword in ['request', 'sample', 'result', 'analysis']):
table_categories['core_tables'].append(table.name)
elif any(keyword in table_name for keyword in ['result', 'test', 'measurement']):
table_categories['result_tables'].append(table.name)
elif any(keyword in table_name for keyword in ['company', 'customer', 'user', 'employee']):
table_categories['reference_tables'].append(table.name)
else:
table_categories['lookup_tables'].append(table.name)
# Create key relationships mapping
key_relationships = {}
for rel in discovered_schema.relationships:
# Handle both old and new relationship formats
source_table = rel.get('source_table') or rel.get('from_table')
target_table = rel.get('target_table') or rel.get('to_table')
source_column = rel.get('source_column') or rel.get('from_column')
target_column = rel.get('target_column') or rel.get('to_column')
relationship_type = rel.get('relationship_type', 'foreign_key')
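# Illustrative relationship dicts covered by the .get() fallbacks above
# (table and column names are hypothetical; 'Samples' and 'Results' mirror the core
# tables mentioned elsewhere in this class):
#   new format: {'source_table': 'Samples', 'source_column': 'SampleID',
#                'target_table': 'Results', 'target_column': 'SampleID',
#                'relationship_type': 'foreign_key'}
#   old format: {'from_table': 'Samples', 'from_column': 'SampleID',
#                'to_table': 'Results', 'to_column': 'SampleID'}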
if source_table not in key_relationships:
key_relationships[source_table] = []
key_relationships[source_table].append({
'target_table': target_table,
'relationship_type': relationship_type,
'source_column': source_column,
'target_column': target_column
})
# System architecture from discovered schema
system_architecture = discovered_schema.to_dict()['system_architecture']
# Create DatabaseSchema instance
return DatabaseSchema(
database_name=discovered_schema.database_name,
description=f"Live database schema discovered from {discovered_schema.server_name}",
table_categories=table_categories,
complete_table_list=complete_table_list,
key_relationships=key_relationships,
system_architecture=system_architecture
)
def _get_existing_sql_for_enhancement(self, session_id: str):
"""Get existing SQL query from current session for enhancement"""
try:
# Import here to avoid circular imports
from models import DatabaseManager
from config import Config
config = Config()
db_manager = DatabaseManager(config.DATABASE_URL)
# Get the current session
session = db_manager.get_session(session_id)
if not session or not session.sql_query:
return "", ""
return session.sql_query, f"Current session contains existing SQL query with {len(session.sql_query)} characters"
except Exception as e:
logger.warning(f"Could not retrieve existing SQL: {str(e)}")
return "", ""
def _enhance_existing_sql(self, existing_sql: str, enhancement_request: str, connection_config, max_rows: int = 50000) -> dict:
"""Enhance existing SQL query by adding JOINs and additional data"""
try:
from two_pass_sql_workflow import TwoPassSqlWorkflow
# Create enhancement prompt for the AI
enhancement_prompt = f"""
TASK: Enhance the following existing SQL query by adding the requested additional information.
EXISTING SQL QUERY:
{existing_sql}
ENHANCEMENT REQUEST:
{enhancement_request}
REQUIREMENTS:
1. Build upon the existing SQL query structure
2. Add necessary JOINs to include the requested additional information
3. Preserve ALL existing columns and data from the original query
4. Use LEFT JOINs to ensure no data loss from the original result set
5. Add only the new columns requested in the enhancement
6. Maintain the same filtering and ordering logic from the original query
7. Ensure the enhanced query is syntactically correct and efficient
Please analyze the existing query structure and enhance it with the requested additional information.
"""
# Get schema discovery instance - use dynamic schema if available
schema_discovery_instance = None
if self.discovered_schema:
# Create a mock schema discovery that returns our discovered schema
class MockSchemaDiscovery:
def __init__(self, discovered_schema):
self.discovered_schema = discovered_schema
def discover_schema(self):
return self.discovered_schema
schema_discovery_instance = MockSchemaDiscovery(self.discovered_schema)
# Use the existing data processor which is already properly configured
two_pass_workflow = TwoPassSqlWorkflow(
schema_discovery=schema_discovery_instance,
data_processor=self.data_processor,
statistical_agent=self.statistical_agent
)
# Use the two-pass workflow with enhancement context
workflow_results = two_pass_workflow.generate_sql_with_iterations(
user_request=enhancement_prompt,
max_rows=max_rows
)
return workflow_results
except Exception as e:
logger.error(f"Error enhancing existing SQL: {str(e)}")
return {
'final_success': False,
'error': str(e),
'dataframe': pd.DataFrame(),
'iterations': []
}
def _get_previous_dataset_context(self, session_id: str) -> str:
"""Get previous dataset and SQL context for a session"""
try:
# Import here to avoid circular imports
from models import DatabaseManager
from config import Config
config = Config()
db_manager = DatabaseManager(config.DATABASE_URL)
# Get the current session
session = db_manager.get_session(session_id)
if not session:
return ""
context_parts = []
# Include current session's SQL if available
if session.sql_query:
context_parts.append(f"Current session SQL query:\n{session.sql_query}")
# Look for previous sessions with SQL queries only if no current session context
if not context_parts:
recent_sessions = db_manager.get_recent_sessions(session.user_id, limit=5)
for prev_session in recent_sessions:
if prev_session.session_id != session_id and prev_session.sql_query:
# Get session steps to understand what was done
steps = db_manager.get_session_steps(prev_session.session_id)
successful_steps = [s for s in steps if s.execution_success]
if successful_steps:
context_parts.append(
f"Previous session (ID: {prev_session.session_id[:8]}...):\n"
f"Title: {prev_session.title or 'Untitled'}\n"
f"SQL Query: {prev_session.sql_query}\n"
f"Description: {prev_session.description or 'No description'}\n"
f"Completed steps: {len(successful_steps)}"
)
# Only include the most recent relevant session to avoid overwhelming context
break
if context_parts:
return "\n\n".join(context_parts)
else:
return ""
except Exception as e:
logger.warning(f"Could not retrieve previous dataset context: {str(e)}")
return ""
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Parameter of unspecified type
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, config, statistical_agent, discovered_schema)
Purpose: Initialize the workflow with the configuration, statistical agent, and an optional discovered schema
Parameters:
config: Type: Config
statistical_agent: Type: StatisticalAgent
discovered_schema: Parameter
Returns: None
execute_integrated_workflow(self, combined_request, session_id, schema_file, connection_config, max_rows, model, quality_threshold, include_incomplete, optimize_for_analysis, include_dataset_context, target_tables, specific_columns, specific_relationships) -> Dict[str, Any]
Purpose: Execute the enhanced workflow that combines data selection and analysis.
Args:
combined_request: User's combined data and analysis request
schema_file: Database schema file
connection_config: Database connection configuration
max_rows: Maximum number of rows to retrieve
model: AI model to use for LLM calls
quality_threshold: Minimum quality score (60-90)
include_incomplete: Whether to include records with missing data
optimize_for_analysis: Whether to optimize data structure for analysis
include_dataset_context: Whether to include previous dataset and SQL context
target_tables: Specific tables to focus on (optional)
specific_columns: Specific columns to include (optional)
specific_relationships: Specific relationships to include in JOINs (optional)
Returns: Dict containing the final dataset, SQL queries, and analysis preparation
Parameters:
combined_request: Type: str
session_id: Type: str
schema_file: Type: str
connection_config: Type: str
max_rows: Type: Optional[int]
model: Type: str
quality_threshold: Type: int
include_incomplete: Type: bool
optimize_for_analysis: Type: bool
include_dataset_context: Type: bool
target_tables: Type: List[str]
specific_columns: Type: List[str]
specific_relationships: Type: List[str]
Returns: Returns Dict[str, Any]
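A minimal call sketch, assuming workflow is an already constructed EnhancedSQLWorkflow instance; the request text and parameter values below are illustrative only, and the returned dict's exact keys are not shown in this listing:
result = workflow.execute_integrated_workflow(
    combined_request="Monthly bacterial colony counts grouped by sample type and pathogen",
    session_id=None,
    max_rows=10000,
    quality_threshold=70
)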
_analyze_data_structure(self, data_sample, analysis_request) -> Dict[str, Any]
Purpose: Analyze what's wrong with the current data structure and what improvements are needed
Parameters:
data_sample: Type: pd.DataFrame
analysis_request: Type: AnalysisRequest
Returns: Returns Dict[str, Any]
_analyze_iteration_progression(self, iterations) -> str
Purpose: Analyze the progression pattern across iterations
Parameters:
iterations: Type: List[QueryIteration]
Returns: Returns str
_generate_target_data_examples(self, analysis_request) -> str
Purpose: Generate concrete examples of what the target data structure should look like
Parameters:
analysis_request: Type: AnalysisRequest
Returns: Returns str
_parse_combined_request(self, combined_request, max_rows, model) -> AnalysisRequest
Purpose: Parse combined data and analysis request using LLM
Parameters:
combined_request: Type: str
max_rows: Type: Optional[int]
model: Type: str
Returns: Returns AnalysisRequest
_generate_contextual_sql(self, sql_generator, analysis_request, previous_iterations, model) -> Dict[str, str]
Purpose: Generate SQL query with context from previous iterations - IMPROVED LEARNING LOGIC
Parameters:
sql_generator: Type: SQLQueryGenerator
analysis_request: Type: AnalysisRequest
previous_iterations: Type: List[QueryIteration]
model: Type: str
Returns: Returns Dict[str, str]
_execute_and_sample_query(self, sql_query, connection_config) -> Dict[str, Any]
Purpose: Execute SQL query and return full dataset plus sample
Parameters:
sql_query: Type: str
connection_config: Type: str
Returns: Returns Dict[str, Any]
_extract_column_error_info(self, error_msg, sql_query) -> Dict[str, Any]
Purpose: Extract column error information to provide targeted schema feedback
Parameters:
error_msg: Type: str
sql_query: Type: str
Returns: Returns Dict[str, Any]
_extract_table_aliases(self, sql_query) -> Dict[str, str]
Purpose: Extract table aliases from SQL query
Parameters:
sql_query: Type: str
Returns: Returns Dict[str, str]
_generate_targeted_schema_info(self, schema, column_errors) -> str
Purpose: Generate targeted schema information for columns that caused errors
Parameters:
schema: Parameter
column_errors: Type: Dict[str, Any]
Returns: Returns str
_find_similar_columns(self, target_column, available_columns) -> List[str]
Purpose: Find columns with similar names using difflib similarity matching
Parameters:
target_column: Type: str
available_columns: Type: List[str]
Returns: Returns List[str]
_try_fallback_query(self, connection_config, max_rows) -> Dict[str, Any]
Purpose: Try a very simple fallback query when complex queries return no data
Parameters:
connection_config: Type: str
max_rows: Type: Optional[int]
Returns: Returns Dict[str, Any]
_detect_analysis_type(self, request_text) -> str
Purpose: Detect the type of analysis requested based on keywords and patterns
Parameters:
request_text: Type: str
Returns: Returns str
_assess_data_quality_with_threshold(self, dataframe, quality_threshold, include_incomplete, optimize_for_analysis) -> Dict[str, Any]
Purpose: Assess data quality using user-specified threshold and preferences.
Args:
dataframe: The data to assess
quality_threshold: Minimum quality score required (60-90)
include_incomplete: Whether to include records with missing data
optimize_for_analysis: Whether to optimize data structure for analysis
Returns: Dict with quality assessment and recommendations
Parameters:
dataframe: Type: pd.DataFrame
quality_threshold: Type: int
include_incomplete: Type: bool
optimize_for_analysis: Type: bool
Returns: Returns Dict[str, Any]
_detect_required_data_types(self, combined_request) -> Dict[str, bool]
Purpose: Detect what types of data are likely needed based on the user request. Returns a dict indicating whether numerical values, dates, categorical data, etc. are needed
Parameters:
combined_request: Type: str
Returns: Returns Dict[str, bool]
_get_available_schema_fields(self) -> Dict[str, List[str]]
Purpose: Extract available fields from the database schema for user guidance. Returns categorized field suggestions based on schema analysis
Returns: Returns Dict[str, List[str]]
_assess_data_quality_enhanced(self, df, request_text) -> List[str]
Purpose: Enhanced data quality assessment based on analysis type and expected data
Parameters:
df: Parameter
request_text: Type: str
Returns: Returns List[str]
_suggest_specific_improvements(self, issues, analysis_type, df, request_text) -> List[str]
Purpose: Generate specific improvements based on detected issues and analysis type
Parameters:
issues: Type: List[str]
analysis_type: Type: str
df: Parameter
request_text: Type: str
Returns: Returns List[str]
_generate_enhanced_sql_prompt(self, combined_request, issues, improvements) -> str
Purpose: Generate enhanced SQL prompt with specific guidance about numerical result tables
Parameters:
combined_request: Type: str
issues: Type: List[str]
improvements: Type: List[str]
Returns: Returns str
get_field_suggestions(self, user_request) -> Dict[str, Any]
Purpose: Get field suggestions based on the user request and database schema. Returns suggestions for fields the user might want to include
Parameters:
user_request: Type: str
Returns: Returns Dict[str, Any]
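A hypothetical call, assuming workflow is an EnhancedSQLWorkflow instance; the 'example_queries' key comes from the suggestion-building logic shown in the source listing above:
suggestions = workflow.get_field_suggestions("Monthly average serology titer values by analysis type")
for example in suggestions.get('example_queries', []):
    print(example)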
_is_data_satisfactory_enhanced(self, df, request_text) -> bool
Purpose: Enhanced satisfactory check based on analysis requirements
Parameters:
df: Parameter
request_text: Type: str
Returns: Returns bool
_assess_data_for_analysis(self, df, analysis_request) -> Dict[str, Any]
Purpose: Enhanced assessment of retrieved data suitability for intended analysis
Parameters:
df: Parameter
analysis_request: Type: AnalysisRequest
Returns: Returns Dict[str, Any]
_refine_analysis_request(self, original_request, assessment) -> AnalysisRequest
Purpose: Enhanced refinement of analysis request based on detailed assessment
Parameters:
original_request: Type: AnalysisRequest
assessment: Type: Dict[str, Any]
Returns: Returns AnalysisRequest
_prepare_analysis_configuration(self, df, analysis_request, final_iteration) -> Dict[str, Any]
Purpose: Prepare analysis configuration based on final dataset
Parameters:
df: Type: pd.DataFrame
analysis_request: Type: AnalysisRequest
final_iteration: Type: QueryIteration
Returns: Returns Dict[str, Any]
_generate_workflow_metadata(self, analysis_request, iterations, final_dataset, analysis_config, final_quality_assessment) -> Dict[str, Any]
Purpose: Generate comprehensive metadata for the workflow
Parameters:
analysis_request: Type: AnalysisRequest
iterations: Type: List[QueryIteration]
final_dataset: Type: pd.DataFrame
analysis_config: Type: Dict[str, Any]
final_quality_assessment: Type: Dict[str, Any]
Returns: Returns Dict[str, Any]
_convert_discovered_schema_to_database_schema(self, discovered_schema) -> DatabaseSchema
Purpose: Convert dynamic discovered schema to DatabaseSchema format
Parameters:
discovered_schema: Parameter
Returns: Returns DatabaseSchema
_get_existing_sql_for_enhancement(self, session_id)
Purpose: Get existing SQL query from current session for enhancement
Parameters:
session_id: Type: str
Returns: Tuple of (existing SQL query, context description); empty strings when no SQL is available
_enhance_existing_sql(self, existing_sql, enhancement_request, connection_config, max_rows) -> dict
Purpose: Enhance existing SQL query by adding JOINs and additional data
Parameters:
existing_sql: Type: str
enhancement_request: Type: str
connection_config: Parameter
max_rows: Type: int
Returns: Returns dict
_get_previous_dataset_context(self, session_id) -> str
Purpose: Get previous dataset and SQL context for a session
Parameters:
session_id: Type: str
Returns: Returns str
Required Imports
import json
import logging
import importlib.util
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Any
Usage Example
# Example usage:
# workflow = EnhancedSQLWorkflow(config, statistical_agent, discovered_schema=None)
# result = workflow.execute_integrated_workflow("Monthly average serology titer values by analysis type")
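A fuller setup sketch under stated assumptions: Config is importable from config (as done inside this class) and takes no arguments; the statistical_agent module path and the StatisticalAgent(config) constructor signature are assumptions made for illustration only.
# from config import Config
# from statistical_agent import StatisticalAgent   # module path assumed
# config = Config()
# statistical_agent = StatisticalAgent(config)     # constructor signature assumed
# workflow = EnhancedSQLWorkflow(config, statistical_agent)
# result = workflow.execute_integrated_workflow(
#     combined_request="Monthly average serology titer values by analysis type",
#     max_rows=10000,
#     quality_threshold=70
# )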
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class TwoPassSqlWorkflow (71.0% similar)
- function enhanced_sql_workflow (68.8% similar)
- function execute_enhanced_workflow_background (67.5% similar)
- function test_enhanced_workflow (62.1% similar)
- function demonstrate_sql_workflow_v1 (62.0% similar)