šŸ” Code Extractor

class SQLQueryGenerator

Maturity: 26

Generates SQL queries based on user requests and database schema

File: /tf/active/vicechatdev/full_smartstat/sql_query_generator.py
Lines: 347 - 1909
Complexity: moderate

Purpose

Generates SQL queries based on user requests and database schema
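
Usage Example

A minimal sketch of how the class might be driven, assuming a loaded DatabaseSchema instance. The "my_schema" variable and the example request string are hypothetical; only the constructor and generate_sql_query signatures come from the source below.

generator = SQLQueryGenerator(schema=my_schema)  # my_schema: a loaded DatabaseSchema
sql, metadata = generator.generate_sql_query(
    "Monthly sample counts per customer",
    max_rows=1000,
)
print(metadata['detected_intent']['pattern'])  # e.g. 'customer_stats'
print(sql)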

Source Code

class SQLQueryGenerator:
    """Generates SQL queries based on user requests and database schema"""
    
    def __init__(self, schema: DatabaseSchema, statistical_agent=None):
        self.schema = schema
        self.statistical_agent = statistical_agent
        self.is_poulpharm = schema.database_name.lower() == 'poulpharm'
        self._load_dynamic_schema_info()
        self._initialize_common_patterns()
        if self.is_poulpharm:
            self._setup_poulpharm_optimizations()
        
    def _load_dynamic_schema_info(self):
        """Load comprehensive schema information - now uses enhanced schema data"""
        self.detailed_tables = {}
        self.table_relationships = {}
        self.business_context = {}
        
        # Use comprehensive schema data if available (new approach)
        if hasattr(self.schema, 'system_architecture') and self.schema.system_architecture:
            logger.info(f"Using comprehensive schema with {len(self.schema.complete_table_list)} tables")
            self._load_from_comprehensive_schema()
        else:
            # Fallback to legacy loading methods
            logger.info("Using legacy schema loading methods")
            # Load from table_list.json if available
            self._load_table_descriptions()
            
            # Load from detailed markdown files if available
            self._load_detailed_schema_from_markdown()
            
            # Extract relationships from schema data
            self._extract_table_relationships()
    
    def _load_from_comprehensive_schema(self):
        """Load schema information from comprehensive schema data"""
        try:
            # Load table information from comprehensive schema
            for table_name in self.schema.complete_table_list:
                if table_name not in self.detailed_tables:
                    self.detailed_tables[table_name] = {}
                
                # Add table metadata if available in categories
                for category, tables in self.schema.table_categories.items():
                    if table_name in tables:
                        self.detailed_tables[table_name]['category'] = category
                        self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose_from_category(table_name, category)
                        break
            
            # Load relationships from comprehensive schema
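            # Expected relationship entry (illustrative; only these keys are read):
            # {'source_table': 'Samples', 'target_table': 'Requests',
            #  'relationship_type': 'FK', 'source_columns': ['Sample_Request'],
            #  'target_columns': ['Id']}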
            if isinstance(self.schema.key_relationships, list):
                for relationship in self.schema.key_relationships:
                    if isinstance(relationship, dict):
                        source_table = relationship.get('source_table', '')
                        target_table = relationship.get('target_table', '')
                        if source_table and target_table:
                            if source_table not in self.table_relationships:
                                self.table_relationships[source_table] = []
                            self.table_relationships[source_table].append({
                                'target_table': target_table,
                                'relationship_type': relationship.get('relationship_type', 'Unknown'),
                                'source_columns': relationship.get('source_columns', []),
                                'target_columns': relationship.get('target_columns', [])
                            })
            
            logger.info(f"Loaded comprehensive schema: {len(self.detailed_tables)} tables, {len(self.schema.key_relationships) if isinstance(self.schema.key_relationships, list) else 0} relationships")
            
        except Exception as e:
            logger.error(f"Error loading comprehensive schema: {str(e)}")
            # Fallback to legacy methods if comprehensive loading fails
            self._load_table_descriptions()
            self._load_detailed_schema_from_markdown()
            self._extract_table_relationships()
    
    def _infer_business_purpose_from_category(self, table_name: str, category: str) -> str:
        """Infer business purpose from table category and name"""
        table_lower = table_name.lower()
        
        if category == 'results':
            if 'serologie' in table_lower:
                return 'Serology test results and measurements'
            elif 'bact' in table_lower:
                return 'Bacteriology test results and analysis'
            elif 'pcr' in table_lower:
                return 'PCR test results and genetic analysis'
            elif 'result' in table_lower:
                return 'Laboratory test results and measurements'
            else:
                return 'Analysis results and test outcomes'
        elif category == 'lookup':
            return 'Reference data and lookup values'
        elif category == 'core_data':
            if 'sample' in table_lower:
                return 'Sample tracking and management'
            elif 'analysis' in table_lower:
                return 'Analysis definitions and configurations'
            else:
                return 'Core business data and entities'
        elif category == 'system':
            return 'System administration and user management'
        elif category == 'audit':
            return 'Audit trail and system logging'
        else:
            return 'Database table for business operations'
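
    # Example: _infer_business_purpose_from_category('tblSerologieResult', 'results')
    # returns 'Serology test results and measurements' via the 'serologie' substring.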
    
    def _load_table_descriptions(self):
        """Load table descriptions from comprehensive database schema"""
        try:
            # Use new comprehensive schema files instead of deprecated table_list.json
            schema_path = Path(__file__).parent / "database_schema_20251003_120434.json"
            if schema_path.exists():
                with open(schema_path, 'r', encoding='utf-8') as f:
                    schema_data = json.load(f)
                    
                # Extract table information from comprehensive schema
                if 'columns_by_table' in schema_data:
                    for table_name, table_info in schema_data['columns_by_table'].items():
                        if table_name not in self.detailed_tables:
                            self.detailed_tables[table_name] = {}
                        
                        # Store actual column information - columns_by_table[table_name] is a list
                        if isinstance(table_info, list):
                            self.detailed_tables[table_name]['columns'] = table_info
                            self.detailed_tables[table_name]['column_count'] = len(table_info)
                        
                        # Infer business purpose from table name
                        self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose_from_name(table_name)
                        
            logger.info(f"Loaded comprehensive schema for {len(self.detailed_tables)} tables from database_schema_20251003_120434.json")
        except Exception as e:
            logger.warning(f"Could not load comprehensive schema: {str(e)}")
    
    def _infer_business_purpose_from_name(self, table_name: str) -> str:
        """Infer business purpose from table name only"""
        # Categorize tables by their business function
        if any(keyword in table_name.lower() for keyword in ['request', 'order']):
            return 'workflow_initiation'
        elif any(keyword in table_name.lower() for keyword in ['sample', 'specimen']):
            return 'sample_management'
        elif any(keyword in table_name.lower() for keyword in ['result', 'analysis', 'test']):
            return 'testing_results'
        elif any(keyword in table_name.lower() for keyword in ['company', 'customer', 'client']):
            return 'customer_management'
        elif any(keyword in table_name.lower() for keyword in ['bact', 'pcr', 'serolog']):
            return 'specialized_testing'
        else:
            return 'general_operations'
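
    # Example: _infer_business_purpose_from_name('tblPCRResult') returns
    # 'testing_results' -- the 'result' branch is checked before the 'pcr' branch,
    # so branch order decides the category when a name matches several keywords.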
    
    def _infer_business_purpose(self, table_name: str, description: str) -> str:
        """Infer business purpose from table name and description"""
        # Categorize tables by their business function
        if any(keyword in table_name.lower() for keyword in ['request', 'order']):
            return 'workflow_initiation'
        elif any(keyword in table_name.lower() for keyword in ['sample', 'specimen']):
            return 'sample_management'
        elif any(keyword in table_name.lower() for keyword in ['result', 'analysis', 'test']):
            return 'testing_results'
        elif any(keyword in table_name.lower() for keyword in ['company', 'customer', 'client']):
            return 'customer_management'
        elif any(keyword in table_name.lower() for keyword in ['bact', 'pcr', 'serolog']):
            return 'specialized_testing'
        elif any(keyword in description.lower() for keyword in ['audit', 'log', 'track']):
            return 'audit_tracking'
        else:
            return 'supporting_data'
    
    def _load_detailed_schema_from_markdown(self):
        """Load detailed schema from markdown reference files"""
        try:
            labosoft_dir = Path(__file__).parent.parent / "labosoft_SQL"
            schema_files = list(labosoft_dir.glob("poulpharm_lims_database_reference_*.md"))
            
            if schema_files:
                latest_schema = sorted(schema_files)[-1]
                logger.info(f"Loading detailed schema from {latest_schema}")
                
                # Parse the markdown file for column information
                self._parse_markdown_schema(latest_schema)
                self._enhance_with_schema_categories()
            else:
                logger.warning("No detailed schema markdown files found")
        except Exception as e:
            logger.warning(f"Could not load detailed schema from markdown: {str(e)}")
    
    def _parse_markdown_schema(self, schema_file_path):
        """Parse markdown schema file to extract table and column information"""
        try:
            with open(schema_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            import re
            # Extract key tables and their columns
            key_tables = ['Requests', 'Samples', 'Results', 'Analyses', 'AnalysisGroups', 'SampleAnalyseGroups', 'Companies']
            
            for table_name in key_tables:
                if table_name not in self.detailed_tables:
                    self.detailed_tables[table_name] = {}
                
                # Look for table-specific column patterns in the markdown
                table_section_pattern = f'#### {table_name}.*?\\*\\*Columns:\\*\\*(.*?)(?=####|$)'
                table_match = re.search(table_section_pattern, content, re.DOTALL | re.IGNORECASE)
                
                if table_match:
                    columns_section = table_match.group(1)
                    # Extract column names from table rows
                    column_pattern = r'\|\s*`([^`]+)`\s*\|'
                    columns = re.findall(column_pattern, columns_section)
                    
                    if columns:
                        self.detailed_tables[table_name]['columns'] = columns
                        logger.info(f"Extracted {len(columns)} columns for {table_name}")
                    
        except Exception as e:
            logger.warning(f"Could not parse markdown schema: {str(e)}")
    
    def _enhance_with_schema_categories(self):
        """Enhance table information with categories from main schema"""
        try:
            # Check if schema has table_categories attribute
            if hasattr(self.schema, 'table_categories') and self.schema.table_categories:
                table_categories = self.schema.table_categories
                if isinstance(table_categories, dict):
                    for category_name, category_info in table_categories.items():
                        if isinstance(category_info, dict) and 'tables' in category_info:
                            for table_entry in category_info['tables']:
                                # Handle both string names and dict entries
                                if isinstance(table_entry, str):
                                    table_name = table_entry
                                elif isinstance(table_entry, dict) and 'name' in table_entry:
                                    table_name = table_entry['name']
                                else:
                                    continue
                                
                                if table_name not in self.detailed_tables:
                                    self.detailed_tables[table_name] = {}
                                self.detailed_tables[table_name]['category'] = category_name
                                self.detailed_tables[table_name]['category_context'] = category_info.get('description', '')
                                
                                # If it's a dict entry, extract additional info
                                if isinstance(table_entry, dict):
                                    if 'description' in table_entry:
                                        self.detailed_tables[table_name]['description'] = table_entry['description']
                                    if 'schema' in table_entry:
                                        self.detailed_tables[table_name]['schema'] = table_entry['schema']
        except Exception as e:
            logger.warning(f"Could not enhance with schema categories: {str(e)}")
    
    def _extract_table_relationships(self):
        """Extract table relationships from schema information"""
        # Initialize with empty dict and safely copy key_relationships if they exist
        self.table_relationships = {}
        if hasattr(self.schema, 'key_relationships') and self.schema.key_relationships:
            try:
                # key_relationships may be a list (comprehensive schema) or a dict;
                # only a dict can seed the per-table relationship map used below.
                if isinstance(self.schema.key_relationships, dict):
                    self.table_relationships = self.schema.key_relationships.copy()
            except Exception as e:
                logger.warning(f"Could not copy key_relationships: {str(e)}")
        
        # Infer additional relationships from foreign key patterns
        for table_name in self.schema.complete_table_list:
            # Common foreign key patterns
            potential_fks = []
            
            # ID patterns (assumes standard naming conventions)
            if table_name.endswith('s') and table_name != 'Analyses':  # Plural table
                singular = table_name[:-1]
                potential_fks.append(f"{singular}ID")
            
            # Add to relationships if not already present
            if table_name not in self.table_relationships:
                self.table_relationships[table_name] = {
                    'inferred_foreign_keys': potential_fks,
                    'related_tables': self._find_related_tables(table_name)
                }
    
    def _find_related_tables(self, table_name: str) -> List[str]:
        """Find tables that are likely related to the given table"""
        related = []
        
        # Look for tables with similar names or obvious relationships
        base_name = table_name.lower().replace('tbl', '').replace('_', '')
        
        for other_table in self.schema.complete_table_list:
            if other_table == table_name:
                continue
                
            other_base = other_table.lower().replace('tbl', '').replace('_', '')
            
            # Check for name similarities or known patterns
            if (base_name in other_base or other_base in base_name or
                any(keyword in base_name and keyword in other_base 
                    for keyword in ['result', 'analysis', 'sample', 'request'])):
                related.append(other_table)
        
        return related[:5]  # Limit to avoid too many relationships
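
    # Example: for 'Samples', tables such as 'SampleTypes' and 'SampleAnalyseGroups'
    # are picked up through the shared 'sample' keyword check above.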
    
    def _initialize_common_patterns(self):
        """Initialize common query patterns for different types of analysis"""
        self.common_query_patterns = {
            'customer_stats': {
                'keywords': ['customer', 'requests', 'statistics', 'stats', 'company'],
                'tables': ['Requests', 'Companies', 'Accounts', 'Samples'],
                'avoid_tables': ['tblBactResult', 'tblBactResultLine', 'tblBactResultLineAnalyse']
            },
            'bacteriology': {
                'keywords': ['bacteria', 'antibiogram', 'sensitivity', 'resistance', 'germ'],
                'tables': ['Requests', 'Samples', 'tblBactResult', 'tblBactResultLine', 'tblBactResultLineAnalyse', 'Germs'],
                'required_joins': [
                    'Samples s ON s.Sample_Request = r.Id',
                    'tblBactResult br ON br.SampleID = s.Id',
                    'tblBactResultLine brl ON br.BactResultID = brl.BactResultID',
                    'tblBactResultLineAnalyse brla ON brl.BactResultLineID = brla.BactResultLineID'
                ]
            },
            'sample_workflow': {
                'keywords': ['sample', 'workflow', 'analysis', 'testing'],
                'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'AnalysisGroups'],
                'avoid_tables': ['tblBactResult', 'tblBactResultLine']
            },
            'analysis_results': {
                'keywords': ['analysis', 'results', 'testing', 'laboratory', 'distribution', 'types', 'over time'],
                'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'Results', 'Analyses', 'AnalysisGroups'],
                'required_joins': [
                    'Samples s ON s.Sample_Request = r.Id',
                    'SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id',
                    'Results res ON res.Result_Sample = s.Id',
                    'Analyses a ON a.Id = res.Result_Analysis',
                    'AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup'
                ]
            },
            'descriptive_statistics': {
                'keywords': ['descriptive', 'statistics', 'distribution', 'frequency', 'count', 'temporal', 'time'],
                'tables': ['Requests', 'Samples', 'SampleAnalyseGroups', 'Results', 'Analyses', 'AnalysisGroups'],
                'required_joins': [
                    'Samples s ON s.Sample_Request = r.Id',
                    'SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id',
                    'Results res ON res.Result_Sample = s.Id',
                    'Analyses a ON a.Id = res.Result_Analysis',
                    'AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup'
                ]
            }
        }
        
    def _load_detailed_schema(self):
        """Load detailed schema information from database reference files if available"""
        self.detailed_tables = {}
        self.table_relationships = {}
        
        if self.is_poulpharm:
            # Try to load from the detailed markdown files
            try:
                self._parse_poulpharm_detailed_schema()
            except Exception as e:
                logger.warning(f"Could not load detailed schema: {str(e)}")
                self._setup_fallback_poulpharm_schema()
        
    def _parse_poulpharm_detailed_schema(self):
        """Parse detailed schema from Poulpharm LIMS database reference files"""
        import os
        from pathlib import Path
        
        # Look for detailed schema files in the labosoft_SQL directory
        labosoft_dir = Path(__file__).parent.parent / "labosoft_SQL"
        schema_files = list(labosoft_dir.glob("poulpharm_lims_database_reference_*.md"))
        
        if schema_files:
            # Use the most recent schema file (by filename)
            latest_schema = sorted(schema_files)[-1]
            logger.info(f"Loading detailed schema from {latest_schema}")
            self._parse_markdown_schema(latest_schema)
        else:
            raise FileNotFoundError("No detailed schema files found")
    
    def _load_detailed_database_schema(self):
        """Load detailed database schema from JSON files"""
        schema_file = 'database_schema_20251003_120434.json'
        relationships_file = 'database_relationships_20251003_130415.json'
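        # Expected JSON shape (illustrative; only these keys are read below):
        # {"columns_by_table": {"Requests": [{"COLUMN_NAME": "Id", "DATA_TYPE": "int",
        #   "IS_NULLABLE": "NO", ...}, ...]}, ...}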
        
        try:
            # Load detailed schema
            if os.path.exists(schema_file):
                with open(schema_file, 'r', encoding='utf-8') as f:
                    schema_data = json.load(f)
                    
                # Extract column information by table
                if 'columns_by_table' in schema_data:
                    for table_name, columns in schema_data['columns_by_table'].items():
                        if table_name not in self.detailed_tables:
                            self.detailed_tables[table_name] = {}
                        
                        # Extract column names and their types
                        column_names = []
                        column_details = {}
                        for col in columns:
                            col_name = col['COLUMN_NAME']
                            column_names.append(col_name)
                            column_details[col_name] = {
                                'data_type': col['DATA_TYPE'],
                                'is_nullable': col['IS_NULLABLE'],
                                'is_primary_key': col.get('IS_PRIMARY_KEY', 'NO'),
                                'is_foreign_key': col.get('IS_FOREIGN_KEY', 'NO'),
                                'referenced_table': col.get('REFERENCED_TABLE_NAME'),
                                'referenced_column': col.get('REFERENCED_COLUMN_NAME')
                            }
                        
                        self.detailed_tables[table_name]['columns'] = column_names
                        self.detailed_tables[table_name]['column_details'] = column_details
                        
                        # Infer business purpose
                        self.detailed_tables[table_name]['business_purpose'] = self._infer_business_purpose(table_name, '')
                
                logger.info(f"Loaded detailed schema for {len(self.detailed_tables)} tables")
            
            # Load relationships
            if os.path.exists(relationships_file):
                with open(relationships_file, 'r', encoding='utf-8') as f:
                    relationships_data = json.load(f)
                    
                if 'relationships' in relationships_data:
                    for rel in relationships_data['relationships']:
                        source_table = rel['source_table']
                        target_table = rel['target_table']
                        
                        if source_table not in self.table_relationships:
                            self.table_relationships[source_table] = {'related_tables': []}
                        
                        if target_table not in self.table_relationships[source_table]['related_tables']:
                            self.table_relationships[source_table]['related_tables'].append(target_table)
                
                logger.info(f"Loaded {len(relationships_data.get('relationships', []))} table relationships")
                
        except Exception as e:
            logger.warning(f"Could not load detailed schema files: {e}")
            self._setup_fallback_poulpharm_schema()

    def _setup_fallback_poulpharm_schema(self):
        """Setup fallback - this should not be used when dynamic schema is available"""
        logger.warning("Using fallback schema - dynamic schema discovery should be preferred")
        # Keep minimal fallback for emergency situations
        self.detailed_tables = {
            'Requests': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'RequestNr': 'nvarchar(15)',
                    'DateCreated': 'datetime',
                    'sCustomer': 'nvarchar(255)',
                    'Customer_ID': 'int',
                    'Veterinarian_ID': 'int',
                    'sVeterinarian': 'nvarchar(255)',
                    'TestCode': 'nvarchar(255)',
                    'StartNumber': 'int'
                },
                'foreign_keys': {
                    'Customer_ID': 'Companies.Id',
                    'Veterinarian_ID': 'tblDierenarts.DierenartsID'
                }
            },
            'Samples': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'SampleNr': 'nvarchar(50)',
                    'Identification': 'nvarchar(255)',
                    'DateCreated': 'datetime',
                    'DateSampling': 'datetime',
                    'Sample_Request': 'int',
                    'Sample_SampleType': 'int'
                },
                'foreign_keys': {
                    'Sample_Request': 'Requests.Id',
                    'Sample_SampleType': 'SampleTypes.Id'
                }
            },
            'SampleAnalyseGroups': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Germ': 'nvarchar(255)',
                    'SampleAnalyseGroup_Sample': 'int',
                    'RequestAnalyseGroup_AnalysisGroup': 'int',
                    'IsAnalyseDone': 'bit',
                    'IsAnalyseExported': 'bit'
                },
                'foreign_keys': {
                    'SampleAnalyseGroup_Sample': 'Samples.Id',
                    'RequestAnalyseGroup_AnalysisGroup': 'AnalysisGroups.Id'
                }
            },
            'Results': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'DateCreated': 'datetime',
                    'DateLastModified': 'datetime',
                    'CreatedBy': 'nvarchar(255)',
                    'ModifiedBy': 'nvarchar(255)',
                    'TechValidated': 'bit',
                    'BioValidated': 'bit',
                    'TechValidatedBy': 'nvarchar(255)',
                    'BioValidatedBy': 'nvarchar(255)',
                    'DateTechValidated': 'datetime',
                    'DateBioValidated': 'datetime',
                    'Result_Sample': 'int',
                    'Result_Analysis': 'int',
                    'Result_AnalysisGroup': 'int',
                    'Result_AnalysisAnalysisGroup': 'int'
                },
                'foreign_keys': {
                    'Result_Sample': 'Samples.Id',
                    'Result_Analysis': 'Analyses.Id'
                }
            },
            'Analyses': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name_nl': 'nvarchar(255)',
                    'Name_fr': 'nvarchar(255)',
                    'Name_en': 'nvarchar(255)',
                    'ResultCode': 'nvarchar(255)',
                    'ShowOnReport': 'bit',
                    'RapportCode': 'nvarchar(5)',
                    'AnalyseCode': 'nvarchar(15)'
                },
                'foreign_keys': {}
            },
            'AnalysisGroups': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name_nl': 'nvarchar(255)',
                    'Name_fr': 'nvarchar(255)',
                    'Name_en': 'nvarchar(255)',
                    'Item_ItemCode': 'nvarchar(255)',
                    'Abbrev': 'nvarchar(255)',
                    'Sequence': 'int',
                    'IsCanceled': 'bit',
                    'AnalysisGroup_AnalysisCategory': 'int',
                    'ReportSmall': 'bit',
                    'IsAutomated': 'bit'
                },
                'foreign_keys': {
                    'AnalysisGroup_AnalysisCategory': 'AnalysisCategories.Id'
                }
            },
            'SampleTypes': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name_nl': 'nvarchar(255)',
                    'Name_fr': 'nvarchar(255)',
                    'Name_en': 'nvarchar(255)',
                    'IsSalmonella': 'bit'
                },
                'foreign_keys': {}
            },
            'Accounts': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name': 'varchar(255)',  # Note: This exists in Accounts table
                    'VatNumber': 'varchar(50)',
                    'StableNumber': 'varchar(50)',
                    'RegistrationNumber': 'varchar(50)',
                    'AddressStreet': 'varchar(100)',
                    'AddressCity': 'varchar(100)',
                    'AddressZipCode': 'varchar(50)',
                    'AddressCountryCode': 'varchar(5)',
                    'EmailAddress': 'varchar(100)'
                },
                'foreign_keys': {}
            },
            'Companies': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name_nl': 'nvarchar(255)',  # Changed from 'Name' to 'Name_nl'
                    'Name_fr': 'nvarchar(255)',
                    'Name_en': 'nvarchar(255)',
                    'Stad': 'nvarchar(255)',
                    'Land': 'nvarchar(255)',
                    'ReportPath': 'nvarchar(255)',
                    'Telefoon': 'nvarchar(255)',
                    'Email': 'nvarchar(255)'
                }
            },
            # Bacteriology tables - based on database reference
            'tblBactResult': {
                'primary_key': 'BactResultID',
                'columns': {
                    'BactResultID': 'int',
                    'SampleID': 'int',
                    'Bar_DatumInzet': 'datetime',
                    'Bar_VerificatieAt': 'datetime',
                    'Bar_VerificatieBy': 'nvarchar(50)',
                    'Bar_IsAfgewerkt': 'bit'
                },
                'foreign_keys': {
                    'SampleID': 'Samples.Id'
                }
            },
            'tblBactResultLine': {
                'primary_key': 'BactResultLineID',
                'columns': {
                    'BactResultLineID': 'int',
                    'BactResultID': 'int',
                    'KiemID': 'int',
                    'Brl_KiemenDescr': 'nvarchar(50)',
                    'Brl_Plaat': 'nvarchar(15)'
                },
                'foreign_keys': {
                    'BactResultID': 'tblBactResult.BactResultID',
                    'KiemID': 'Germs.GermId'
                }
            },
            'tblBactResultLineAnalyse': {
                'primary_key': 'BactResultLineAnalyseID',
                'columns': {
                    'BactResultLineAnalyseID': 'int',
                    'BactResultLineID': 'int',
                    'AnalyseID': 'int',
                    'Brla_AantalMM': 'int',
                    'PosNegsID': 'int'
                },
                'foreign_keys': {
                    'BactResultLineID': 'tblBactResultLine.BactResultLineID',
                    'AnalyseID': 'Analyses.Id',
                    'PosNegsID': 'PosNegs.Id'
                }
            },
            'Germs': {
                'primary_key': 'GermId',
                'columns': {
                    'GermId': 'int',
                    'Name_en': 'nvarchar(255)',
                    'Name_nl': 'nvarchar(255)',
                    'Name_fr': 'nvarchar(255)'
                }
            },
            'PosNegs': {
                'primary_key': 'Id',
                'columns': {
                    'Id': 'int',
                    'Name_en': 'nvarchar(255)',
                    'Name_nl': 'nvarchar(255)'
                }
            }
        }
        
    def get_best_name_column(self, table_name: str) -> str:
        """Get the best available name column for a table based on dynamic schema"""
        available_columns = self.get_available_columns(table_name)
        
        # Priority order for name columns (preferred first)
        name_column_priorities = [
            'Name_nl',      # Dutch name (preferred for Poulpharm)
            'Name',         # Generic name
            'Naam',         # Dutch alternative
            'Name_en',      # English name
            'Name_fr',      # French name
            'Title',        # Alternative name field
            'Description',  # Fallback description
            'Code',         # Code as identifier
            'Id'           # Last resort - ID field
        ]
        
        for preferred_name in name_column_priorities:
            if preferred_name in available_columns:
                logger.info(f"Using '{preferred_name}' as name column for table '{table_name}'")
                return preferred_name
        
        # If no standard name column found, return the first non-ID column
        for col in available_columns:
            if col.lower() not in ['id', 'datecreated', 'datemodified', 'createdby', 'modifiedby']:
                logger.warning(f"Using fallback column '{col}' as name for table '{table_name}'")
                return col
                
        # Ultimate fallback - use Id
        logger.warning(f"No suitable name column found for table '{table_name}', using 'Id'")
        return 'Id'
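
    # Examples against the fallback schema above: 'Companies' resolves to 'Name_nl',
    # while 'Accounts' resolves to 'Name' (it has no 'Name_nl' column).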
    
    def validate_column_exists(self, table_name: str, column_name: str) -> bool:
        """Check if a column actually exists in the dynamic schema"""
        available_columns = self.get_available_columns(table_name)
        return column_name in available_columns
    
    def get_safe_column_reference(self, table_name: str, preferred_column: str, fallback_column: str = 'Id') -> str:
        """Get a safe column reference, falling back if the preferred column doesn't exist"""
        if self.validate_column_exists(table_name, preferred_column):
            return preferred_column
        elif self.validate_column_exists(table_name, fallback_column):
            logger.warning(f"Column '{preferred_column}' not found in table '{table_name}', using '{fallback_column}'")
            return fallback_column
        else:
            # Try to find the best available column
            best_column = self.get_best_name_column(table_name)
            logger.warning(f"Neither '{preferred_column}' nor '{fallback_column}' found in table '{table_name}', using '{best_column}'")
            return best_column
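
    # Example against the fallback schema above:
    # get_safe_column_reference('Companies', 'Name') logs a warning and returns 'Id',
    # because Companies has no 'Name' column but does have the 'Id' fallback.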
        
    def _setup_poulpharm_optimizations(self):
        """Setup Poulpharm LIMS specific query optimizations"""
        # Common query patterns for Poulpharm LIMS
        self.common_patterns = {
            'recent_requests': {
                'keywords': ['recent', 'latest', 'new', 'current'],
                'base_query': 'Requests WHERE DateCreated >= DATEADD(month, -3, GETDATE())'
            },
            'bacteriology': {
                'keywords': ['bacteria', 'antibiogram', 'sensitivity', 'resistance', 'germ'],
                'tables': ['tblBactResult', 'tblAntibiogram', 'tblBactResultLine']
            },
            'pcr_analysis': {
                'keywords': ['pcr', 'molecular', 'genetic'],
                'tables': ['tblPCRResult', 'tblPCRKit', 'tblPCRImport']
            },
            'customer_analysis': {
                'keywords': ['customer', 'company', 'veterinarian', 'client'],
                'tables': ['Companies', 'tblDierenarts', 'Requests']
            },
            'sample_workflow': {
                'keywords': ['sample', 'workflow', 'processing', 'status'],
                'tables': ['Samples', 'SampleAnalyseGroups', 'AnalysisGroups']
            }
        }
        
    def _detect_query_intent(self, user_query: str) -> Dict[str, Any]:
        """Detect user query intent and suggest appropriate tables/approach"""
        query_lower = user_query.lower()
        
        # Score each pattern
        pattern_scores = {}
        for pattern_name, pattern_info in self.common_query_patterns.items():
            score = 0
            for keyword in pattern_info['keywords']:
                if keyword in query_lower:
                    score += 1
            pattern_scores[pattern_name] = score
        
        # Find best matching pattern
        best_pattern = max(pattern_scores.items(), key=lambda x: x[1])
        
        if best_pattern[1] > 0:
            pattern_info = self.common_query_patterns[best_pattern[0]]
            return {
                'pattern': best_pattern[0],
                'confidence': best_pattern[1] / len(pattern_info['keywords']),
                'suggested_tables': pattern_info['tables'],
                'avoid_tables': pattern_info.get('avoid_tables', []),
                'required_joins': pattern_info.get('required_joins', [])
            }
        
        # Default fallback
        return {
            'pattern': 'generic',
            'confidence': 0.1,
            'suggested_tables': ['Requests', 'Samples'],
            'avoid_tables': [],
            'required_joins': []
        }
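
    # Example: "show antibiogram resistance by germ" matches 3 of the 5
    # 'bacteriology' keywords, giving confidence 0.6 and that pattern's
    # tblBactResult join chain.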

    def _format_top_clause(self, max_rows) -> str:
        """Helper method to format TOP clause for SQL queries"""
        if max_rows is None:
            return ""  # No TOP clause for unlimited
        return f"TOP {max_rows}"

    def generate_sql_query(self, user_query: str, max_rows: Optional[int] = 10000, model: str = 'claude-sonnet-4-5-20250929') -> Tuple[str, Dict[str, Any]]:
        """
        Generate SQL query based on user analysis request
        Returns: (sql_query, metadata)
        """
        try:
            # Detect query intent
            logger.debug("Detecting query intent...")
            intent = self._detect_query_intent(user_query)
            logger.info(f"Detected query intent: {intent['pattern']} (confidence: {intent['confidence']:.2f})")
            
            # Prepare context for the LLM
            logger.debug("Preparing database context...")
            context = self._prepare_database_context()
            
            # Create prompt for SQL generation with intent guidance
            logger.debug("Creating SQL generation prompt...")
            prompt = self._create_sql_generation_prompt(user_query, context, max_rows, intent)  
            
            # Use statistical agent to generate SQL if available
            if self.statistical_agent:
                response = self._generate_with_llm(prompt, model)
                sql_query, explanation = self._parse_llm_response(response)
                
                # Validate generated SQL
                is_valid, validation_errors = self._validate_generated_sql(sql_query)
                if not is_valid:
                    logger.warning(f"Generated SQL has validation errors: {validation_errors}")
                    # Try fallback approach
                    sql_query, explanation = self._generate_basic_sql(user_query, max_rows)
                    
            else:
                # Fallback: basic SQL generation
                sql_query, explanation = self._generate_basic_sql(user_query, max_rows)
            
            metadata = {
                'user_query': user_query,
                'database_name': self.schema.database_name,
                'explanation': explanation,
                'generated_at': datetime.now().isoformat(),
                'max_rows': max_rows,
                'detected_intent': intent,
                'generation_method': 'llm' if self.statistical_agent else 'fallback'
            }
            
            return sql_query, metadata
            
        except Exception as e:
            logger.error(f"Error generating SQL query: {str(e)}")
            raise
    
    def _prepare_database_context(self) -> str:
        """Prepare comprehensive database schema context using dynamic schema information"""
        logger.debug("Starting _prepare_database_context")
        context_parts = []
        
        # Database overview
        context_parts.append(f"DATABASE: {self.schema.database_name}")
        context_parts.append(f"DESCRIPTION: {self.schema.description}")
        context_parts.append(f"TOTAL TABLES: {len(self.schema.complete_table_list)}")
        
        # Add business context from table categories (if available)
        if hasattr(self.schema, 'table_categories') and self.schema.table_categories:
            context_parts.append("\nBUSINESS DOMAIN CATEGORIES:")
            try:
                table_categories = self.schema.table_categories
                if isinstance(table_categories, dict):
                    for category, info in table_categories.items():
                        context_parts.append(f"  {category}:")
                        if isinstance(info, dict):
                            if 'description' in info:
                                context_parts.append(f"    Purpose: {info['description']}")
                            if 'tables' in info and isinstance(info['tables'], list):
                                # Extract table names from the list (which may contain dicts)
                                table_names = []
                                for table_entry in info['tables'][:10]:  # Limit display
                                    if isinstance(table_entry, str):
                                        table_names.append(table_entry)
                                    elif isinstance(table_entry, dict) and 'name' in table_entry:
                                        table_names.append(table_entry['name'])
                                if table_names:
                                    context_parts.append(f"    Tables: {', '.join(table_names)}")
            except Exception as e:
                logger.warning(f"Could not add table categories to context: {str(e)}")
        
        # Add detailed table information from our dynamic loading
        context_parts.append("\nKEY TABLES WITH BUSINESS CONTEXT:")
        
        # Group tables by business purpose for better context
        purpose_groups = {}
        for table_name, table_info in self.detailed_tables.items():
            purpose = table_info.get('business_purpose', 'supporting_data')
            if purpose not in purpose_groups:
                purpose_groups[purpose] = []
            purpose_groups[purpose].append((table_name, table_info))
        
        # Present tables organized by business purpose
        try:
            for purpose, tables in purpose_groups.items():
                context_parts.append(f"\n{purpose.upper().replace('_', ' ')} TABLES:")
                tables_list = list(tables) if not isinstance(tables, list) else tables
                for table_name, table_info in tables_list[:5]:  # Limit to avoid context overflow
                    context_parts.append(f"  {table_name}:")
                    if 'description' in table_info:
                        context_parts.append(f"    Purpose: {table_info['description']}")
                    if 'category' in table_info:
                        context_parts.append(f"    Category: {table_info['category']}")
        except Exception as e:
            logger.error(f"Error in business purpose tables section: {e}")
            context_parts.append("\nBUSINESS PURPOSE TABLES: Error loading table information")
        
        # Add complete table list for reference
        context_parts.append(f"\nCOMPLETE TABLE LIST ({len(self.schema.complete_table_list)} tables):")
        context_parts.append(", ".join(self.schema.complete_table_list))
        
        # Add relationship context
        try:
            context_parts.append("\nTABLE RELATIONSHIPS:")
            for table_name, relationships in self.table_relationships.items():
                if 'related_tables' in relationships and relationships['related_tables']:
                    # Ensure we only join strings, not dicts
                    related_names = []
                    related_tables = relationships['related_tables']
                    if isinstance(related_tables, list):
                        for item in related_tables[:3]:
                            if isinstance(item, str):
                                related_names.append(item)
                            elif isinstance(item, dict) and 'name' in item:
                                related_names.append(item['name'])
                            else:
                                related_names.append(str(item))
                    else:
                        logger.warning(f"related_tables for {table_name} is not a list: {type(related_tables)}")
                    if related_names:
                        context_parts.append(f"  {table_name} relates to: {', '.join(related_names)}")
        except Exception as e:
            logger.error(f"Error in table relationships section: {e}")
            context_parts.append("\nTABLE RELATIONSHIPS: Error loading relationship information")
        
        # Add actual column information for key tables
        context_parts.append("\nKEY TABLE COLUMNS (Actual column names to use in SQL):")
        key_tables_for_columns = ['Requests', 'Samples', 'Results', 'Analyses', 'AnalysisGroups', 'SampleAnalyseGroups', 'Companies']
        for table_name in key_tables_for_columns:
            if table_name in self.detailed_tables and 'columns' in self.detailed_tables[table_name]:
                columns_data = self.detailed_tables[table_name]['columns']
                # Handle both list and dict formats for columns
                if isinstance(columns_data, list):
                    columns = columns_data[:15]  # Show first 15 columns for better context
                    context_parts.append(f"  {table_name}: {', '.join(columns)}")
                    
                    # Add special note for tables with language-specific columns
                    if any('_nl' in col or '_fr' in col or '_en' in col for col in columns):
                        lang_cols = [col for col in columns if '_nl' in col or '_fr' in col or '_en' in col]
                        context_parts.append(f"    → Language columns: {', '.join(lang_cols)}")
                        # Get best name column for this specific table
                        best_name_col = self.get_best_name_column(table_name)
                        context_parts.append(f"    → For {table_name}: Use '{best_name_col}' for name/description")
                        
                elif isinstance(columns_data, dict):
                    # If it's a dict (column_name: data_type), extract just the column names
                    column_names = list(columns_data.keys())[:15]
                    context_parts.append(f"  {table_name}: {', '.join(column_names)}")
                else:
                    logger.warning(f"Columns for {table_name} is unexpected type: {type(columns_data)}")
        
        # Add general schema guidance (specific column errors will be handled dynamically in iterations)
        context_parts.append("\nļæ½ SCHEMA GUIDANCE:")
        context_parts.append("• Always verify column names exist in their respective tables")
        context_parts.append("• Use proper table aliases for clarity")
        context_parts.append("• Companies table may be empty - consider using r.sCustomer for customer names")
        context_parts.append("• CRITICAL: Use exact column names from the schema above - verify each column exists in its table")
        
        # Add table-specific column guidance based on common usage patterns
        context_parts.append("\nļæ½ TABLE-SPECIFIC COLUMN GUIDANCE:")
        common_tables = ['Companies', 'Analyses', 'SampleTypes', 'AnalysisGroups', 'Results', 'Requests', 'Samples']
        for table in common_tables:
            available_columns = self.get_available_columns(table)
            if available_columns:
                best_name_col = self.get_best_name_column(table)
                context_parts.append(f"• {table}: Use '{best_name_col}' for names/descriptions")
        
        # Add recommended JOIN patterns
        context_parts.append("\nšŸ’” RECOMMENDED JOIN PATTERNS:")
        joins = self.schema.get_recommended_joins()
        for join_name, join_info in joins.items():
            context_parts.append(f"āœ… {join_info['description']}:")
            context_parts.append(f"   FROM {join_info['tables'][0]} {join_info['tables'][1] if len(join_info['tables']) > 1 else ''}")
            if len(join_info['tables']) > 1:
                context_parts.append(f"   JOIN {join_info['tables'][1]} ON {join_info['join_condition']}")
            
        # Add high-value result tables
        context_parts.append("\nšŸ“Š HIGH-VALUE RESULT TABLES (prioritize these for laboratory analysis):")
        result_tables = self.schema.get_best_result_tables()
        for table in result_tables:
            available_columns = self.schema.get_available_columns(table)
            if available_columns:
                value_columns = [col for col in available_columns if any(keyword in col.lower() for keyword in ['result', 'value', 'text', 'posneg'])]
                context_parts.append(f"   šŸ“‹ {table}: {', '.join(value_columns[:5])}")
        
        # Add query optimization guidance
        context_parts.append("\nQUERY OPTIMIZATION GUIDANCE:")
        context_parts.append("- CRITICAL: Use exact column names as shown above - avoid generic names like 'Name'")
        context_parts.append("- For name columns: Check the exact column names above - different tables use different naming patterns")
        context_parts.append("- Join tables logically based on business relationships")
        context_parts.append("- Include relevant result tables when users ask for analysis outcomes")
        context_parts.append("- Consider the full workflow: Request -> Sample -> Analysis -> Results")
        context_parts.append("- Use appropriate date filtering for temporal analysis")
        context_parts.append("- ALWAYS verify column names exist in the table before using them")
        context_parts.append("- āš ļø  NEVER use Companies.City, Samples.SampleType, or Users.Name - they don't exist!")
        
        return "\n".join(context_parts)
    
    def _validate_generated_sql(self, sql_query: str) -> Tuple[bool, List[str]]:
        """Validate generated SQL against known schema"""
        errors = []
        
        # Extract table names from SQL
        import re
        
        # Find FROM and JOIN clauses
        table_pattern = r'(?:FROM|JOIN)\s+(\w+)(?:\s+(\w+))?'
        matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
        
        for match in matches:
            table_name = match[0]
            if table_name not in self.detailed_tables:
                errors.append(f"Unknown table: {table_name}")
        
        # Column-level validation (alias.column references) would require resolving
        # aliases to their tables via full SQL parsing; only table names are checked.
        
        return len(errors) == 0, errors
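
    # Example: validating "SELECT * FROM Requests r JOIN Foo f ON f.Id = r.Id"
    # yields (False, ['Unknown table: Foo']) when 'Foo' is absent from
    # self.detailed_tables, while 'Requests' passes the table check.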
    
    def _create_sql_generation_prompt(self, user_query: str, db_context: str, max_rows: Optional[int], intent: Dict[str, Any] = None) -> str:
        """Create prompt for SQL generation with intent guidance"""
        
        # Build intent-specific guidance
        intent_guidance = ""
        if intent:
            try:
                intent_guidance = f"""
DETECTED QUERY INTENT: {intent.get('pattern', 'unknown').upper()} (confidence: {intent.get('confidence', 0.0):.2f})
RECOMMENDED TABLES: {', '.join(intent.get('suggested_tables', []))}
"""
                if intent.get('avoid_tables'):
                    intent_guidance += f"AVOID TABLES: {', '.join(intent.get('avoid_tables', []))}\n"
                
                if intent.get('required_joins'):
                    intent_guidance += "REQUIRED JOIN PATTERN:\n"
                    for join in intent.get('required_joins', []):
                        intent_guidance += f"  JOIN {join}\n"
            except Exception as e:
                logger.error(f"Error building intent guidance: {e}")
                intent_guidance = "INTENT GUIDANCE: Error processing query intent"
        
        # Build adaptive guidance based on user query analysis
        query_analysis = self._analyze_user_query(user_query)
        
        prompt = f"""You are an expert SQL analyst for the {self.schema.database_name} database. Your goal is to create the OPTIMAL SQL query to answer the user's specific analysis request using the comprehensive schema information provided.

{intent_guidance}

USER ANALYSIS REQUEST: "{user_query}"

QUERY ANALYSIS:
- Primary Focus: {query_analysis['primary_focus']}
- Expected Data Types: {', '.join(query_analysis['expected_data_types'])}
- Temporal Scope: {query_analysis['temporal_scope']}
- Required Business Context: {query_analysis['business_context']}

{db_context}

ADAPTIVE QUERY CONSTRUCTION STRATEGY:

STEP 1 - IDENTIFY OPTIMAL DATASET:
Analyze what the user is asking for and select the minimal but complete set of tables that will provide this information. Consider:
- What is the user's ultimate analytical goal?
- What specific data points are needed to answer their question?
- Which tables contain the core information vs supporting context?
- How should tables be logically connected based on business workflow?

STEP 2 - CONSTRUCT INTELLIGENT JOINS:
- Start with the most relevant core table for the user's request
- Add tables that provide essential context or requested details
- Use proper JOIN types based on data requirements (INNER vs LEFT JOIN)
- Follow the natural business process flow when possible

STEP 3 - OPTIMIZE DATA SELECTION:
- Include columns that directly answer the user's question
- Add supporting context columns that enable analysis
- Consider grouping/aggregation if the user wants summary statistics
- Include temporal fields when user asks about trends or distributions

CRITICAL REQUIREMENTS:
1. Use ONLY table and column names that exist in the schema above
2. Generate SQL Server T-SQL compatible syntax
3. {f"Limit to TOP {max_rows} rows" if max_rows is not None else "Return all rows (no limit)"}
4. Create meaningful column aliases
5. Add appropriate WHERE clauses for data quality/relevance
6. Order results logically for the intended analysis
7. IMPORTANT: Always use LEFT JOIN when joining the Companies table (e.g., "LEFT JOIN Companies c ON c.Id = r.Customer_ID") as many requests may not have valid company references

REFLECTION QUESTIONS TO GUIDE YOUR QUERY:
- Does this query directly address what the user is asking for?
- Have I included all necessary tables to provide complete context?
- Are there any missing pieces that would make the analysis incomplete?
- Is this the most efficient way to retrieve the requested information?
- Will the result set enable the user to perform their intended analysis?

OUTPUT FORMAT:
```sql
-- Analysis Goal: [What this query is designed to achieve]
-- Key Tables Used: [Primary tables and why they were selected]
-- Business Logic: [How the joins reflect real-world relationships]
SELECT {self._format_top_clause(max_rows)}
    [columns with clear aliases and business meaning]
FROM [starting_table] [alias]
[JOIN clauses that reflect logical business relationships]
WHERE [conditions that ensure data quality and relevance]
ORDER BY [ordering that supports analysis goals]
```

Now generate the optimal SQL query for this analysis request:"""
        
        return prompt
    
    def _analyze_user_query(self, user_query: str) -> Dict[str, Any]:
        """Analyze user query to provide adaptive guidance"""
        query_lower = user_query.lower()
        
        # Determine primary focus
        if any(word in query_lower for word in ['result', 'outcome', 'finding', 'test', 'analysis']):
            primary_focus = "Analysis Results & Outcomes"
        elif any(word in query_lower for word in ['customer', 'client', 'company', 'practice']):
            primary_focus = "Customer & Practice Information"
        elif any(word in query_lower for word in ['sample', 'specimen', 'collection']):
            primary_focus = "Sample Management"
        elif any(word in query_lower for word in ['request', 'order', 'submission']):
            primary_focus = "Request Workflow"
        else:
            primary_focus = "General Business Intelligence"
        
        # Expected data types
        data_types = []
        if any(word in query_lower for word in ['count', 'number', 'total', 'sum']):
            data_types.append("Aggregated Counts")
        if any(word in query_lower for word in ['distribution', 'breakdown', 'category']):
            data_types.append("Categorical Distributions")
        if any(word in query_lower for word in ['time', 'date', 'over time', 'trend', 'monthly', 'yearly']):
            data_types.append("Temporal Data")
        if any(word in query_lower for word in ['type', 'kind', 'category']):
            data_types.append("Classification Data")
        if not data_types:
            data_types.append("Detailed Records")
        
        # Temporal scope
        if any(word in query_lower for word in ['recent', 'last', 'current']):
            temporal_scope = "Recent/Current Data"
        elif any(word in query_lower for word in ['historical', 'over time', 'trend']):
            temporal_scope = "Historical Trends"
        elif any(word in query_lower for word in ['monthly', 'yearly', 'quarterly']):
            temporal_scope = "Periodic Analysis"
        else:
            temporal_scope = "All Available Data"
        
        # Business context
        if any(word in query_lower for word in ['workflow', 'process', 'pipeline']):
            business_context = "Process Flow Context"
        elif any(word in query_lower for word in ['performance', 'efficiency', 'quality']):
            business_context = "Performance Metrics"
        elif any(word in query_lower for word in ['statistics', 'statistical', 'descriptive']):
            business_context = "Statistical Analysis"
        else:
            business_context = "Operational Overview"
        
        return {
            'primary_focus': primary_focus,
            'expected_data_types': data_types,
            'temporal_scope': temporal_scope,
            'business_context': business_context
        }
    
    def _generate_with_llm(self, prompt: str, model: str = 'claude-sonnet-4-5-20250929') -> str:
        """Generate SQL using the statistical agent's LLM"""
        try:
            # Add pattern-specific guidance for Poulpharm
            enhanced_prompt = prompt
            if self.is_poulpharm:
                pattern_hints = self._get_poulpharm_pattern_hints(prompt)
                if pattern_hints:
                    enhanced_prompt += f"\n\nSPECIFIC HINTS FOR THIS QUERY:\n{pattern_hints}"
            
            logger.debug(f"Sending prompt to LLM (length: {len(enhanced_prompt)} chars)")
            
            # Use specified model for SQL generation
            response = self.statistical_agent.query_llm(enhanced_prompt, model=model, max_tokens=2000)
            
            logger.debug(f"Received Claude response (length: {len(response)} chars)")
            return response
        except Exception as e:
            logger.error(f"Error generating SQL with LLM: {str(e)}")
            raise
    
    def _get_poulpharm_pattern_hints(self, prompt: str) -> str:
        """Get Poulpharm-specific hints based on detected patterns"""
        hints = []
        prompt_lower = prompt.lower()
        
        for pattern_name, pattern_info in self.common_patterns.items():
            if any(keyword in prompt_lower for keyword in pattern_info['keywords']):
                if pattern_name == 'recent_requests':
                    hints.append("- Start with Requests table and filter by DateCreated for recent data")
                elif pattern_name == 'bacteriology':
                    hints.append("- Include tblBactResult for bacterial isolates and tblAntibiogram for sensitivity")
                    hints.append("- Join through Samples: Requests -> Samples -> tblBactResult")
                elif pattern_name == 'customer_analysis':
                    hints.append("- Use Companies table for customer details, sCustomer field in Requests for quick lookup")
                elif pattern_name == 'sample_workflow':
                    hints.append("- Follow workflow: Requests -> Samples -> SampleAnalyseGroups -> AnalysisGroups")
        
        return "\n".join(hints)
    
    def _parse_llm_response(self, response: str) -> Tuple[str, str]:
        """Parse LLM response to extract SQL query and explanation"""
        try:
            logger.debug(f"Parsing LLM response: {response[:200]}...")
            
            # Find explanation first
            explanation = "Generated SQL query"
            explanation_patterns = [
                r'-- Explanation: (.+)',
                r'Explanation: (.+)',
                r'(?:This query|The query|SQL) (.+?)(?:\n|$)',
                r'```sql\s*--\s*(.+?)(?:\n|$)'
            ]
            
            for pattern in explanation_patterns:
                explanation_match = re.search(pattern, response, re.IGNORECASE)
                if explanation_match:
                    explanation = explanation_match.group(1).strip()
                    break
            
            # Try multiple approaches to extract SQL
            sql_query = None
            
            # Method 1: Look for SQL code block with various formats
            sql_patterns = [
                r'```sql\s*(.*?)```',
                r'```\s*(SELECT.*?)```',
                r'```tsql\s*(.*?)```',
                r'```\s*(.*?)```'
            ]
            
            for pattern in sql_patterns:
                sql_match = re.search(pattern, response, re.DOTALL | re.IGNORECASE)
                if sql_match:
                    sql_query = sql_match.group(1).strip()
                    # Clean up common artifacts
                    sql_query = re.sub(r'^-- Explanation: .+\n', '', sql_query, flags=re.MULTILINE)
                    sql_query = re.sub(r'^\s*--.*?\n', '', sql_query, flags=re.MULTILINE)
                    break
            
            # Method 2: Look for SELECT statements without code blocks
            if not sql_query:
                select_match = re.search(r'(SELECT\s+(?:TOP\s+\d+\s+)?.*?)(?:\n\n|\Z)', response, re.DOTALL | re.IGNORECASE)
                if select_match:
                    sql_query = select_match.group(1).strip()
            
            # Method 3: Extract everything that looks like SQL (fallback)
            if not sql_query:
                lines = response.split('\n')
                sql_lines = []
                in_sql = False
                for line in lines:
                    line_upper = line.upper().strip()
                    if any(keyword in line_upper for keyword in ['SELECT', 'WITH', 'INSERT', 'UPDATE', 'DELETE']):
                        in_sql = True
                        sql_lines.append(line)
                    elif in_sql:
                        # Continue collecting SQL lines
                        if line.strip() == '' and len(sql_lines) > 0:
                            # Empty line might end SQL block
                            if sql_lines[-1].strip().endswith(';') or any(word in sql_lines[-1].upper() for word in ['ORDER BY', 'GROUP BY']):
                                break
                        sql_lines.append(line)
                        # Stop if we've collected too much or hit obvious end
                        if len(sql_lines) > 50 or line.strip().endswith(';'):
                            break
                
                if sql_lines:
                    sql_query = '\n'.join(sql_lines).strip()
            
            # Final validation and cleanup
            if sql_query:
                # Remove leading/trailing whitespace and ensure it looks like SQL
                sql_query = sql_query.strip()
                if not any(keyword in sql_query.upper() for keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'WITH']):
                    sql_query = None
            
            if not sql_query:
                logger.warning("Could not extract SQL query from LLM response, using full response")
                # Log part of the response for debugging
                logger.debug(f"Full LLM response: {response}")
                raise ValueError("Could not extract SQL query from LLM response")
            
            logger.debug(f"Extracted SQL query: {sql_query[:100]}...")
            return sql_query, explanation
            
        except Exception as e:
            logger.error(f"Error parsing LLM response: {str(e)}")
            logger.debug(f"Response that failed to parse: {response}")
            
            # Enhanced fallback - try to find any SQL-like content
            if 'SELECT' in response.upper():
                # Extract everything from first SELECT to end or natural break
                fallback_match = re.search(r'(SELECT.*)', response, re.DOTALL | re.IGNORECASE)
                if fallback_match:
                    fallback_sql = fallback_match.group(1).strip()
                    # Clean up obvious non-SQL content at the end
                    fallback_sql = re.sub(r'\n[A-Za-z][^;]*$', '', fallback_sql)
                    return fallback_sql, "Extracted SQL from response (fallback)"
            
            # Last resort: return the full response
            return response.strip(), "Generated SQL query (unparsed)"
    
    def _generate_basic_sql(self, user_query: str, max_rows: Optional[int]) -> Tuple[str, str]:
        """Fallback basic SQL generation without LLM using structured approach"""
        intent = self._detect_query_intent(user_query)
        logger.info(f"Using fallback SQL generation with intent: {intent['pattern']}")
        
        if intent['pattern'] == 'customer_stats':
            return self._generate_customer_stats_sql(max_rows)
        elif intent['pattern'] == 'bacteriology':
            return self._generate_bacteriology_sql(max_rows)
        elif intent['pattern'] == 'sample_workflow':
            return self._generate_sample_workflow_sql(max_rows)
        else:
            return self._generate_generic_sql(max_rows)
    
    def _generate_customer_stats_sql(self, max_rows: Optional[int], simple_version: bool = False) -> Tuple[str, str]:
        """Generate customer statistics SQL using exact schema - optimized to avoid empty results"""
        
        if simple_version:
            # Use simple version without Companies table dependency
            return self._generate_simple_customer_stats_sql(max_rows)
        
        # Full version with company information
        sql_parts = [
            f"SELECT {self._format_top_clause(max_rows)}",
            "    COALESCE(r.sCustomer, 'Customer ID: ' + CAST(r.Customer_ID AS VARCHAR), 'Unknown Customer') as CustomerName,",
            "    COALESCE(c.Name, 'No Company Info') as CompanyName,",
            "    COALESCE(c.Stad, 'Unknown') as City,",
            "    COALESCE(c.Land, 'Unknown') as Country,",
            "    COUNT(r.Id) as TotalRequests,",
            "    COUNT(DISTINCT s.Id) as TotalSamples,",
            "    MIN(r.DateCreated) as FirstRequestDate,",
            "    MAX(r.DateCreated) as LastRequestDate,",
            "    DATEDIFF(day, MIN(r.DateCreated), MAX(r.DateCreated)) as DaysBetweenFirstLast",
            "FROM Requests r",
            "LEFT JOIN Companies c ON r.Customer_ID = c.Id",
            "LEFT JOIN Samples s ON s.Sample_Request = r.Id",
            "WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
            "GROUP BY r.sCustomer, r.Customer_ID, c.Name, c.Stad, c.Land",
            "ORDER BY TotalRequests DESC, LastRequestDate DESC"
        ]
        
        explanation = "Customer request statistics for the last 3 months - uses LEFT JOINs to avoid excluding requests without company info"
        return "\n".join(sql_parts), explanation
    
    def _generate_simple_customer_stats_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
        """Generate simple customer statistics SQL without Companies table dependency"""
        sql_parts = [
            f"SELECT {self._format_top_clause(max_rows)}",
            "    COALESCE(r.sCustomer, 'Unknown Customer') as CustomerName,",
            "    COUNT(r.Id) as TotalRequests,",
            "    COUNT(DISTINCT s.Id) as TotalSamples,",
            "    MIN(r.DateCreated) as FirstRequestDate,",
            "    MAX(r.DateCreated) as LastRequestDate",
            "FROM Requests r",
            "LEFT JOIN Samples s ON s.Sample_Request = r.Id",
            "WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
            "GROUP BY r.sCustomer",
            "ORDER BY TotalRequests DESC, LastRequestDate DESC"
        ]
        
        explanation = "Simple customer request statistics - no dependency on Companies table"
        return "\n".join(sql_parts), explanation
    
    def generate_diagnostic_query(self, original_query: str, intent: Dict[str, Any]) -> Tuple[str, str]:
        """Generate diagnostic query to understand why main query returned no results"""
        if intent['pattern'] == 'customer_stats':
            return self._generate_customer_stats_diagnostic()
        elif intent['pattern'] == 'bacteriology':
            return self._generate_bacteriology_diagnostic()
        else:
            return self._generate_generic_diagnostic()
    
    def _generate_customer_stats_diagnostic(self) -> Tuple[str, str]:
        """Generate diagnostic query for customer statistics"""
        sql_parts = [
            "SELECT",
            "    'Total Requests (All Time)' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM Requests",
            "UNION ALL",
            "SELECT",
            "    'Requests Last 12 Months' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM Requests",
            "WHERE DateCreated >= DATEADD(month, -12, GETDATE())",
            "UNION ALL",
            "SELECT",
            "    'Requests Last 6 Months' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM Requests",
            "WHERE DateCreated >= DATEADD(month, -6, GETDATE())",
            "UNION ALL",
            "SELECT",
            "    'Requests Last 3 Months' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM Requests",
            "WHERE DateCreated >= DATEADD(month, -3, GETDATE())",
            "UNION ALL",
            "SELECT",
            "    'Sample Recent Requests' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM (",
            "    SELECT TOP 5 DateCreated, sCustomer, RequestNr FROM Requests",
            "    WHERE DateCreated >= DATEADD(month, -3, GETDATE()) ORDER BY DateCreated DESC",
            ") sample_data",
            "ORDER BY CheckType"
        ]
        
        explanation = "Diagnostic query to check request counts across different time periods and show sample data"
        return "\n".join(sql_parts), explanation
    
    def _generate_bacteriology_diagnostic(self) -> Tuple[str, str]:
        """Generate diagnostic query for bacteriology"""
        sql_parts = [
            "SELECT",
            "    'Bacteriology Results (All Time)' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(Bar_DatumInzet) as EarliestDate,",
            "    MAX(Bar_DatumInzet) as LatestDate",
            "FROM tblBactResult",
            "WHERE Bar_DatumInzet IS NOT NULL"
        ]
        
        explanation = "Diagnostic query to check bacteriology data availability"
        return "\n".join(sql_parts), explanation
    
    def _generate_generic_diagnostic(self) -> Tuple[str, str]:
        """Generate generic diagnostic query"""
        sql_parts = [
            "SELECT",
            "    'Total Requests' as CheckType,",
            "    COUNT(*) as Count,",
            "    MIN(DateCreated) as EarliestDate,",
            "    MAX(DateCreated) as LatestDate",
            "FROM Requests"
        ]
        
        explanation = "Generic diagnostic query to check basic data availability"
        return "\n".join(sql_parts), explanation
    
    def _generate_bacteriology_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
        """Generate bacteriology SQL using exact schema"""
        sql_parts = [
            f"SELECT {self._format_top_clause(max_rows)}",
            "    r.RequestNr,",
            "    r.DateCreated,",
            "    r.sCustomer,",
            "    s.SampleNr,",
            "    s.Identification,",
            "    g.Name_en as GermName,",
            "    brl.Brl_KiemenDescr as GermDescription,",
            "    a.Name_en as AntibioticName,",
            "    brla.Brla_AantalMM as DiameterMM,",
            "    pn.Name_en as SensitivityResult",
            "FROM Requests r",
            "INNER JOIN Samples s ON s.Sample_Request = r.Id",
            "INNER JOIN tblBactResult br ON br.SampleID = s.Id",
            "LEFT JOIN tblBactResultLine brl ON br.BactResultID = brl.BactResultID",
            "LEFT JOIN tblBactResultLineAnalyse brla ON brl.BactResultLineID = brla.BactResultLineID",
            "LEFT JOIN Germs g ON brl.KiemID = g.GermId",
            "LEFT JOIN Analyses a ON brla.AnalyseID = a.Id",
            "LEFT JOIN PosNegs pn ON brla.PosNegsID = pn.Id",
            "WHERE r.DateCreated >= DATEADD(month, -6, GETDATE())",
            "  AND (g.Name_en IS NOT NULL OR brl.Brl_KiemenDescr IS NOT NULL)",
            "ORDER BY r.DateCreated DESC"
        ]
        
        explanation = "Bacteriology results with antibiogram sensitivity data using correct table relationships"
        return "\n".join(sql_parts), explanation
    
    def _generate_sample_workflow_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
        """Generate sample workflow SQL using exact schema"""
        sql_parts = [
            f"SELECT {self._format_top_clause(max_rows)}",
            "    r.RequestNr,",
            "    r.DateCreated,",
            "    r.sCustomer,",
            "    s.SampleNr,",
            "    s.Identification,",
            "    s.DateSampling,",
            "    st.Name_en as SampleType,",
            "    ag.Name_en as AnalysisGroup,",
            "    sag.IsAnalyseDone,",
            "    sag.IsAnalyseExported",
            "FROM Requests r",
            "INNER JOIN Samples s ON s.Sample_Request = r.Id",
            "LEFT JOIN SampleTypes st ON st.Id = s.Sample_SampleType",
            "LEFT JOIN SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id",
            "LEFT JOIN AnalysisGroups ag ON ag.Id = sag.RequestAnalyseGroup_AnalysisGroup",
            "WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
            "ORDER BY r.DateCreated DESC"
        ]
        
        explanation = "Sample workflow status with analysis progress"
        return "\n".join(sql_parts), explanation
    
    def _generate_generic_sql(self, max_rows: Optional[int]) -> Tuple[str, str]:
        """Generate generic SQL for unknown query types"""
        sql_parts = [
            f"SELECT {self._format_top_clause(max_rows)}",
            "    r.RequestNr,",
            "    r.DateCreated,",
            "    r.sCustomer,",
            "    s.SampleNr,",
            "    s.Identification,",
            "    COUNT(sag.Id) as AnalysisGroupCount",
            "FROM Requests r",
            "LEFT JOIN Samples s ON s.Sample_Request = r.Id", 
            "LEFT JOIN SampleAnalyseGroups sag ON sag.SampleAnalyseGroup_Sample = s.Id",
            "WHERE r.DateCreated >= DATEADD(month, -3, GETDATE())",
            "GROUP BY r.RequestNr, r.DateCreated, r.sCustomer, s.SampleNr, s.Identification",
            "ORDER BY r.DateCreated DESC"
        ]
        
        explanation = "Generic laboratory request and sample information"
        return "\n".join(sql_parts), explanation

Parameters

Name Type Default Kind
schema DatabaseSchema - positional
statistical_agent - None positional

Parameter Details

schema: The DatabaseSchema instance describing tables, columns, categories, and relationships.
statistical_agent: Optional agent exposing query_llm(), used for LLM-backed SQL generation (default None).

Return Value

Returns a SQLQueryGenerator instance

Class Interface

Methods

__init__(self, schema, statistical_agent)

Purpose: Initialize the generator with a database schema and an optional statistical agent

Parameters:

  • schema: Type: DatabaseSchema
  • statistical_agent: Optional LLM-backed agent exposing query_llm() (default None)

Returns: None

_load_dynamic_schema_info(self)

Purpose: Load comprehensive schema information - now uses enhanced schema data

Returns: None

_load_from_comprehensive_schema(self)

Purpose: Load schema information from comprehensive schema data

Returns: None

_infer_business_purpose_from_category(self, table_name, category) -> str

Purpose: Infer business purpose from table category and name

Parameters:

  • table_name: Type: str
  • category: Type: str

Returns: Returns str

_load_table_descriptions(self)

Purpose: Load table descriptions from comprehensive database schema

Returns: None

_infer_business_purpose_from_name(self, table_name) -> str

Purpose: Infer business purpose from table name only

Parameters:

  • table_name: Type: str

Returns: Returns str

_infer_business_purpose(self, table_name, description) -> str

Purpose: Infer business purpose from table name and description

Parameters:

  • table_name: Type: str
  • description: Type: str

Returns: Returns str

_load_detailed_schema_from_markdown(self)

Purpose: Load detailed schema from markdown reference files

Returns: None

_parse_markdown_schema(self, schema_file_path)

Purpose: Parse markdown schema file to extract table and column information

Parameters:

  • schema_file_path: Path to the markdown schema reference file

Returns: None

_enhance_with_schema_categories(self)

Purpose: Enhance table information with categories from main schema

Returns: None

_extract_table_relationships(self)

Purpose: Extract table relationships from schema information

Returns: None

_find_related_tables(self, table_name) -> List[str]

Purpose: Find tables that are likely related to the given table

Parameters:

  • table_name: Type: str

Returns: Returns List[str]

_initialize_common_patterns(self)

Purpose: Initialize common query patterns for different types of analysis

Returns: None

_load_detailed_schema(self)

Purpose: Load detailed schema information from database reference files if available

Returns: None

_parse_poulpharm_detailed_schema(self)

Purpose: Parse detailed schema from Poulpharm LIMS database reference files

Returns: None

_load_detailed_database_schema(self)

Purpose: Load detailed database schema from JSON files

Returns: None

_setup_fallback_poulpharm_schema(self)

Purpose: Set up a fallback schema - this should not be used when the dynamic schema is available

Returns: None

get_best_name_column(self, table_name) -> str

Purpose: Get the best available name column for a table based on dynamic schema

Parameters:

  • table_name: Type: str

Returns: Returns str

validate_column_exists(self, table_name, column_name) -> bool

Purpose: Check if a column actually exists in the dynamic schema

Parameters:

  • table_name: Type: str
  • column_name: Type: str

Returns: Returns bool

get_safe_column_reference(self, table_name, preferred_column, fallback_column) -> str

Purpose: Get a safe column reference, falling back if the preferred column doesn't exist

Parameters:

  • table_name: Type: str
  • preferred_column: Type: str
  • fallback_column: Type: str

Returns: Returns str
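
A usage sketch; generator is an already-constructed SQLQueryGenerator, and the table/column names are illustrative, borrowed from the bacteriology query above:

# Prefer the English name column; fall back to a plain 'Name' column
# if the dynamic schema has no 'Name_en' for this table.
name_col = generator.get_safe_column_reference('Germs', 'Name_en', 'Name')

# validate_column_exists can guard dynamic references the same way:
if generator.validate_column_exists('Requests', 'DateCreated'):
    order_clause = 'ORDER BY r.DateCreated DESC'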

_setup_poulpharm_optimizations(self)

Purpose: Set up Poulpharm LIMS-specific query optimizations

Returns: None

_detect_query_intent(self, user_query) -> Dict[str, Any]

Purpose: Detect user query intent and suggest appropriate tables/approach

Parameters:

  • user_query: Type: str

Returns: Returns Dict[str, Any]
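
The exact return shape is outside this extract, but the keys read by _create_sql_generation_prompt and _generate_basic_sql imply a dictionary along these lines (all values below are illustrative):

intent = generator._detect_query_intent("antibiogram results for recent samples")
# Plausible shape, inferred from how the intent dict is consumed:
# {
#     'pattern': 'bacteriology',      # or customer_stats, sample_workflow, ...
#     'confidence': 0.85,
#     'suggested_tables': ['Requests', 'Samples', 'tblBactResult'],
#     'avoid_tables': [],
#     'required_joins': ['Samples s ON s.Sample_Request = r.Id'],
# }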

_format_top_clause(self, max_rows) -> str

Purpose: Helper method to format TOP clause for SQL queries

Parameters:

  • max_rows: Optional[int] row limit; None means no limit

Returns: Returns str
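
The helper's body is not part of this extract, but its call sites (f"SELECT {self._format_top_clause(max_rows)}") and requirement 3 of the generation prompt imply a sketch roughly like this:

def _format_top_clause(self, max_rows):
    # T-SQL caps result size with TOP rather than LIMIT; an empty string
    # leaves the SELECT unbounded ("return all rows").
    return f"TOP {max_rows}" if max_rows is not None else ""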

generate_sql_query(self, user_query, max_rows, model) -> Tuple[str, Dict[str, Any]]

Purpose: Generate a SQL query for a user analysis request, returning (sql_query, metadata)

Parameters:

  • user_query: Type: str
  • max_rows: Type: Optional[int]
  • model: Type: str

Returns: Returns Tuple[str, Dict[str, Any]]
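
A minimal end-to-end call, assuming a loaded DatabaseSchema and a statistical agent exposing query_llm():

generator = SQLQueryGenerator(schema, statistical_agent=agent)
sql, metadata = generator.generate_sql_query(
    "Number of requests per customer over the last 3 months",
    max_rows=100,
    model='claude-sonnet-4-5-20250929',
)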

_prepare_database_context(self) -> str

Purpose: Prepare comprehensive database schema context using dynamic schema information

Returns: Returns str

_validate_generated_sql(self, sql_query) -> Tuple[bool, List[str]]

Purpose: Validate generated SQL against known schema

Parameters:

  • sql_query: Type: str

Returns: Returns Tuple[bool, List[str]]
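
A sketch of how the validation result is presumably consumed; the fallback step is an assumption, not something shown in this extract:

valid, issues = generator._validate_generated_sql(sql)
if not valid:
    # issues lists schema mismatches such as unknown tables or columns
    logger.warning("SQL failed schema validation: %s", issues)
    sql, explanation = generator._generate_basic_sql(user_query, max_rows=100)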

_create_sql_generation_prompt(self, user_query, db_context, max_rows, intent) -> str

Purpose: Create prompt for SQL generation with intent guidance

Parameters:

  • user_query: Type: str
  • db_context: Type: str
  • max_rows: Type: Optional[int]
  • intent: Type: Optional[Dict[str, Any]]

Returns: Returns str

_analyze_user_query(self, user_query) -> Dict[str, Any]

Purpose: Analyze user query to provide adaptive guidance

Parameters:

  • user_query: Type: str

Returns: Returns Dict[str, Any]

_generate_with_llm(self, prompt, model) -> str

Purpose: Generate SQL using the statistical agent's LLM

Parameters:

  • prompt: Type: str
  • model: Type: str

Returns: Returns str

_get_poulpharm_pattern_hints(self, prompt) -> str

Purpose: Get Poulpharm-specific hints based on detected patterns

Parameters:

  • prompt: Type: str

Returns: Returns str
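
An illustrative call; the hint lines are taken verbatim from the pattern branches in the source above:

hints = generator._get_poulpharm_pattern_hints("recent bacteriology results per customer")
# Possible output (one line per matched pattern):
# - Start with Requests table and filter by DateCreated for recent data
# - Include tblBactResult for bacterial isolates and tblAntibiogram for sensitivity
# - Join through Samples: Requests -> Samples -> tblBactResult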

_parse_llm_response(self, response) -> Tuple[str, str]

Purpose: Parse LLM response to extract SQL query and explanation

Parameters:

  • response: Type: str

Returns: Returns Tuple[str, str]
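
A worked example of the extraction cascade on a typical fenced response (the input text is illustrative):

response = '''Here is the query.
```sql
-- Explanation: counts requests per customer
SELECT TOP 100 r.sCustomer, COUNT(*) AS TotalRequests
FROM Requests r
GROUP BY r.sCustomer
ORDER BY TotalRequests DESC
```'''
sql, explanation = generator._parse_llm_response(response)
# explanation -> "counts requests per customer"   (matched by r'-- Explanation: (.+)')
# sql         -> the SELECT statement with the comment line stripped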

_generate_basic_sql(self, user_query, max_rows) -> Tuple[str, str]

Purpose: Fallback basic SQL generation without LLM using structured approach

Parameters:

  • user_query: Type: str
  • max_rows: Type: Optional[int]

Returns: Returns Tuple[str, str]

_generate_customer_stats_sql(self, max_rows, simple_version) -> Tuple[str, str]

Purpose: Generate customer statistics SQL using exact schema - optimized to avoid empty results

Parameters:

  • max_rows: Type: Optional[int]
  • simple_version: Type: bool

Returns: Returns Tuple[str, str]

_generate_simple_customer_stats_sql(self, max_rows) -> Tuple[str, str]

Purpose: Generate simple customer statistics SQL without Companies table dependency

Parameters:

  • max_rows: Type: Optional[int]

Returns: Returns Tuple[str, str]

generate_diagnostic_query(self, original_query, intent) -> Tuple[str, str]

Purpose: Generate diagnostic query to understand why main query returned no results

Parameters:

  • original_query: Type: str
  • intent: Type: Dict[str, Any]

Returns: Returns Tuple[str, str]
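
A sketch of the empty-result fallback flow this method supports; run_query is a hypothetical executor, not part of this class:

rows = run_query(sql)
if not rows:
    diag_sql, diag_explanation = generator.generate_diagnostic_query(user_query, intent)
    diagnostics = run_query(diag_sql)  # e.g. request counts per time window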

_generate_customer_stats_diagnostic(self) -> Tuple[str, str]

Purpose: Generate diagnostic query for customer statistics

Returns: Returns Tuple[str, str]

_generate_bacteriology_diagnostic(self) -> Tuple[str, str]

Purpose: Generate diagnostic query for bacteriology

Returns: Returns Tuple[str, str]

_generate_generic_diagnostic(self) -> Tuple[str, str]

Purpose: Generate generic diagnostic query

Returns: Returns Tuple[str, str]

_generate_bacteriology_sql(self, max_rows) -> Tuple[str, str]

Purpose: Generate bacteriology SQL using exact schema

Parameters:

  • max_rows: Type: Optional[int]

Returns: Returns Tuple[str, str]

_generate_sample_workflow_sql(self, max_rows) -> Tuple[str, str]

Purpose: Generate sample workflow SQL using exact schema

Parameters:

  • max_rows: Type: Optional[int]

Returns: Returns Tuple[str, str]

_generate_generic_sql(self, max_rows) -> Tuple[str, str]

Purpose: Generate generic SQL for unknown query types

Parameters:

  • max_rows: Type: Optional[int]

Returns: Returns Tuple[str, str]

Required Imports

import json
import logging
import os
import re
from typing import Any, Dict, List, Optional, Tuple

# The class also assumes a module-level logger, e.g.:
# logger = logging.getLogger(__name__)

Usage Example

# Example usage:
# generator = SQLQueryGenerator(schema, statistical_agent=agent)
# sql, metadata = generator.generate_sql_query("Recent requests per customer", max_rows=100, model='claude-sonnet-4-5-20250929')

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class SQLQueryGenerator_v1 97.8% similar

    Generates SQL queries based on user requests and database schema

    From: /tf/active/vicechatdev/smartstat/sql_query_generator.py
  • class SqlGenerationResult 60.3% similar

    A dataclass that encapsulates the results of an SQL query generation operation, including the generated query, explanation, confidence score, and metadata about database objects used.

    From: /tf/active/vicechatdev/full_smartstat/two_pass_sql_workflow.py
  • function main_v61 56.3% similar

    Demonstrates a SmartStat SQL workflow by loading a database schema, initializing a SQL query generator, and generating SQL queries from natural language requests with detailed output and metadata.

    From: /tf/active/vicechatdev/full_smartstat/demo_sql_workflow.py
  • class QueryIteration 52.3% similar

    A dataclass representing a single iteration in an iterative SQL query generation and evaluation workflow, capturing the query, its results, assessment, and improvement suggestions.

    From: /tf/active/vicechatdev/full_smartstat/enhanced_sql_workflow.py
  • class DatabaseSchema 50.7% similar

    A dataclass that represents comprehensive database schema information, including table structures, columns, relationships, and categorizations for SQL database introspection and query generation.

    From: /tf/active/vicechatdev/full_smartstat/sql_query_generator.py