class SQLQueryGenerator
Generates SQL queries based on user requests and database schema
File: /tf/active/vicechatdev/full_smartstat/sql_query_generator.py
Lines: 347 - 1909
Complexity: moderate
Purpose
Generates SQL queries based on user requests and database schema
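The snippet below is a minimal, hypothetical usage sketch. It assumes a populated `DatabaseSchema` object and (optionally) an agent exposing `query_llm`, both of which the source code below expects; the import path and construction details are illustrative, not taken from the module.
```python
# Illustrative only: the import path and DatabaseSchema construction are assumptions.
from sql_query_generator import SQLQueryGenerator  # hypothetical import path

schema = ...  # a populated DatabaseSchema instance, built elsewhere in the module
generator = SQLQueryGenerator(schema)  # no statistical_agent -> fallback SQL generation

sql, metadata = generator.generate_sql_query(
    "customer request statistics for the last 3 months",
    max_rows=1000,
)
print(metadata["detected_intent"]["pattern"])  # e.g. 'customer_stats'
print(metadata["generation_method"])           # 'fallback' when no statistical_agent is supplied
print(sql)
```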
Source Code
class SQLQueryGenerator:
"""Generates SQL queries based on user requests and database schema"""
def __init__(self, schema: DatabaseSchema, statistical_agent=None):
self.schema = schema
self.statistical_agent = statistical_agent
self.is_poulpharm = schema.database_name.lower() == 'poulpharm'
self._load_dynamic_schema_info()
self._initialize_common_patterns()
if self.is_poulpharm:
self._setup_poulpharm_optimizations()
def _load_dynamic_schema_info(self):
"""Load comprehensive schema information - now uses enhanced schema data"""
self.detailed_tables = {}
self.table_relationships = {}
self.business_context = {}
# Use comprehensive schema data if available (new approach)
if hasattr(self.schema, 'system_architecture') and self.schema.system_architecture:
logger.info(f"Using comprehensive schema with {len(self.schema.complete_table_list)} tables")
self._load_from_comprehensive_schema()
else:
# Fallback to legacy loading methods
logger.info("Using legacy schema loading methods")
# Load from table_list.json if available
self._load_table_descriptions()
# Load from detailed markdown files if available
self._load_detailed_schema_from_markdown()
# Extract relationships from schema data
self._extract_table_relationships()
def _load_from_comprehensive_schema(self):
"""Load schema information from comprehensive schema data"""
try:
# Load table information from comprehensive schema
for table_name in self.schema.complete_table_list:
if table_name not in self.detailed_tables:
self.detailed_tables[table_name] = {}
# Add table metadata if available in categories
for category, tables in self.schema.table_categories.items():
if table_name in tables:
self.detailed_tables[table_name]['category'] = category
self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose_from_category(table_name, category)
break
# Load relationships from comprehensive schema
if isinstance(self.schema.key_relationships, list):
for relationship in self.schema.key_relationships:
if isinstance(relationship, dict):
source_table = relationship.get('source_table', '')
target_table = relationship.get('target_table', '')
if source_table and target_table:
if source_table not in self.table_relationships:
self.table_relationships[source_table] = []
self.table_relationships[source_table].append({
'target_table': target_table,
'relationship_type': relationship.get('relationship_type', 'Unknown'),
'source_columns': relationship.get('source_columns', []),
'target_columns': relationship.get('target_columns', [])
})
logger.info(f"Loaded comprehensive schema: {len(self.detailed_tables)} tables, {len(self.schema.key_relationships) if isinstance(self.schema.key_relationships, list) else 0} relationships")
except Exception as e:
logger.error(f"Error loading comprehensive schema: {str(e)}")
# Fallback to legacy methods if comprehensive loading fails
self._load_table_descriptions()
self._load_detailed_schema_from_markdown()
self._extract_table_relationships()
def _infer_business_purpose_from_category(self, table_name: str, category: str) -> str:
"""Infer business purpose from table category and name"""
table_lower = table_name.lower()
if category == 'results':
if 'serologie' in table_lower:
return 'Serology test results and measurements'
elif 'bact' in table_lower:
return 'Bacteriology test results and analysis'
elif 'pcr' in table_lower:
return 'PCR test results and genetic analysis'
elif 'result' in table_lower:
return 'Laboratory test results and measurements'
else:
return 'Analysis results and test outcomes'
elif category == 'lookup':
return 'Reference data and lookup values'
elif category == 'core_data':
if 'sample' in table_lower:
return 'Sample tracking and management'
elif 'analysis' in table_lower:
return 'Analysis definitions and configurations'
else:
return 'Core business data and entities'
elif category == 'system':
return 'System administration and user management'
elif category == 'audit':
return 'Audit trail and system logging'
else:
return 'Database table for business operations'
def _load_table_descriptions(self):
"""Load table descriptions from comprehensive database schema"""
try:
# Use new comprehensive schema files instead of deprecated table_list.json
schema_path = Path(__file__).parent / "database_schema_20251003_120434.json"
if schema_path.exists():
with open(schema_path, 'r', encoding='utf-8') as f:
schema_data = json.load(f)
# Extract table information from comprehensive schema
if 'columns_by_table' in schema_data:
for table_name, table_info in schema_data['columns_by_table'].items():
if table_name not in self.detailed_tables:
self.detailed_tables[table_name] = {}
# Store actual column information - columns_by_table[table_name] is a list
if isinstance(table_info, list):
self.detailed_tables[table_name]['columns'] = table_info
self.detailed_tables[table_name]['column_count'] = len(table_info)
# Infer business purpose from table name
self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose_from_name(table_name)
logger.info(f"Loaded comprehensive schema for {len(self.detailed_tables)} tables from database_schema_20251003_120434.json")
except Exception as e:
logger.warning(f"Could not load comprehensive schema: {str(e)}")
def _infer_business_purpose_from_name(self, table_name: str) -> str:
"""Infer business purpose from table name only"""
# Categorize tables by their business function
if any(keyword in table_name.lower() for keyword in ['request', 'order']):
return 'workflow_initiation'
elif any(keyword in table_name.lower() for keyword in ['sample', 'specimen']):
return 'sample_management'
elif any(keyword in table_name.lower() for keyword in ['result', 'analysis', 'test']):
return 'testing_results'
elif any(keyword in table_name.lower() for keyword in ['company', 'customer', 'client']):
return 'customer_management'
elif any(keyword in table_name.lower() for keyword in ['bact', 'pcr', 'serolog']):
return 'specialized_testing'
else:
return 'general_operations'
def _infer_business_purpose(self, table_name: str, description: str) -> str:
"""Infer business purpose from table name and description"""
# Categorize tables by their business function
if any(keyword in table_name.lower() for keyword in ['request', 'order']):
return 'workflow_initiation'
elif any(keyword in table_name.lower() for keyword in ['sample', 'specimen']):
return 'sample_management'
elif any(keyword in table_name.lower() for keyword in ['result', 'analysis', 'test']):
return 'testing_results'
elif any(keyword in table_name.lower() for keyword in ['company', 'customer', 'client']):
return 'customer_management'
elif any(keyword in table_name.lower() for keyword in ['bact', 'pcr', 'serolog']):
return 'specialized_testing'
elif any(keyword in description.lower() for keyword in ['audit', 'log', 'track']):
return 'audit_tracking'
else:
return 'supporting_data'
def _load_detailed_schema_from_markdown(self):
"""Load detailed schema from markdown reference files"""
try:
labosoft_dir = Path(__file__).parent.parent / "labosoft_SQL"
schema_files = list(labosoft_dir.glob("poulpharm_lims_database_reference_*.md"))
if schema_files:
latest_schema = sorted(schema_files)[-1]
logger.info(f"Loading detailed schema from {latest_schema}")
# Parse the markdown file for column information
self._parse_markdown_schema(latest_schema)
self._enhance_with_schema_categories()
else:
logger.warning("No detailed schema markdown files found")
except Exception as e:
logger.warning(f"Could not load detailed schema from markdown: {str(e)}")
def _parse_markdown_schema(self, schema_file_path):
"""Parse markdown schema file to extract table and column information"""
try:
with open(schema_file_path, 'r', encoding='utf-8') as f:
content = f.read()
import re
# Find table sections (#### TableName)
table_pattern = r'#### (\w+)\s*\n\s*\|[^|]*\|[^|]*\|\s*\n[^#]*?\*\*Columns:\*\*\s*\n\s*\|[^|]*\|[^|]*\|[^|]*\|[^|]*\|[^|]*\|\s*\n[^|]*\n((?:\|[^|]*\|[^|]*\|[^|]*\|[^|]*\|[^|]*\|\s*\n)*)'
# Extract key tables and their columns
key_tables = ['Requests', 'Samples', 'Results', 'Analyses', 'AnalysisGroups', 'SampleAnalyseGroups', 'Companies']
for table_name in key_tables:
if table_name not in self.detailed_tables:
self.detailed_tables[table_name] = {}
# Look for table-specific column patterns in the markdown
table_section_pattern = f'#### {table_name}.*?\\*\\*Columns:\\*\\*(.*?)(?=####|$)'
table_match = re.search(table_section_pattern, content, re.DOTALL | re.IGNORECASE)
if table_match:
columns_section = table_match.group(1)
# Extract column names from table rows
column_pattern = r'\|\s*`([^`]+)`\s*\|'
columns = re.findall(column_pattern, columns_section)
if columns:
self.detailed_tables[table_name]['columns'] = columns
logger.info(f"Extracted {len(columns)} columns for {table_name}")
except Exception as e:
logger.warning(f"Could not parse markdown schema: {str(e)}")
def _enhance_with_schema_categories(self):
"""Enhance table information with categories from main schema"""
try:
# Check if schema has table_categories attribute
if hasattr(self.schema, 'table_categories') and self.schema.table_categories:
table_categories = self.schema.table_categories
if isinstance(table_categories, dict):
for category_name, category_info in table_categories.items():
if isinstance(category_info, dict) and 'tables' in category_info:
for table_entry in category_info['tables']:
# Handle both string names and dict entries
if isinstance(table_entry, str):
table_name = table_entry
elif isinstance(table_entry, dict) and 'name' in table_entry:
table_name = table_entry['name']
else:
continue
if table_name not in self.detailed_tables:
self.detailed_tables[table_name] = {}
self.detailed_tables[table_name]['category'] = category_name
self.detailed_tables[table_name]['category_context'] = category_info.get('description', '')
# If it's a dict entry, extract additional info
if isinstance(table_entry, dict):
if 'description' in table_entry:
self.detailed_tables[table_name]['description'] = table_entry['description']
if 'schema' in table_entry:
self.detailed_tables[table_name]['schema'] = table_entry['schema']
except Exception as e:
logger.warning(f"Could not enhance with schema categories: {str(e)}")
def _extract_table_relationships(self):
"""Extract table relationships from schema information"""
# Initialize with empty dict and safely copy key_relationships if they exist
self.table_relationships = {}
if hasattr(self.schema, 'key_relationships') and self.schema.key_relationships:
try:
self.table_relationships = self.schema.key_relationships.copy()
except Exception as e:
logger.warning(f"Could not copy key_relationships: {str(e)}")
# Infer additional relationships from foreign key patterns
for table_name in self.schema.complete_table_list:
# Common foreign key patterns
potential_fks = []
# ID patterns (assumes standard naming conventions)
if table_name.endswith('s') and table_name != 'Analyses': # Plural table
singular = table_name[:-1]
potential_fks.append(f"{singular}ID")
# Add to relationships if not already present
if table_name not in self.table_relationships:
self.table_relationships[table_name] = {
'inferred_foreign_keys': potential_fks,
'related_tables': self._find_related_tables(table_name)
}
def _find_related_tables(self, table_name: str) -> List[str]:
"""Find tables that are likely related to the given table"""
related = []
# Look for tables with similar names or obvious relationships
base_name = table_name.lower().replace('tbl', '').replace('_', '')
for other_table in self.schema.complete_table_list:
if other_table == table_name:
continue
other_base = other_table.lower().replace('tbl', '').replace('_', '')
# Check for name similarities or known patterns
if (base_name in other_base or other_base in base_name or
any(keyword in base_name and keyword in other_base
for keyword in ['result', 'analysis', 'sample', 'request'])):
related.append(other_table)
return related[:5] # Limit to avoid too many relationships
def _initialize_common_patterns(self):
"""Initialize common query patterns for different types of analysis"""
self.common_query_patterns = {
'customer_stats': {
'keywords': ['customer', 'requests', 'statistics', 'stats', 'company'],
'tables': ['Requests', 'Companies', 'Accounts', 'Samples'],
'avoid_tables': ['tblBactResult', 'tblBactResultLine', 'tblBactResultLineAnalyse']
},
'bacteriology': {
'keywords': ['bacteria', 'antibiogram', 'sensitivity', 'resistance', 'germ'],
'tables': ['Requests', 'Samples', 'tblBactResult', 'tblBactResultLine', 'tblBactResultLineAnalyse', 'Germs'],
'required_joins': [
'Samples s ON s.Sample_Request = r.Id',
'tblBactResult br ON br.SampleID = s.Id',
'tblBactResultLine brl ON br.BactResultID = brl.BactResultID',
'tblBactResultLineAnalyse brla ON brl.BactResultLineID = brla.BactResultLineID'
]
},
'sample_workflow': {
'keywords': ['sample', 'workflow', 'analysis', 'testing'],
'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'AnalysisGroups'],
'avoid_tables': ['tblBactResult', 'tblBactResultLine']
},
'analysis_results': {
'keywords': ['analysis', 'results', 'testing', 'laboratory', 'distribution', 'types', 'over time'],
'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'Results', 'Analyses', 'AnalysisGroups'],
'required_joins': [
'Samples s ON s.Sample_Request = r.Id',
'SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id',
'Results res ON res.Result_Sample = s.Id',
'Analyses a ON a.Id = res.Result_Analysis',
'AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup'
]
},
'descriptive_statistics': {
'keywords': ['descriptive', 'statistics', 'distribution', 'frequency', 'count', 'temporal', 'time'],
'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'Results', 'Analyses', 'AnalysisGroups'],
'required_joins': [
'Samples s ON s.Sample_Request = r.Id',
'SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id',
'Results res ON res.Result_Sample = s.Id',
'Analyses a ON a.Id = res.Result_Analysis',
'AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup'
]
}
}
def _load_detailed_schema(self):
"""Load detailed schema information from database reference files if available"""
self.detailed_tables = {}
self.table_relationships = {}
if self.is_poulpharm:
# Try to load from the detailed markdown files
try:
self._parse_poulpharm_detailed_schema()
except Exception as e:
logger.warning(f"Could not load detailed schema: {str(e)}")
self._setup_fallback_poulpharm_schema()
def _parse_poulpharm_detailed_schema(self):
"""Parse detailed schema from Poulpharm LIMS database reference files"""
import os
from pathlib import Path
# Look for detailed schema files in the labosoft_SQL directory
labosoft_dir = Path(__file__).parent.parent / "labosoft_SQL"
schema_files = list(labosoft_dir.glob("poulpharm_lims_database_reference_*.md"))
if schema_files:
# Use the most recent schema file (by filename)
latest_schema = sorted(schema_files)[-1]
logger.info(f"Loading detailed schema from {latest_schema}")
self._parse_markdown_schema(latest_schema)
else:
raise FileNotFoundError("No detailed schema files found")
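# NOTE: the following definition of _parse_markdown_schema shadows the markdown-parsing
# version defined earlier in this class; at runtime only this later version, which
# delegates to _load_detailed_database_schema, is bound to the method name.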
def _parse_markdown_schema(self, schema_file_path):
"""Parse table and column information from markdown schema file"""
# Try to load detailed schema first, fall back to basic schema if needed
self._load_detailed_database_schema()
def _load_detailed_database_schema(self):
"""Load detailed database schema from JSON files"""
schema_file = 'database_schema_20251003_120434.json'
relationships_file = 'database_relationships_20251003_130415.json'
try:
# Load detailed schema
if os.path.exists(schema_file):
with open(schema_file, 'r', encoding='utf-8') as f:
schema_data = json.load(f)
# Extract column information by table
if 'columns_by_table' in schema_data:
for table_name, columns in schema_data['columns_by_table'].items():
if table_name not in self.detailed_tables:
self.detailed_tables[table_name] = {}
# Extract column names and their types
column_names = []
column_details = {}
for col in columns:
col_name = col['COLUMN_NAME']
column_names.append(col_name)
column_details[col_name] = {
'data_type': col['DATA_TYPE'],
'is_nullable': col['IS_NULLABLE'],
'is_primary_key': col.get('IS_PRIMARY_KEY', 'NO'),
'is_foreign_key': col.get('IS_FOREIGN_KEY', 'NO'),
'referenced_table': col.get('REFERENCED_TABLE_NAME'),
'referenced_column': col.get('REFERENCED_COLUMN_NAME')
}
self.detailed_tables[table_name]['columns'] = column_names
self.detailed_tables[table_name]['column_details'] = column_details
# Infer business purpose
self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose(table_name, '')
logger.info(f"Loaded detailed schema for {len(self.detailed_tables)} tables")
# Load relationships
if os.path.exists(relationships_file):
with open(relationships_file, 'r', encoding='utf-8') as f:
relationships_data = json.load(f)
if 'relationships' in relationships_data:
for rel in relationships_data['relationships']:
source_table = rel['source_table']
target_table = rel['target_table']
if source_table not in self.table_relationships:
self.table_relationships[source_table] = {'related_tables': []}
if target_table not in self.table_relationships[source_table]['related_tables']:
self.table_relationships[source_table]['related_tables'].append(target_table)
logger.info(f"Loaded {len(relationships_data.get('relationships', []))} table relationships")
except Exception as e:
logger.warning(f"Could not load detailed schema files: {e}")
self._setup_fallback_poulpharm_schema()
def _setup_fallback_poulpharm_schema(self):
"""Setup fallback - this should not be used when dynamic schema is available"""
logger.warning("Using fallback schema - dynamic schema discovery should be preferred")
# Keep minimal fallback for emergency situations
self.detailed_tables = {
'Requests': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'RequestNr': 'nvarchar(15)',
'DateCreated': 'datetime',
'sCustomer': 'nvarchar(255)',
'Customer_ID': 'int',
'Veterinarian_ID': 'int',
'sVeterinarian': 'nvarchar(255)',
'TestCode': 'nvarchar(255)',
'StartNumber': 'int'
},
'foreign_keys': {
'Customer_ID': 'Companies.Id',
'Veterinarian_ID': 'tblDierenarts.DierenartsID'
}
},
'Samples': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'SampleNr': 'nvarchar(50)',
'Identification': 'nvarchar(255)',
'DateCreated': 'datetime',
'DateSampling': 'datetime',
'Sample_Request': 'int',
'Sample_SampleType': 'int'
},
'foreign_keys': {
'Sample_Request': 'Requests.Id',
'Sample_SampleType': 'SampleTypes.Id'
}
},
'SampleAnalyseGroups': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Germ': 'nvarchar(255)',
'SampleAnalyseGroup_Sample': 'int',
'RequestAnalyseGroup_AnalysisGroup': 'int',
'IsAnalyseDone': 'bit',
'IsAnalyseExported': 'bit'
},
'foreign_keys': {
'SampleAnalyseGroup_Sample': 'Samples.Id',
'RequestAnalyseGroup_AnalysisGroup': 'AnalysisGroups.Id'
}
},
'Results': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'DateCreated': 'datetime',
'DateLastModified': 'datetime',
'CreatedBy': 'nvarchar(255)',
'ModifiedBy': 'nvarchar(255)',
'TechValidated': 'bit',
'BioValidated': 'bit',
'TechValidatedBy': 'nvarchar(255)',
'BioValidatedBy': 'nvarchar(255)',
'DateTechValidated': 'datetime',
'DateBioValidated': 'datetime',
'Result_Sample': 'int',
'Result_Analysis': 'int',
'Result_AnalysisGroup': 'int',
'Result_AnalysisAnalysisGroup': 'int'
},
'foreign_keys': {
'Result_Sample': 'Samples.Id',
'Result_Analysis': 'Analyses.Id'
}
},
'Analyses': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'Name_en': 'nvarchar(255)',
'ResultCode': 'nvarchar(255)',
'ShowOnReport': 'bit',
'RapportCode': 'nvarchar(5)',
'AnalyseCode': 'nvarchar(15)'
},
'foreign_keys': {}
},
'AnalysisGroups': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'Name_en': 'nvarchar(255)',
'Item_ItemCode': 'nvarchar(255)',
'Abbrev': 'nvarchar(255)',
'Sequence': 'int',
'IsCanceled': 'bit',
'AnalysisGroup_AnalysisCategory': 'int',
'ReportSmall': 'bit',
'IsAutomated': 'bit'
},
'foreign_keys': {
'AnalysisGroup_AnalysisCategory': 'AnalysisCategories.Id'
}
},
'SampleTypes': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'Name_en': 'nvarchar(255)',
'IsSalmonella': 'bit'
},
'foreign_keys': {}
},
'Accounts': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name': 'varchar(255)', # Note: This exists in Accounts table
'VatNumber': 'varchar(50)',
'StableNumber': 'varchar(50)',
'RegistrationNumber': 'varchar(50)',
'AddressStreet': 'varchar(100)',
'AddressCity': 'varchar(100)',
'AddressZipCode': 'varchar(50)',
'AddressCountryCode': 'varchar(5)',
'EmailAddress': 'varchar(100)'
},
'foreign_keys': {}
},
'Companies': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_nl': 'nvarchar(255)', # Changed from 'Name' to 'Name_nl'
'Name_fr': 'nvarchar(255)',
'Name_en': 'nvarchar(255)',
'Stad': 'nvarchar(255)',
'Land': 'nvarchar(255)',
'ReportPath': 'nvarchar(255)',
'Telefoon': 'nvarchar(255)',
'Email': 'nvarchar(255)'
}
},
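# NOTE: 'AnalysisGroups', 'Analyses', and 'SampleTypes' are repeated below with smaller
# column sets; because this is a single dict literal, the later entries override the
# fuller definitions given above.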
'AnalysisGroups': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_en': 'nvarchar(255)',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'Abbrev': 'nvarchar(255)'
}
},
'Analyses': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_en': 'nvarchar(255)',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'AnalyseCode': 'nvarchar(15)'
}
},
'SampleTypes': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_en': 'nvarchar(255)',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)',
'IsSalmonella': 'bit'
}
},
# Bacteriology tables - based on database reference
'tblBactResult': {
'primary_key': 'BactResultID',
'columns': {
'BactResultID': 'int',
'SampleID': 'int',
'Bar_DatumInzet': 'datetime',
'Bar_VerificatieAt': 'datetime',
'Bar_VerificatieBy': 'nvarchar(50)',
'Bar_IsAfgewerkt': 'bit'
},
'foreign_keys': {
'SampleID': 'Samples.Id'
}
},
'tblBactResultLine': {
'primary_key': 'BactResultLineID',
'columns': {
'BactResultLineID': 'int',
'BactResultID': 'int',
'KiemID': 'int',
'Brl_KiemenDescr': 'nvarchar(50)',
'Brl_Plaat': 'nvarchar(15)'
},
'foreign_keys': {
'BactResultID': 'tblBactResult.BactResultID',
'KiemID': 'Germs.GermId'
}
},
'tblBactResultLineAnalyse': {
'primary_key': 'BactResultLineAnalyseID',
'columns': {
'BactResultLineAnalyseID': 'int',
'BactResultLineID': 'int',
'AnalyseID': 'int',
'Brla_AantalMM': 'int',
'PosNegsID': 'int'
},
'foreign_keys': {
'BactResultLineID': 'tblBactResultLine.BactResultLineID',
'AnalyseID': 'Analyses.Id',
'PosNegsID': 'PosNegs.Id'
}
},
'Germs': {
'primary_key': 'GermId',
'columns': {
'GermId': 'int',
'Name_en': 'nvarchar(255)',
'Name_nl': 'nvarchar(255)',
'Name_fr': 'nvarchar(255)'
}
},
'PosNegs': {
'primary_key': 'Id',
'columns': {
'Id': 'int',
'Name_en': 'nvarchar(255)',
'Name_nl': 'nvarchar(255)'
}
}
}
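# Example (illustrative, assuming get_available_columns() reports the fallback columns above):
# get_best_name_column('Companies') returns 'Name_nl', while get_best_name_column('Accounts')
# returns 'Name' because that table has no 'Name_nl' column.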
def get_best_name_column(self, table_name: str) -> str:
"""Get the best available name column for a table based on dynamic schema"""
available_columns = self.get_available_columns(table_name)
# Priority order for name columns (preferred first)
name_column_priorities = [
'Name_nl', # Dutch name (preferred for Poulpharm)
'Name', # Generic name
'Naam', # Dutch alternative
'Name_en', # English name
'Name_fr', # French name
'Title', # Alternative name field
'Description', # Fallback description
'Code', # Code as identifier
'Id' # Last resort - ID field
]
for preferred_name in name_column_priorities:
if preferred_name in available_columns:
logger.info(f"Using '{preferred_name}' as name column for table '{table_name}'")
return preferred_name
# If no standard name column found, return the first non-ID column
for col in available_columns:
if col.lower() not in ['id', 'datecreated', 'datemodified', 'createdby', 'modifiedby']:
logger.warning(f"Using fallback column '{col}' as name for table '{table_name}'")
return col
# Ultimate fallback - use Id
logger.warning(f"No suitable name column found for table '{table_name}', using 'Id'")
return 'Id'
def validate_column_exists(self, table_name: str, column_name: str) -> bool:
"""Check if a column actually exists in the dynamic schema"""
available_columns = self.get_available_columns(table_name)
return column_name in available_columns
def get_safe_column_reference(self, table_name: str, preferred_column: str, fallback_column: str = 'Id') -> str:
"""Get a safe column reference, falling back if the preferred column doesn't exist"""
if self.validate_column_exists(table_name, preferred_column):
return preferred_column
elif self.validate_column_exists(table_name, fallback_column):
logger.warning(f"Column '{preferred_column}' not found in table '{table_name}', using '{fallback_column}'")
return fallback_column
else:
# Try to find the best available column
best_column = self.get_best_name_column(table_name)
logger.warning(f"Neither '{preferred_column}' nor '{fallback_column}' found in table '{table_name}', using '{best_column}'")
return best_column
def _setup_poulpharm_optimizations(self):
"""Setup Poulpharm LIMS specific query optimizations"""
# Common query patterns for Poulpharm LIMS
self.common_patterns = {
'recent_requests': {
'keywords': ['recent', 'latest', 'new', 'current'],
'base_query': 'Requests WHERE DateCreated >= DATEADD(month, -3, GETDATE())'
},
'bacteriology': {
'keywords': ['bacteria', 'antibiogram', 'sensitivity', 'resistance', 'germ'],
'tables': ['tblBactResult', 'tblAntibiogram', 'tblBactResultLine']
},
'pcr_analysis': {
'keywords': ['pcr', 'molecular', 'genetic'],
'tables': ['tblPCRResult', 'tblPCRKit', 'tblPCRImport']
},
'customer_analysis': {
'keywords': ['customer', 'company', 'veterinarian', 'client'],
'tables': ['Companies', 'tblDierenarts', 'Requests']
},
'sample_workflow': {
'keywords': ['sample', 'workflow', 'processing', 'status'],
'tables': ['Samples', 'SampleAnalyseGroups', 'AnalysisGroups']
}
}
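# Intent detection overview (illustrative): each entry in common_query_patterns is scored by
# counting its keywords found in the user query; the highest-scoring pattern supplies
# suggested_tables, avoid_tables, and required_joins, and a generic Requests/Samples fallback
# with low confidence is returned when no keywords match.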
def _detect_query_intent(self, user_query: str) -> Dict[str, Any]:
"""Detect user query intent and suggest appropriate tables/approach"""
query_lower = user_query.lower()
# Score each pattern
pattern_scores = {}
for pattern_name, pattern_info in self.common_query_patterns.items():
score = 0
for keyword in pattern_info['keywords']:
if keyword in query_lower:
score += 1
pattern_scores[pattern_name] = score
# Find best matching pattern
best_pattern = max(pattern_scores.items(), key=lambda x: x[1])
if best_pattern[1] > 0:
pattern_info = self.common_query_patterns[best_pattern[0]]
return {
'pattern': best_pattern[0],
'confidence': best_pattern[1] / len(pattern_info['keywords']),
'suggested_tables': pattern_info['tables'],
'avoid_tables': pattern_info.get('avoid_tables', []),
'required_joins': pattern_info.get('required_joins', [])
}
# Default fallback
return {
'pattern': 'generic',
'confidence': 0.1,
'suggested_tables': ['Requests', 'Samples'],
'avoid_tables': [],
'required_joins': []
}
def _format_top_clause(self, max_rows) -> str:
"""Helper method to format TOP clause for SQL queries"""
if max_rows is None:
return "" # No TOP clause for unlimited
return f"TOP {max_rows}"
def generate_sql_query(self, user_query: str, max_rows: Optional[int] = 10000, model: str = 'claude-sonnet-4-5-20250929') -> Tuple[str, Dict[str, Any]]:
"""
Generate SQL query based on user analysis request
Returns: (sql_query, metadata)
"""
try:
# Detect query intent
logger.debug("Detecting query intent...")
intent = self._detect_query_intent(user_query)
logger.info(f"Detected query intent: {intent['pattern']} (confidence: {intent['confidence']:.2f})")
# Prepare context for the LLM
logger.debug("Preparing database context...")
context = self._prepare_database_context()
# Create prompt for SQL generation with intent guidance
logger.debug("Creating SQL generation prompt...")
prompt = self._create_sql_generation_prompt(user_query, context, max_rows, intent)
# Use statistical agent to generate SQL if available
if self.statistical_agent:
response = self._generate_with_llm(prompt, model)
sql_query, explanation = self._parse_llm_response(response)
# Validate generated SQL
is_valid, validation_errors = self._validate_generated_sql(sql_query)
if not is_valid:
logger.warning(f"Generated SQL has validation errors: {validation_errors}")
# Try fallback approach
sql_query, explanation = self._generate_basic_sql(user_query, max_rows)
else:
# Fallback: basic SQL generation
sql_query, explanation = self._generate_basic_sql(user_query, max_rows)
metadata = {
'user_query': user_query,
'database_name': self.schema.database_name,
'explanation': explanation,
'generated_at': datetime.now().isoformat(),
'max_rows': max_rows,
'detected_intent': intent,
'generation_method': 'llm' if self.statistical_agent else 'fallback'
}
return sql_query, metadata
except Exception as e:
logger.error(f"Error generating SQL query: {str(e)}")
raise
def _prepare_database_context(self) -> str:
"""Prepare comprehensive database schema context using dynamic schema information"""
logger.debug("Starting _prepare_database_context")
context_parts = []
# Database overview
context_parts.append(f"DATABASE: {self.schema.database_name}")
context_parts.append(f"DESCRIPTION: {self.schema.description}")
context_parts.append(f"TOTAL TABLES: {len(self.schema.complete_table_list)}")
# Add business context from table categories (if available)
if hasattr(self.schema, 'table_categories') and self.schema.table_categories:
context_parts.append("\nBUSINESS DOMAIN CATEGORIES:")
try:
table_categories = self.schema.table_categories
if isinstance(table_categories, dict):
for category, info in table_categories.items():
context_parts.append(f" {category}:")
if isinstance(info, dict):
if 'description' in info:
context_parts.append(f" Purpose: {info['description']}")
if 'tables' in info and isinstance(info['tables'], list):
# Extract table names from the list (which may contain dicts)
table_names = []
for table_entry in info['tables'][:10]: # Limit display
if isinstance(table_entry, str):
table_names.append(table_entry)
elif isinstance(table_entry, dict) and 'name' in table_entry:
table_names.append(table_entry['name'])
if table_names:
context_parts.append(f" Tables: {', '.join(table_names)}")
except Exception as e:
logger.warning(f"Could not add table categories to context: {str(e)}")
# Add detailed table information from our dynamic loading
context_parts.append("\nKEY TABLES WITH BUSINESS CONTEXT:")
# Group tables by business purpose for better context
purpose_groups = {}
for table_name, table_info in self.detailed_tables.items():
purpose = table_info.get('business_purpose', 'supporting_data')
if purpose not in purpose_groups:
purpose_groups[purpose] = []
purpose_groups[purpose].append((table_name, table_info))
# Present tables organized by business purpose
try:
for purpose, tables in purpose_groups.items():
context_parts.append(f"\n{purpose.upper().replace('_', ' ')} TABLES:")
tables_list = list(tables) if not isinstance(tables, list) else tables
for table_name, table_info in tables_list[:5]: # Limit to avoid context overflow
context_parts.append(f" {table_name}:")
if 'description' in table_info:
context_parts.append(f" Purpose: {table_info['description']}")
if 'category' in table_info:
context_parts.append(f" Category: {table_info['category']}")
except Exception as e:
logger.error(f"Error in business purpose tables section: {e}")
context_parts.append("\nBUSINESS PURPOSE TABLES: Error loading table information")
# Add complete table list for reference
context_parts.append(f"\nCOMPLETE TABLE LIST ({len(self.schema.complete_table_list)} tables):")
context_parts.append(", ".join(self.schema.complete_table_list))
# Add relationship context
try:
context_parts.append("\nTABLE RELATIONSHIPS:")
for table_name, relationships in self.table_relationships.items():
if 'related_tables' in relationships and relationships['related_tables']:
# Ensure we only join strings, not dicts
related_names = []
related_tables = relationships['related_tables']
if isinstance(related_tables, list):
for item in related_tables[:3]:
if isinstance(item, str):
related_names.append(item)
elif isinstance(item, dict) and 'name' in item:
related_names.append(item['name'])
else:
related_names.append(str(item))
else:
logger.warning(f"related_tables for {table_name} is not a list: {type(related_tables)}")
if related_names:
context_parts.append(f" {table_name} relates to: {', '.join(related_names)}")
except Exception as e:
logger.error(f"Error in table relationships section: {e}")
context_parts.append("\nTABLE RELATIONSHIPS: Error loading relationship information")
# Add actual column information for key tables
context_parts.append("\nKEY TABLE COLUMNS (Actual column names to use in SQL):")
key_tables_for_columns = ['Requests', 'Samples', 'Results', 'Analyses', 'AnalysisGroups', 'SampleAnalyseGroups', 'Companies']
for table_name in key_tables_for_columns:
if table_name in self.detailed_tables and 'columns' in self.detailed_tables[table_name]:
columns_data = self.detailed_tables[table_name]['columns']
# Handle both list and dict formats for columns
if isinstance(columns_data, list):
columns = columns_data[:15] # Show first 15 columns for better context
context_parts.append(f" {table_name}: {', '.join(columns)}")
# Add special note for tables with language-specific columns
if any('_nl' in col or '_fr' in col or '_en' in col for col in columns):
lang_cols = [col for col in columns if '_nl' in col or '_fr' in col or '_en' in col]
context_parts.append(f"    Language columns: {', '.join(lang_cols)}")
# Get best name column for this specific table
best_name_col = self.get_best_name_column(table_name)
context_parts.append(f"    For {table_name}: Use '{best_name_col}' for name/description")
elif isinstance(columns_data, dict):
# If it's a dict (column_name: data_type), extract just the column names
column_names = list(columns_data.keys())[:15]
context_parts.append(f" {table_name}: {', '.join(column_names)}")
else:
logger.warning(f"Columns for {table_name} is unexpected type: {type(columns_data)}")
# Add general schema guidance (specific column errors will be handled dynamically in iterations)
context_parts.append("\nļæ½ SCHEMA GUIDANCE:")
context_parts.append("⢠Always verify column names exist in their respective tables")
context_parts.append("⢠Use proper table aliases for clarity")
context_parts.append("⢠Companies table may be empty - consider using r.sCustomer for customer names")
context_parts.append("⢠CRITICAL: Use exact column names from the schema above - verify each column exists in its table")
# Add table-specific column guidance based on common usage patterns
context_parts.append("\nļæ½ TABLE-SPECIFIC COLUMN GUIDANCE:")
common_tables = ['Companies', 'Analyses', 'SampleTypes', 'AnalysisGroups', 'Results', 'Requests', 'Samples']
for table in common_tables:
available_columns = self.get_available_columns(table)
if available_columns:
best_name_col = self.get_best_name_column(table)
context_parts.append(f"⢠{table}: Use '{best_name_col}' for names/descriptions")
# Add recommended JOIN patterns
context_parts.append("\nš” RECOMMENDED JOIN PATTERNS:")
joins = self.schema.get_recommended_joins()
for join_name, join_info in joins.items():
context_parts.append(f"\n{join_info['description']}:")
context_parts.append(f" FROM {join_info['tables'][0]} {join_info['tables'][1] if len(join_info['tables']) > 1 else ''}")
if len(join_info['tables']) > 1:
context_parts.append(f" JOIN {join_info['tables'][1]} ON {join_info['join_condition']}")
# Add high-value result tables
context_parts.append("\nš HIGH-VALUE RESULT TABLES (prioritize these for laboratory analysis):")
result_tables = self.schema.get_best_result_tables()
for table in result_tables:
available_columns = self.schema.get_available_columns(table)
if available_columns:
value_columns = [col for col in available_columns if any(keyword in col.lower() for keyword in ['result', 'value', 'text', 'posneg'])]
context_parts.append(f"   {table}: {', '.join(value_columns[:5])}")
# Add query optimization guidance
context_parts.append("\nQUERY OPTIMIZATION GUIDANCE:")
context_parts.append("- CRITICAL: Use exact column names as shown above - avoid generic names like 'Name'")
context_parts.append("- For name columns: Check the exact column names above - different tables use different naming patterns")
context_parts.append("- Join tables logically based on business relationships")
context_parts.append("- Include relevant result tables when users ask for analysis outcomes")
context_parts.append("- Consider the full workflow: Request -> Sample -> Analysis -> Results")
context_parts.append("- Use appropriate date filtering for temporal analysis")
context_parts.append("- ALWAYS verify column names exist in the table before using them")
context_parts.append("- ā ļø NEVER use Companies.City, Samples.SampleType, or Users.Name - they don't exist!")
return "\n".join(context_parts)
def _validate_generated_sql(self, sql_query: str) -> Tuple[bool, List[str]]:
"""Validate generated SQL against known schema"""
errors = []
# Extract table names from SQL
import re
# Find FROM and JOIN clauses
table_pattern = r'(?:FROM|JOIN)\s+(\w+)(?:\s+(\w+))?'
matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
for match in matches:
table_name = match[0]
if table_name not in self.detailed_tables:
errors.append(f"Unknown table: {table_name}")
# Find column references (basic check)
column_pattern = r'(\w+)\.(\w+)'
col_matches = re.findall(column_pattern, sql_query)
for table_alias, column_name in col_matches:
# This is a simplified check - would need more sophisticated parsing for full validation
pass
return len(errors) == 0, errors
def _create_sql_generation_prompt(self, user_query: str, db_context: str, max_rows: int, intent: Dict[str, Any] = None) -> str:
"""Create prompt for SQL generation with intent guidance"""
# Build intent-specific guidance
intent_guidance = ""
if intent:
try:
intent_guidance = f"""
DETECTED QUERY INTENT: {intent.get('pattern', 'unknown').upper()} (confidence: {intent.get('confidence', 0.0):.2f})
RECOMMENDED TABLES: {', '.join(intent.get('suggested_tables', []))}
"""
if intent.get('avoid_tables'):
intent_guidance += f"AVOID TABLES: {', '.join(intent.get('avoid_tables', []))}\n"
if intent.get('required_joins'):
intent_guidance += "REQUIRED JOIN PATTERN:\n"
for join in intent.get('required_joins', []):
intent_guidance += f" JOIN {join}\n"
except Exception as e:
logger.error(f"Error building intent guidance: {e}")
intent_guidance = "INTENT GUIDANCE: Error processing query intent"
# Build adaptive guidance based on user query analysis
query_analysis = self._analyze_user_query(user_query)
prompt = f"""You are an expert SQL analyst for the {self.schema.database_name} database. Your goal is to create the OPTIMAL SQL query to answer the user's specific analysis request using the comprehensive schema information provided.
{intent_guidance}
USER ANALYSIS REQUEST: "{user_query}"
QUERY ANALYSIS:
- Primary Focus: {query_analysis['primary_focus']}
- Expected Data Types: {', '.join(query_analysis['expected_data_types'])}
- Temporal Scope: {query_analysis['temporal_scope']}
- Required Business Context: {query_analysis['business_context']}
{db_context}
ADAPTIVE QUERY CONSTRUCTION STRATEGY:
STEP 1 - IDENTIFY OPTIMAL DATASET:
Analyze what the user is asking for and select the minimal but complete set of tables that will provide this information. Consider:
- What is the user's ultimate analytical goal?
- What specific data points are needed to answer their question?
- Which tables contain the core information vs supporting context?
- How should tables be logically connected based on business workflow?
STEP 2 - CONSTRUCT INTELLIGENT JOINS:
- Start with the most relevant core table for the user's request
- Add tables that provide essential context or requested details
- Use proper JOIN types based on data requirements (INNER vs LEFT JOIN)
- Follow the natural business process flow when possible
STEP 3 - OPTIMIZE DATA SELECTION:
- Include columns that directly answer the user's question
- Add supporting context columns that enable analysis
- Consider grouping/aggregation if the user wants summary statistics
- Include temporal fields when user asks about trends or distributions
CRITICAL REQUIREMENTS:
1. Use ONLY table and column names that exist in the schema above
2. Generate SQL Server T-SQL compatible syntax
3. {f"Limit to TOP {max_rows} rows" if max_rows is not None else "Return all rows (no limit)"}
4. Create meaningful column aliases
5. Add appropriate WHERE clauses for data quality/relevance
6. Order results logically for the intended analysis
7. IMPORTANT: Always use LEFT JOIN when joining the Companies table (e.g., "LEFT JOIN Companies c ON c.Id = r.Customer_ID") as many requests may not have valid company references
REFLECTION QUESTIONS TO GUIDE YOUR QUERY:
- Does this query directly address what the user is asking for?
- Have I included all necessary tables to provide complete context?
- Are there any missing pieces that would make the analysis incomplete?
- Is this the most efficient way to retrieve the requested information?
- Will the result set enable the user to perform their intended analysis?
OUTPUT FORMAT:
```sql
-- Analysis Goal: [What this query is designed to achieve]
-- Key Tables Used: [Primary tables and why they were selected]
-- Business Logic: [How the joins reflect real-world relationships]
SELECT {self._format_top_clause(max_rows)}
[columns with clear aliases and business meaning]
FROM [starting_table] [alias]
[JOIN clauses that reflect logical business relationships]
WHERE [conditions that ensure data quality and relevance]
ORDER BY [ordering that supports analysis goals]
```
Now generate the optimal SQL query for this analysis request:"""
return prompt
def _analyze_user_query(self, user_query: str) -> Dict[str, str]:
"""Analyze user query to provide adaptive guidance"""
query_lower = user_query.lower()
# Determine primary focus
if any(word in query_lower for word in ['result', 'outcome', 'finding', 'test', 'analysis']):
primary_focus = "Analysis Results & Outcomes"
elif any(word in query_lower for word in ['customer', 'client', 'company', 'practice']):
primary_focus = "Customer & Practice Information"
elif any(word in query_lower for word in ['sample', 'specimen', 'collection']):
primary_focus = "Sample Management"
elif any(word in query_lower for word in ['request', 'order', 'submission']):
primary_focus = "Request Workflow"
else:
primary_focus = "General Business Intelligence"
# Expected data types
data_types = []
if any(word in query_lower for word in ['count', 'number', 'total', 'sum']):
data_types.append("Aggregated Counts")
if any(word in query_lower for word in ['distribution', 'breakdown', 'category']):
data_types.append("Categorical Distributions")
if any(word in query_lower for word in ['time', 'date', 'over time', 'trend', 'monthly', 'yearly']):
data_types.append("Temporal Data")
if any(word in query_lower for word in ['type', 'kind', 'category']):
data_types.append("Classification Data")
if not data_types:
data_types.append("Detailed Records")
# Temporal scope
if any(word in query_lower for word in ['recent', 'last', 'current']):
temporal_scope = "Recent/Current Data"
elif any(word in query_lower for word in ['historical', 'over time', 'trend']):
temporal_scope = "Historical Trends"
elif any(word in query_lower for word in ['monthly', 'yearly', 'quarterly']):
temporal_scope = "Periodic Analysis"
else:
temporal_scope = "All Available Data"
# Business context
if any(word in query_lower for word in ['workflow', 'process', 'pipeline']):
business_context = "Process Flow Context"
elif any(word in query_lower for word in ['performance', 'efficiency', 'quality']):
business_context = "Performance Metrics"
elif any(word in query_lower for word in ['statistics', 'statistical', 'descriptive']):
business_context = "Statistical Analysis"
else:
business_context = "Operational Overview"
return {
'primary_focus': primary_focus,
'expected_data_types': data_types,
'temporal_scope': temporal_scope,
'business_context': business_context
}
def _generate_with_llm(self, prompt: str, model: str = 'claude-sonnet-4-5-20250929') -> str:
"""Generate SQL using the statistical agent's LLM"""
try:
# Add pattern-specific guidance for Poulpharm
enhanced_prompt = prompt
if self.is_poulpharm:
pattern_hints = self._get_poulpharm_pattern_hints(prompt)
if pattern_hints:
enhanced_prompt += f"\n\nSPECIFIC HINTS FOR THIS QUERY:\n{pattern_hints}"
logger.debug(f"Sending prompt to LLM (length: {len(enhanced_prompt)} chars)")
# Use specified model for SQL generation
response = self.statistical_agent.query_llm(enhanced_prompt, model=model, max_tokens=2000)
logger.debug(f"Received Claude response (length: {len(response)} chars)")
return response
except Exception as e:
logger.error(f"Error generating SQL with LLM: {str(e)}")
raise
def _get_poulpharm_pattern_hints(self, prompt: str) -> str:
"""Get Poulpharm-specific hints based on detected patterns"""
hints = []
prompt_lower = prompt.lower()
for pattern_name, pattern_info in self.common_patterns.items():
if any(keyword in prompt_lower for keyword in pattern_info['keywords']):
if pattern_name == 'recent_requests':
hints.append("- Start with Requests table and filter by DateCreated for recent data")
elif pattern_name == 'bacteriology':
hints.append("- Include tblBactResult for bacterial isolates and tblAntibiogram for sensitivity")
hints.append("- Join through Samples: Requests -> Samples -> tblBactResult")
elif pattern_name == 'customer_analysis':
hints.append("- Use Companies table for customer details, sCustomer field in Requests for quick lookup")
elif pattern_name == 'sample_workflow':
hints.append("- Follow workflow: Requests -> Samples -> SampleAnalyseGroups -> AnalysisGroups")
return "\n".join(hints)
def _parse_llm_response(self, response: str) -> Tuple[str, str]:
"""Parse LLM response to extract SQL query and explanation"""
try:
import re
logger.debug(f"Parsing LLM response: {response[:200]}...")
# Find explanation first
explanation = "Generated SQL query"
explanation_patterns = [
r'-- Explanation: (.+)',
r'Explanation: (.+)',
r'(?:This query|The query|SQL) (.+?)(?:\n|$)',
r'```sql\s*--\s*(.+?)(?:\n|$)'
]
for pattern in explanation_patterns:
explanation_match = re.search(pattern, response, re.IGNORECASE)
if explanation_match:
explanation = explanation_match.group(1).strip()
break
# Try multiple approaches to extract SQL
sql_query = None
# Method 1: Look for SQL code block with various formats
sql_patterns = [
r'```sql\s*(.*?)```',
r'```\s*(SELECT.*?)```',
r'```tsql\s*(.*?)```',
r'```\s*(.*?)```'
]
for pattern in sql_patterns:
sql_match = re.search(pattern, response, re.DOTALL | re.IGNORECASE)
if sql_match:
sql_query = sql_match.group(1).strip()
# Clean up common artifacts
sql_query = re.sub(r'^-- Explanation: .+\n', '', sql_query, flags=re.MULTILINE)
sql_query = re.sub(r'^\s*--.*?\n', '', sql_query, flags=re.MULTILINE)
break
# Method 2: Look for SELECT statements without code blocks
if not sql_query:
select_match = re.search(r'(SELECT\s+(?:TOP\s+\d+\s+)?.*?)(?:\n\n|\Z)', response, re.DOTALL | re.IGNORECASE)
if select_match:
sql_query = select_match.group(1).strip()
# Method 3: Extract everything that looks like SQL (fallback)
if not sql_query:
lines = response.split('\n')
sql_lines = []
in_sql = False
for line in lines:
line_upper = line.upper().strip()
if any(keyword in line_upper for keyword in ['SELECT', 'WITH', 'INSERT', 'UPDATE', 'DELETE']):
in_sql = True
sql_lines.append(line)
elif in_sql:
# Continue collecting SQL lines
if line.strip() == '' and len(sql_lines) > 0:
# Empty line might end SQL block
if sql_lines[-1].strip().endswith(';') or any(word in sql_lines[-1].upper() for word in ['ORDER BY', 'GROUP BY']):
break
sql_lines.append(line)
# Stop if we've collected too much or hit obvious end
if len(sql_lines) > 50 or line.strip().endswith(';'):
break
if sql_lines:
sql_query = '\n'.join(sql_lines).strip()
# Final validation and cleanup
if sql_query:
# Remove leading/trailing whitespace and ensure it looks like SQL
sql_query = sql_query.strip()
if not any(keyword in sql_query.upper() for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'WITH']):
sql_query = None
if not sql_query:
logger.warning("Could not extract SQL query from LLM response, using full response")
# Log part of the response for debugging
logger.debug(f"Full LLM response: {response}")
raise ValueError("Could not extract SQL query from LLM response")
logger.debug(f"Extracted SQL query: {sql_query[:100]}...")
return sql_query, explanation
except Exception as e:
logger.error(f"Error parsing LLM response: {str(e)}")
logger.debug(f"Response that failed to parse: {response}")
# Enhanced fallback - try to find any SQL-like content
if 'SELECT' in response.upper():
# Extract everything from first SELECT to end or natural break
import re
fallback_match = re.search(r'(SELECT.*)', response, re.DOTALL | re.IGNORECASE)
if fallback_match:
fallback_sql = fallback_match.group(1).strip()
# Clean up obvious non-SQL content at the end
fallback_sql = re.sub(r'\n[A-Za-z][^;]*$', '', fallback_sql)
return fallback_sql, "Extracted SQL from response (fallback)"
# Last resort: return the full response
return response.strip(), "Generated SQL query (unparsed)"
def _generate_basic_sql(self, user_query: str, max_rows: Optional[int]) -> Tuple[str, str]:
"""Fallback basic SQL generation without LLM using structured approach"""
intent = self._detect_query_intent(user_query)
logger.info(f"Using fallback SQL generation with intent: {intent['pattern']}")
if intent['pattern'] == 'customer_stats':
return self._generate_customer_stats_sql(max_rows)
elif intent['pattern'] == 'bacteriology':
return self._generate_bacteriology_sql(max_rows)
elif intent['pattern'] == 'sample_workflow':
return self._generate_sample_workflow_sql(max_rows)
else:
return self._generate_generic_sql(max_rows)
def _generate_customer_stats_sql(self, max_rows: Optional[int], simple_version: bool = False) -> Tuple[str, str]:
"""Generate customer statistics SQL using exact schema - optimized to avoid empty results"""
if simple_version:
# Use simple version without Companies table dependency
return self._generate_simple_customer_stats_sql(max_rows)
# Full version with company information
sql_parts = [
f"SELECT {self._format_top_clause(max_rows)}",
" COALESCE(r.sCustomer, 'Customer ID: ' + CAST(r.Customer_ID AS VARCHAR), 'Unknown Customer') as CustomerName,",
" COALESCE(c.Name, 'No Company Info') as CompanyName,",
" COALESCE(c.Stad, 'Unknown') as City,",
" COALESCE(c.Land, 'Unknown') as Country,",
" COUNT(r.Id) as TotalRequests,",
" COUNT(DISTINCT s.Id) as TotalSamples,",
" MIN(r.DateCreated) as FirstRequestDate,",
" MAX(r.DateCreated) as LastRequestDate,",
" DATEDIFF(day, MIN(r.DateCreated), MAX(r.DateCreated)) as DaysBetweenFirstLast",
"FROM Requests r",
"LEFT JOIN Companies c ON r.Customer_ID = c.Id",
"LEFT JOIN Samples s ON s.Sample_Request = r.Id",
"WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
"GROUP BY r.sCustomer, r.Customer_ID, c.Name, c.Stad, c.Land",
"ORDER BY TotalRequests DESC, LastRequestDate DESC"
]
explanation = "Customer request statistics for the last 3 months - uses LEFT JOINs to avoid excluding requests without company info"
return "\n".join(sql_parts), explanation
def _generate_simple_customer_stats_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
"""Generate simple customer statistics SQL without Companies table dependency"""
sql_parts = [
f"SELECT {self._format_top_clause(max_rows)}",
" COALESCE(r.sCustomer, 'Unknown Customer') as CustomerName,",
" COUNT(r.Id) as TotalRequests,",
" COUNT(DISTINCT s.Id) as TotalSamples,",
" MIN(r.DateCreated) as FirstRequestDate,",
" MAX(r.DateCreated) as LastRequestDate",
"FROM Requests r",
"LEFT JOIN Samples s ON s.Sample_Request = r.Id",
"WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
"GROUP BY r.sCustomer",
"ORDER BY TotalRequests DESC, LastRequestDate DESC"
]
explanation = "Simple customer request statistics - no dependency on Companies table"
return "\n".join(sql_parts), explanation
def generate_diagnostic_query(self, original_query: str, intent: Dict[str, Any]) -> Tuple[str, str]:
"""Generate diagnostic query to understand why main query returned no results"""
if intent['pattern'] == 'customer_stats':
return self._generate_customer_stats_diagnostic()
elif intent['pattern'] == 'bacteriology':
return self._generate_bacteriology_diagnostic()
else:
return self._generate_generic_diagnostic()
def _generate_customer_stats_diagnostic(self) -> Tuple[str, str]:
"""Generate diagnostic query for customer statistics"""
sql_parts = [
"SELECT",
" 'Total Requests (All Time)' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM Requests",
"UNION ALL",
"SELECT",
" 'Requests Last 12 Months' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM Requests",
"WHERE DateCreated >= DATEADD(month, -12, GETDATE())",
"UNION ALL",
"SELECT",
" 'Requests Last 6 Months' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM Requests",
"WHERE DateCreated >= DATEADD(month, -6, GETDATE())",
"UNION ALL",
"SELECT",
" 'Requests Last 3 Months' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM Requests",
"WHERE DateCreated >= DATEADD(month, -3, GETDATE())",
"UNION ALL",
"SELECT",
" 'Sample Recent Requests' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM (",
" SELECT TOP 5 DateCreated, sCustomer, RequestNr FROM Requests",
" WHERE DateCreated >= DATEADD(month, -3, GETDATE()) ORDER BY DateCreated DESC",
") sample_data",
"ORDER BY CheckType"
]
explanation = "Diagnostic query to check request counts across different time periods and show sample data"
return "\n".join(sql_parts), explanation
def _generate_bacteriology_diagnostic(self) -> Tuple[str, str]:
"""Generate diagnostic query for bacteriology"""
sql_parts = [
"SELECT",
" 'Bacteriology Results (All Time)' as CheckType,",
" COUNT(*) as Count,",
" MIN(Bar_DatumInzet) as EarliestDate,",
" MAX(Bar_DatumInzet) as LatestDate",
"FROM tblBactResult",
"WHERE Bar_DatumInzet IS NOT NULL"
]
explanation = "Diagnostic query to check bacteriology data availability"
return "\n".join(sql_parts), explanation
def _generate_generic_diagnostic(self) -> Tuple[str, str]:
"""Generate generic diagnostic query"""
sql_parts = [
"SELECT",
" 'Total Requests' as CheckType,",
" COUNT(*) as Count,",
" MIN(DateCreated) as EarliestDate,",
" MAX(DateCreated) as LatestDate",
"FROM Requests"
]
explanation = "Generic diagnostic query to check basic data availability"
return "\n".join(sql_parts), explanation
def _generate_bacteriology_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
"""Generate bacteriology SQL using exact schema"""
sql_parts = [
f"SELECT {self._format_top_clause(max_rows)}",
" r.RequestNr,",
" r.DateCreated,",
" r.sCustomer,",
" s.SampleNr,",
" s.Identification,",
" g.Name_en as GermName,",
" brl.Brl_KiemenDescr as GermDescription,",
" a.Name_en as AntibioticName,",
" brla.Brla_AantalMM as DiameterMM,",
" pn.Name_en as SensitivityResult",
"FROM Requests r",
"INNER JOIN Samples s ON s.Sample_Request = r.Id",
"INNER JOIN tblBactResult br ON br.SampleID = s.Id",
"LEFT JOIN tblBactResultLine brl ON br.BactResultID = brl.BactResultID",
"LEFT JOIN tblBactResultLineAnalyse brla ON brl.BactResultLineID = brla.BactResultLineID",
"LEFT JOIN Germs g ON brl.KiemID = g.GermId",
"LEFT JOIN Analyses a ON brla.AnalyseID = a.Id",
"LEFT JOIN PosNegs pn ON brla.PosNegsID = pn.Id",
"WHERE r.DateCreated >= DATEADD(month, -6, GETDATE())",
" AND (g.Name_en IS NOT NULL OR brl.Brl_KiemenDescr IS NOT NULL)",
"ORDER BY r.DateCreated DESC"
]
explanation = "Bacteriology results with antibiogram sensitivity data using correct table relationships"
return "\n".join(sql_parts), explanation
def _generate_sample_workflow_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
"""Generate sample workflow SQL using exact schema"""
sql_parts = [
f"SELECT {self._format_top_clause(max_rows)}",
" r.RequestNr,",
" r.DateCreated,",
" r.sCustomer,",
" s.SampleNr,",
" s.Identification,",
" s.DateSampling,",
" st.Name_en as SampleType,",
" ag.Name_en as AnalysisGroup,",
" sag.IsAnalyseDone,",
" sag.IsAnalyseExported",
"FROM Requests r",
"INNER JOIN Samples s ON s.Sample_Request = r.Id",
"LEFT JOIN SampleTypes st ON st.Id = s.Sample_SampleType",
"LEFT JOIN SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id",
"LEFT JOIN AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup",
"WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
"ORDER BY r.DateCreated DESC"
]
explanation = "Sample workflow status with analysis progress"
return "\n".join(sql_parts), explanation
def _generate_generic_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
"""Generate generic SQL for unknown query types"""
sql_parts = [
f"SELECT {self._format_top_clause(max_rows)}",
" r.RequestNr,",
" r.DateCreated,",
" r.sCustomer,",
" s.SampleNr,",
" s.Identification,",
" COUNT(sag.Id) as AnalysisGroupCount",
"FROM Requests r",
"LEFT JOIN Samples s ON s.Sample_Request = r.Id",
"LEFT JOIN SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id",
"WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
"GROUP BY r.RequestNr, r.DateCreated, r.sCustomer, s.SampleNr, s.Identification",
"ORDER BY r.DateCreated DESC"
]
explanation = "Generic laboratory request and sample information"
return "\n".join(sql_parts), explanation
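The fallback and diagnostic generators above each return a (sql, explanation) tuple of T-SQL text (DATEADD, GETDATE, and TOP are SQL Server constructs). Below is a minimal sketch of executing one of these generated queries; pyodbc, the connection string, and the pre-existing generator instance are illustrative assumptions, not part of this module:
# Sketch only: 'generator' is an existing SQLQueryGenerator; pyodbc and the DSN are assumptions.
import pyodbc

sql, explanation = generator._generate_generic_diagnostic()
print(explanation)

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;DATABASE=Poulpharm;Trusted_Connection=yes"
)
cursor = conn.cursor()
cursor.execute(sql)
for row in cursor.fetchall():
    print(row)  # one row per CheckType with Count, EarliestDate, LatestDate
conn.close()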
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| schema | DatabaseSchema | - | required |
| statistical_agent | - | None | optional |
Parameter Details
schema: DatabaseSchema instance describing the target database
statistical_agent: Optional statistical agent used for LLM-based SQL generation; defaults to None
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, schema, statistical_agent)
Purpose: Initialize the generator with a database schema and an optional statistical agent
Parameters:
schema: Type: DatabaseSchema
statistical_agent: Parameter
Returns: None
_load_dynamic_schema_info(self)
Purpose: Load comprehensive schema information - now uses enhanced schema data
Returns: None
_load_from_comprehensive_schema(self)
Purpose: Load schema information from comprehensive schema data
Returns: None
_infer_business_purpose_from_category(self, table_name, category) -> str
Purpose: Infer business purpose from table category and name
Parameters:
table_name: Type: str
category: Type: str
Returns: Returns str
_load_table_descriptions(self)
Purpose: Load table descriptions from comprehensive database schema
Returns: None
_infer_business_purpose_from_name(self, table_name) -> str
Purpose: Infer business purpose from table name only
Parameters:
table_name: Type: str
Returns: Returns str
_infer_business_purpose(self, table_name, description) -> str
Purpose: Infer business purpose from table name and description
Parameters:
table_name: Type: str
description: Type: str
Returns: Returns str
_load_detailed_schema_from_markdown(self)
Purpose: Load detailed schema from markdown reference files
Returns: None
_parse_markdown_schema(self, schema_file_path)
Purpose: Parse markdown schema file to extract table and column information
Parameters:
schema_file_path: Parameter
Returns: None
_enhance_with_schema_categories(self)
Purpose: Enhance table information with categories from main schema
Returns: None
_extract_table_relationships(self)
Purpose: Extract table relationships from schema information
Returns: None
_find_related_tables(self, table_name) -> List[str]
Purpose: Find tables that are likely related to the given table
Parameters:
table_name: Type: str
Returns: Returns List[str]
_initialize_common_patterns(self)
Purpose: Initialize common query patterns for different types of analysis
Returns: None
_load_detailed_schema(self)
Purpose: Load detailed schema information from database reference files if available
Returns: None
_parse_poulpharm_detailed_schema(self)
Purpose: Parse detailed schema from Poulpharm LIMS database reference files
Returns: None
_parse_markdown_schema(self, schema_file_path)
Purpose: Parse table and column information from markdown schema file
Parameters:
schema_file_path: Parameter
Returns: None
_load_detailed_database_schema(self)
Purpose: Load detailed database schema from JSON files
Returns: None
_setup_fallback_poulpharm_schema(self)
Purpose: Setup fallback - this should not be used when dynamic schema is available
Returns: None
get_best_name_column(self, table_name) -> str
Purpose: Get the best available name column for a table based on dynamic schema
Parameters:
table_name: Type: str
Returns: Returns str
validate_column_exists(self, table_name, column_name) -> bool
Purpose: Check if a column actually exists in the dynamic schema
Parameters:
table_name: Type: str
column_name: Type: str
Returns: Returns bool
get_safe_column_reference(self, table_name, preferred_column, fallback_column) -> str
Purpose: Get a safe column reference, falling back if the preferred column doesn't exist (see the sketch after this method list)
Parameters:
table_name: Type: str
preferred_column: Type: str
fallback_column: Type: str
Returns: Returns str
_setup_poulpharm_optimizations(self)
Purpose: Setup Poulpharm LIMS specific query optimizations
Returns: None
_detect_query_intent(self, user_query) -> Dict[str, Any]
Purpose: Detect user query intent and suggest appropriate tables/approach
Parameters:
user_query: Type: str
Returns: Returns Dict[str, Any]
_format_top_clause(self, max_rows) -> str
Purpose: Helper method to format TOP clause for SQL queries
Parameters:
max_rows: Parameter
Returns: Returns str
generate_sql_query(self, user_query, max_rows, model) -> Tuple[str, Dict[str, Any]]
Purpose: Generate SQL query based on user analysis request; returns (sql_query, metadata)
Parameters:
user_query: Type: str
max_rows: Type: Optional[int]
model: Type: str
Returns: Returns Tuple[str, Dict[str, Any]]
_prepare_database_context(self) -> str
Purpose: Prepare comprehensive database schema context using dynamic schema information
Returns: Returns str
_validate_generated_sql(self, sql_query) -> Tuple[bool, List[str]]
Purpose: Validate generated SQL against known schema
Parameters:
sql_query: Type: str
Returns: Returns Tuple[bool, List[str]]
_create_sql_generation_prompt(self, user_query, db_context, max_rows, intent) -> str
Purpose: Create prompt for SQL generation with intent guidance
Parameters:
user_query: Type: str
db_context: Type: str
max_rows: Type: int
intent: Type: Dict[str, Any]
Returns: Returns str
_analyze_user_query(self, user_query) -> Dict[str, str]
Purpose: Analyze user query to provide adaptive guidance
Parameters:
user_query: Type: str
Returns: Returns Dict[str, str]
_generate_with_llm(self, prompt, model) -> str
Purpose: Generate SQL using the statistical agent's LLM
Parameters:
prompt: Type: str
model: Type: str
Returns: Returns str
_get_poulpharm_pattern_hints(self, prompt) -> str
Purpose: Get Poulpharm-specific hints based on detected patterns
Parameters:
prompt: Type: str
Returns: Returns str
_parse_llm_response(self, response) -> Tuple[str, str]
Purpose: Parse LLM response to extract SQL query and explanation
Parameters:
response: Type: str
Returns: Returns Tuple[str, str]
_generate_basic_sql(self, user_query, max_rows) -> Tuple[str, str]
Purpose: Fallback basic SQL generation without LLM using structured approach
Parameters:
user_query: Type: str
max_rows: Type: Optional[int]
Returns: Returns Tuple[str, str]
_generate_customer_stats_sql(self, max_rows, simple_version) -> Tuple[str, str]
Purpose: Generate customer statistics SQL using exact schema - optimized to avoid empty results
Parameters:
max_rows: Type: Optional[int]
simple_version: Type: bool
Returns: Returns Tuple[str, str]
_generate_simple_customer_stats_sql(self, max_rows) -> Tuple[str, str]
Purpose: Generate simple customer statistics SQL without Companies table dependency
Parameters:
max_rows: Type: Optional[int]
Returns: Returns Tuple[str, str]
generate_diagnostic_query(self, original_query, intent) -> Tuple[str, str]
Purpose: Generate diagnostic query to understand why main query returned no results
Parameters:
original_query: Type: str
intent: Type: Dict[str, Any]
Returns: Returns Tuple[str, str]
_generate_customer_stats_diagnostic(self) -> Tuple[str, str]
Purpose: Generate diagnostic query for customer statistics
Returns: Returns Tuple[str, str]
_generate_bacteriology_diagnostic(self) -> Tuple[str, str]
Purpose: Generate diagnostic query for bacteriology
Returns: Returns Tuple[str, str]
_generate_generic_diagnostic(self) -> Tuple[str, str]
Purpose: Generate generic diagnostic query
Returns: Returns Tuple[str, str]
_generate_bacteriology_sql(self, max_rows) -> Tuple[str, str]
Purpose: Generate bacteriology SQL using exact schema
Parameters:
max_rows: Type: Optional[int]
Returns: Returns Tuple[str, str]
_generate_sample_workflow_sql(self, max_rows) -> Tuple[str, str]
Purpose: Generate sample workflow SQL using exact schema
Parameters:
max_rows: Type: Optional[int]
Returns: Returns Tuple[str, str]
_generate_generic_sql(self, max_rows) -> Tuple[str, str]
Purpose: Generate generic SQL for unknown query types
Parameters:
max_rows: Type: Optional[int]
Returns: Returns Tuple[str, str]
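The schema-safety helpers listed above (get_best_name_column, validate_column_exists, get_safe_column_reference) are meant for building column references that tolerate schema variations. A minimal sketch of how they might be combined, assuming a constructed SQLQueryGenerator instance; the table and column names below are illustrative only, not taken from this module:
# Sketch only: 'generator' is an existing SQLQueryGenerator; table/column names are hypothetical.
table = "Germs"

# Check whether the preferred English name column exists before referencing it.
if generator.validate_column_exists(table, "Name_en"):
    name_col = "Name_en"
else:
    name_col = generator.get_best_name_column(table)

# Equivalent single call that falls back automatically if the preferred column is missing.
safe_col = generator.get_safe_column_reference(table, "Name_en", "Name")
select_clause = f"SELECT g.{safe_col} FROM {table} g"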
Required Imports
import json
import logging
import os
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Any
Usage Example
# Example usage:
# generator = SQLQueryGenerator(schema)
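A slightly fuller sketch of the intended flow, based only on the method signatures documented above; the schema object, the agent, and the model name are assumptions for illustration:
# Sketch based on the documented interface; 'schema' is a DatabaseSchema instance and
# 'agent' is whatever object supplies LLM access (both assumed to exist already).
generator = SQLQueryGenerator(schema, statistical_agent=agent)

user_query = "How many bacteriology results were reported per customer last quarter?"
sql, metadata = generator.generate_sql_query(user_query, max_rows=100, model="gpt-4o")  # model name is hypothetical
print(sql)

# If the main query returns no rows, a diagnostic query can help explain why.
intent = generator._detect_query_intent(user_query)  # private helper, used here only for illustration
diag_sql, diag_explanation = generator.generate_diagnostic_query(user_query, intent)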
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class SQLQueryGenerator_v1 (97.8% similar)
- class SqlGenerationResult (60.3% similar)
- function main_v61 (56.3% similar)
- class QueryIteration (52.3% similar)
- class DatabaseSchema (50.7% similar)