šŸ” Code Extractor

class DynamicSchemaDiscovery

Maturity: 26

Discovers database schema from live database connection

File:
/tf/active/vicechatdev/full_smartstat/dynamic_schema_discovery.py
Lines:
69 - 1251
Complexity:
moderate

Purpose

Discovers database schema from live database connection

Source Code

class DynamicSchemaDiscovery:
    """Discovers database schema from live database connection"""
    
    def __init__(self, data_processor, statistical_agent=None):
        """Initialize with a data processor that has database connection and optional LLM agent"""
        self.data_processor = data_processor
        self.statistical_agent = statistical_agent
        self.cache_file = Path(__file__).parent / 'discovered_schema_cache.json'
        self.descriptions_cache_file = Path(__file__).parent / 'table_descriptions_cache.json'
        self.cache_duration_hours = 24  # Refresh cache every 24 hours for more stability
        self.descriptions_cache_duration_hours = 72  # Table descriptions cache for 3 days
        
    def discover_schema(self, force_refresh: bool = False) -> DatabaseInfo:
        """Return the complete database schema, preferring a valid cache.

        Args:
            force_refresh: When True the cache check is skipped and the live
                database is always queried.

        Returns:
            A DatabaseInfo holding the tables, relationships and totals.

        Raises:
            Exception: Whatever live discovery raised, when no cache file
                exists to fall back on.
        """
        cache_allowed = not force_refresh
        if cache_allowed and self._has_valid_cache():
            logger.info("Using cached schema discovery results")
            return self._load_cached_schema()

        logger.info("Starting dynamic schema discovery from live database...")

        try:
            database = self._get_database_info()
            discovered_tables = self._discover_tables()
            discovered_relationships = self._discover_relationships(discovered_tables)

            # Aggregate totals across all discovered tables
            column_total = sum(map(len, (tbl.columns for tbl in discovered_tables)))
            row_total = sum(tbl.row_count for tbl in discovered_tables)
            table_total = len(discovered_tables)

            schema_info = DatabaseInfo(
                database_name=database['database_name'],
                server_name=database['server_name'],
                discovery_timestamp=datetime.now().isoformat(),
                total_tables=table_total,
                total_columns=column_total,
                total_rows=row_total,
                tables=discovered_tables,
                relationships=discovered_relationships
            )

            # Persist the fresh result so later calls can reuse it
            self._cache_schema(schema_info)

            logger.info(f"Schema discovery completed: {table_total} tables, {column_total} columns, {row_total} total rows")
            return schema_info

        except Exception as e:
            logger.error(f"Error during schema discovery: {str(e)}")
            # A cache file, even one no longer considered valid, is preferred
            # over failing outright.
            if not self.cache_file.exists():
                raise
            logger.warning("Using cached schema due to discovery error")
            return self._load_cached_schema()
    
    def _get_database_info(self) -> Dict[str, str]:
        """Get basic database information"""
        query = """
        SELECT 
            DB_NAME() as database_name,
            @@SERVERNAME as server_name
        """
        
        result = self.data_processor.execute_query(query)
        if result is not None and len(result) > 0:
            return {
                'database_name': result.iloc[0]['database_name'],
                'server_name': result.iloc[0]['server_name']
            }
        return {'database_name': 'Unknown', 'server_name': 'Unknown'}
    
    def _discover_tables(self) -> List[TableInfo]:
        """Discover all base tables and collect detailed information for each.

        One listing query returns every base table with its row count. For
        each table the columns, primary keys, foreign keys, description and a
        simple quality score are then gathered through the helper methods.

        Returns:
            One TableInfo per base table, in table-name order. An empty list
            when the table listing query returns ``None``.
        """
        tables = []
        
        # Row counts come from sys.partitions, which has a row per partition of
        # every index. Only the heap (index_id 0) or clustered index
        # (index_id 1) is summed; without that filter each nonclustered index
        # would add the table's rows again and inflate the count.
        tables_query = """
        SELECT 
            t.TABLE_SCHEMA,
            t.TABLE_NAME,
            t.TABLE_TYPE,
            ISNULL(p.rows, 0) as row_count
        FROM INFORMATION_SCHEMA.TABLES t
        LEFT JOIN (
            SELECT 
                s.name as schema_name,
                o.name as table_name,
                SUM(p.rows) as rows
            FROM sys.objects o
            JOIN sys.schemas s ON o.schema_id = s.schema_id
            JOIN sys.partitions p ON o.object_id = p.object_id
            WHERE o.type = 'U' AND p.index_id IN (0, 1)
            GROUP BY s.name, o.name
        ) p ON t.TABLE_SCHEMA = p.schema_name AND t.TABLE_NAME = p.table_name
        WHERE t.TABLE_TYPE = 'BASE TABLE'
        ORDER BY t.TABLE_NAME
        """
        
        tables_df = self.data_processor.execute_query(tables_query)
        if tables_df is None:
            logger.error("Could not retrieve table list")
            return []
        
        logger.info(f"Discovered {len(tables_df)} tables, getting detailed information...")
        
        for _, table_row in tables_df.iterrows():
            table_name = table_row['TABLE_NAME']
            table_schema = table_row['TABLE_SCHEMA']
            table_type = table_row['TABLE_TYPE']
            # Guard against a missing count before converting to int
            row_count = int(table_row['row_count']) if pd.notna(table_row['row_count']) else 0
            
            # Get columns for this table
            columns = self._get_table_columns(table_name, table_schema)
            
            # Get primary keys
            primary_keys = self._get_primary_keys(table_name, table_schema)
            
            # Get foreign keys
            foreign_keys = self._get_foreign_keys(table_name, table_schema)
            
            # Get table description if available
            description = self._get_table_description(table_name, table_schema)
            
            # Calculate a simple quality score based on available data
            quality_score = self._calculate_simple_quality_score(row_count, columns)
            
            table_info = TableInfo(
                name=table_name,
                schema=table_schema,
                table_type=table_type,
                row_count=row_count,
                columns=columns,
                primary_keys=primary_keys,
                foreign_keys=foreign_keys,
                description=description,
                data_quality_score=quality_score
            )
            
            tables.append(table_info)
            
            # Progress report every 10 tables
            if len(tables) % 10 == 0:
                logger.info(f"Processed {len(tables)}/{len(tables_df)} tables...")
        
        return tables
    
    def _get_table_columns(self, table_name: str, schema_name: str) -> List[Dict[str, Any]]:
        """Get detailed column information for a table"""
        columns_query = f"""
        SELECT 
            c.COLUMN_NAME,
            c.DATA_TYPE,
            c.IS_NULLABLE,
            c.COLUMN_DEFAULT,
            c.CHARACTER_MAXIMUM_LENGTH,
            c.NUMERIC_PRECISION,
            c.NUMERIC_SCALE,
            c.ORDINAL_POSITION
        FROM INFORMATION_SCHEMA.COLUMNS c
        WHERE c.TABLE_NAME = '{table_name}' AND c.TABLE_SCHEMA = '{schema_name}'
        ORDER BY c.ORDINAL_POSITION
        """
        
        try:
            columns_df = self.data_processor.execute_query(columns_query)
            if columns_df is None:
                return []
            
            columns = []
            for _, col_row in columns_df.iterrows():
                column_info = {
                    'name': col_row['COLUMN_NAME'],
                    'data_type': col_row['DATA_TYPE'],
                    'is_nullable': col_row['IS_NULLABLE'] == 'YES',
                    'default_value': col_row['COLUMN_DEFAULT'],
                    'max_length': int(col_row['CHARACTER_MAXIMUM_LENGTH']) if pd.notna(col_row['CHARACTER_MAXIMUM_LENGTH']) else None,
                    'precision': int(col_row['NUMERIC_PRECISION']) if pd.notna(col_row['NUMERIC_PRECISION']) else None,
                    'scale': int(col_row['NUMERIC_SCALE']) if pd.notna(col_row['NUMERIC_SCALE']) else None,
                    'position': int(col_row['ORDINAL_POSITION']),
                    'is_primary_key': False,  # Will be updated separately
                    'is_foreign_key': False,  # Will be updated separately
                    'referenced_table': None,
                    'referenced_column': None
                }
                columns.append(column_info)
            
            return columns
            
        except Exception as e:
            logger.error(f"Error getting columns for table {table_name}: {str(e)}")
            return []
    
    def _get_primary_keys(self, table_name: str, schema_name: str) -> List[str]:
        """Get primary key columns for a table"""
        pk_query = f"""
        SELECT ku.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku 
            ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
        WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            AND tc.TABLE_NAME = '{table_name}' AND tc.TABLE_SCHEMA = '{schema_name}'
        ORDER BY ku.ORDINAL_POSITION
        """
        
        try:
            pk_df = self.data_processor.execute_query(pk_query)
            if pk_df is not None and len(pk_df) > 0:
                return pk_df['COLUMN_NAME'].tolist()
            return []
            
        except Exception as e:
            logger.error(f"Error getting primary keys for table {table_name}: {str(e)}")
            return []
    
    def _get_foreign_keys(self, table_name: str, schema_name: str) -> List[Dict[str, str]]:
        """Get foreign key relationships for a table"""
        # Simplified approach - just get basic foreign key info without complex joins
        try:
            # For now, return empty list - we'll get relationships at the database level
            return []
            
        except Exception as e:
            logger.error(f"Error getting foreign keys for table {table_name}: {str(e)}")
            return []
    
    def _get_table_description(self, table_name: str, schema_name: str) -> Optional[str]:
        """Get table description using LLM analysis with schema and data samples"""
        try:
            # Check cache first
            cached_description = self._get_cached_table_description(table_name)
            if cached_description:
                return cached_description
            
            # Generate new description using LLM if available
            if self.statistical_agent:
                description = self._generate_llm_table_description(table_name, schema_name)
                if description:
                    # Cache the generated description
                    self._cache_table_description(table_name, description)
                    return description
            
            # Fallback to business logic analysis
            description = self._analyze_table_business_logic(table_name)
            return description
            
        except Exception as e:
            logger.error(f"Error getting table description for {table_name}: {e}")
            # Default fallback
        return f"Database table: {table_name}"
    
    def _generate_llm_table_description(self, table_name: str, schema_name: str) -> Optional[str]:
        """Generate table description using LLM with schema and data sample"""
        try:
            # Get table schema details
            table_schema = self._get_table_schema_details(table_name, schema_name)
            if not table_schema:
                return None
            
            # Get data sample (first 50 rows)
            data_sample = self._get_table_data_sample(table_name, schema_name, sample_size=50)
            
            # Create LLM prompt
            prompt = self._create_table_description_prompt(table_name, table_schema, data_sample)
            
            # Query LLM for description
            logger.info(f"Generating LLM-based description for table: {table_name}")
            response = self.statistical_agent.query_llm(prompt, model='gpt-4o-mini')  # Use smaller model for efficiency
            
            # Extract and clean the description
            description = self._extract_description_from_response(response)
            
            logger.info(f"Generated LLM description for {table_name}: {description[:100]}...")
            return description
            
        except Exception as e:
            logger.error(f"Error generating LLM description for table {table_name}: {e}")
            return None
    
    def _get_table_schema_details(self, table_name: str, schema_name: str) -> Optional[Dict]:
        """Get detailed schema information for a table"""
        try:
            schema_query = """
            SELECT 
                c.COLUMN_NAME,
                c.DATA_TYPE,
                c.IS_NULLABLE,
                c.COLUMN_DEFAULT,
                c.CHARACTER_MAXIMUM_LENGTH,
                c.NUMERIC_PRECISION,
                c.NUMERIC_SCALE,
                CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as IS_PRIMARY_KEY,
                CASE WHEN fk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as IS_FOREIGN_KEY
            FROM INFORMATION_SCHEMA.COLUMNS c
            LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE pk 
                ON c.TABLE_NAME = pk.TABLE_NAME 
                AND c.COLUMN_NAME = pk.COLUMN_NAME 
                AND pk.CONSTRAINT_NAME LIKE 'PK_%'
            LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk 
                ON c.TABLE_NAME = fk.TABLE_NAME 
                AND c.COLUMN_NAME = fk.COLUMN_NAME 
                AND fk.CONSTRAINT_NAME LIKE 'FK_%'
            WHERE c.TABLE_NAME = :table_name AND c.TABLE_SCHEMA = :schema_name
            ORDER BY c.ORDINAL_POSITION
            """
            
            result = self.data_processor.execute_query(schema_query, params={'table_name': table_name, 'schema_name': schema_name})
            if result is not None and len(result) > 0:
                return {
                    'columns': result.to_dict('records'),
                    'total_columns': len(result)
                }
            return None
            
        except Exception as e:
            logger.error(f"Error getting schema details for {table_name}: {e}")
            return None
    
    def _get_table_data_sample(self, table_name: str, schema_name: str, sample_size: int = 50) -> Optional[pd.DataFrame]:
        """Get a sample of data from the table"""
        try:
            # Use a safe approach to get sample data
            sample_query = f"""
            SELECT TOP {sample_size} *
            FROM [{schema_name}].[{table_name}]
            """
            
            result = self.data_processor.execute_query(sample_query)
            if result is not None and len(result) > 0:
                return result
            return None
            
        except Exception as e:
            logger.error(f"Error getting data sample for {table_name}: {e}")
            return None
    
    def _create_table_description_prompt(self, table_name: str, table_schema: Dict, data_sample: Optional[pd.DataFrame]) -> str:
        """Create a prompt for LLM to generate table description"""
        
        # Build schema information
        schema_info = f"Table: {table_name}\n"
        schema_info += "Columns:\n"
        
        for col in table_schema['columns']:
            column_info = f"- {col['COLUMN_NAME']} ({col['DATA_TYPE']}"
            if col['CHARACTER_MAXIMUM_LENGTH']:
                column_info += f"({col['CHARACTER_MAXIMUM_LENGTH']})"
            elif col['NUMERIC_PRECISION']:
                column_info += f"({col['NUMERIC_PRECISION']},{col['NUMERIC_SCALE'] or 0})"
            column_info += ")"
            
            if col['IS_PRIMARY_KEY']:
                column_info += " [PRIMARY KEY]"
            if col['IS_FOREIGN_KEY']:
                column_info += " [FOREIGN KEY]"
            if col['IS_NULLABLE'] == 'NO':
                column_info += " [NOT NULL]"
                
            schema_info += column_info + "\n"
        
        # Build data sample information
        data_info = ""
        if data_sample is not None and len(data_sample) > 0:
            data_info = f"\nSample Data (first {len(data_sample)} rows):\n"
            
            # Show representative data for key columns (limit to prevent huge prompts)
            key_columns = []
            for col in table_schema['columns']:
                if col['IS_PRIMARY_KEY'] or 'name' in col['COLUMN_NAME'].lower() or 'id' in col['COLUMN_NAME'].lower():
                    key_columns.append(col['COLUMN_NAME'])
            
            # Limit to max 8 columns for the sample
            display_columns = key_columns[:4] + [col for col in data_sample.columns if col not in key_columns][:4]
            
            for col in display_columns:
                if col in data_sample.columns:
                    unique_values = data_sample[col].dropna().unique()[:10]  # Show max 10 unique values
                    data_info += f"- {col}: {', '.join(str(v) for v in unique_values)}\n"
        
        # Create the prompt
        prompt = f"""
You are analyzing a database table from a Laboratory Information Management System (LIMS). 
This system manages laboratory testing data, customer information, sample tracking, and result reporting for a veterinary/medical diagnostic laboratory.

Please analyze the following table and provide a concise, informative description of what this table contains and its purpose in the laboratory workflow.

{schema_info}
{data_info}

CONTEXT: This is a laboratory management database that handles:
- Test requests from customers (veterinarians, clinics)
- Sample tracking and management
- Laboratory analysis and testing
- Result reporting and validation
- Customer and billing information
- Quality control and compliance

Please provide a single paragraph (2-4 sentences) describing:
1. What type of data this table stores
2. Its role in the laboratory workflow
3. Key relationships or dependencies with other data

Keep the description professional, concise, and focused on the business purpose.
"""
        
        return prompt
    
    def _extract_description_from_response(self, response: str) -> str:
        """Extract and clean the description from LLM response"""
        # Clean up the response
        description = response.strip()
        
        # Remove any markdown formatting
        description = description.replace('**', '').replace('*', '')
        
        # Remove any prompt echoes or prefixes
        common_prefixes = [
            "This table",
            "The table",
            "Based on the analysis",
            "Description:",
            "Table description:"
        ]
        
        for prefix in common_prefixes:
            if description.lower().startswith(prefix.lower()):
                description = description[len(prefix):].strip()
        
        # Ensure it starts with a capital letter
        if description and not description[0].isupper():
            description = description[0].upper() + description[1:]
        
        # Limit length to reasonable size
        if len(description) > 500:
            description = description[:500] + "..."
        
        return description
    
    def _analyze_table_business_logic(self, table_name: str) -> str:
        """Analyze table's business purpose based on name and categorize it"""
        table_lower = table_name.lower()
        
        # Core Laboratory Operations
        if table_lower == 'requests':
            return "Laboratory test requests from customers/veterinarians - entry point for all testing"
        elif table_lower == 'samples':
            return "Individual samples linked to requests - physical specimens for testing"
        elif table_lower == 'results':
            return "Laboratory test results with validation workflow - final analytical outcomes"
        elif table_lower == 'analyses':
            return "Types of laboratory analyses/tests available - defines what can be tested"
        elif table_lower == 'sampleanalysegroups':
            return "Groups of analyses to be performed on samples - organizes testing workflow"
        elif table_lower == 'analysisgroups':
            return "Categories/groups of related analyses - organizes test types"
        
        # Customer Management
        elif table_lower in ['companies', 'accounts']:
            return "Customer/company information - entities that submit testing requests"
        elif 'dierenarts' in table_lower:
            return "Veterinarian information - medical professionals submitting samples"
        elif 'praktijk' in table_lower:
            return "Veterinary practice information - clinic/office details"
        
        # Analysis Configuration
        elif table_lower == 'analysiscategories':
            return "Categories for organizing different types of analyses"
        elif table_lower == 'sampletypes':
            return "Types of samples that can be tested (blood, urine, tissue, etc.)"
        elif table_lower == 'posnegs':
            return "Positive/negative result definitions and interpretations"
        
        # Specialized Testing Modules
        elif any(x in table_lower for x in ['bact', 'bacteriology']):
            return "Bacteriology testing results and bacterial identification data"
        elif 'pcr' in table_lower:
            return "PCR (molecular) testing results and genetic analysis data"
        elif 'serolog' in table_lower:
            return "Serology testing results - antibody and immune response analysis"
        elif 'antibiogram' in table_lower:
            return "Antibiotic sensitivity testing results - bacterial resistance patterns"
        elif 'parasitolog' in table_lower:
            return "Parasitology testing results - parasite identification and analysis"
        
        # Result Processing
        elif 'resultaat' in table_lower or 'result' in table_lower:
            return "Laboratory result data - contains analytical findings and measurements"
        elif 'uitsla' in table_lower or 'uitslag' in table_lower:
            return "Test outcome/result reporting - formatted results for customers"
        
        # System Management
        elif any(x in table_lower for x in ['user', 'gebruiker']):
            return "System user accounts and authentication information"
        elif any(x in table_lower for x in ['role', 'permission', 'autoris']):
            return "User roles and system permissions - access control"
        elif any(x in table_lower for x in ['audit', 'log']):
            return "System audit trail and logging information"
        elif 'configuratie' in table_lower or 'config' in table_lower:
            return "System configuration settings and parameters"
        
        # Reporting
        elif 'report' in table_lower or 'rapport' in table_lower:
            return "Report templates and formatting configurations"
        elif 'template' in table_lower or 'sjabloon' in table_lower:
            return "Document and report templates"
        
        # Quality Control
        elif any(x in table_lower for x in ['qc', 'quality', 'kwaliteit']):
            return "Quality control data and measurements"
        elif 'validat' in table_lower:
            return "Validation workflow and approval tracking"
        
        # Inventory/Equipment
        elif any(x in table_lower for x in ['apparaat', 'device', 'equipment']):
            return "Laboratory equipment and device information"
        elif any(x in table_lower for x in ['reagens', 'reagent', 'kit']):
            return "Laboratory reagents and testing kit information"
        
        # Administrative
        elif any(x in table_lower for x in ['invoice', 'factuur', 'billing']):
            return "Billing and invoicing information"
        elif any(x in table_lower for x in ['admin', 'beheer']):
            return "Administrative data and system management"
        
        # Lookup/Reference Tables
        elif table_lower.startswith('tbl') or table_lower.startswith('lut') or 'lookup' in table_lower:
            return f"Reference/lookup table - provides standardized values for {table_name.replace('tbl', '').replace('lut', '')}"
        
        # Default categorization
        else:
            # Try to infer from common patterns
            if any(x in table_lower for x in ['type', 'category', 'groep', 'group']):
                return f"Classification/categorization table - defines types or groups for {table_name}"
            elif table_lower.endswith('s') and len(table_lower) > 3:
                return f"Data table containing {table_lower} records"
            else:
                return f"Database table: {table_name} - purpose needs investigation"
    
    def _discover_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
        """Discover relationships using column name patterns and data analysis.

        The result is assembled in this order:

        1. Manual relationships from ``get_manual_relationship_manager()``.
        2. Relationships from ``_find_cross_table_relationships``.
        3. Candidates derived from column names shared by several tables,
           each kept only when ``_validate_relationship_with_data`` reports a
           confidence above 0.1.
        4. Relationships from ``_discover_underscore_relationships``.

        Args:
            tables: Tables whose ``name`` and ``columns`` are analysed.

        Returns:
            A list of relationship dicts. Those built in this method carry
            the keys ``from_table``, ``from_column``, ``to_table``,
            ``to_column``, ``type``, ``confidence`` and ``detection_method``.
            An empty list when any step raises; relationships collected
            before the error, including manual ones, are discarded too.
        """
        relationships = []

        try:
            logger.info("Starting intelligent relationship discovery...")

            # Load manual relationships first
            manual_manager = get_manual_relationship_manager()
            manual_relationships = manual_manager.get_all_relationships()

            if manual_relationships:
                logger.info(f"Loading {len(manual_relationships)} manual relationships")
                relationships.extend(manual_relationships)
            else:
                logger.info("No manual relationships found")

            # Convert tables to dictionary for easier lookup
            all_tables = {table.name: table for table in tables}
            # Maps lowercased column name -> list of occurrences across tables
            column_patterns = {}

            # First pass: catalog all columns and identify potential keys
            for table in tables:
                table_name = table.name

                # Use existing table column information
                if table.columns:
                    # Identify potential key columns
                    for col_info in table.columns:
                        # Columns may be dicts (either key casing) or plain values
                        if isinstance(col_info, dict):
                            col_name = col_info.get('name', col_info.get('COLUMN_NAME', ''))
                        else:
                            col_name = str(col_info)

                        if col_name:
                            col_lower = col_name.lower()

                            # Track column patterns for relationship detection
                            if col_lower not in column_patterns:
                                column_patterns[col_lower] = []
                            # 'is_identity' is true for any column whose name contains 'id'
                            column_patterns[col_lower].append({
                                'table': table_name,
                                'column': col_name,
                                'is_identity': col_name.lower() == 'id' or 'id' in col_name.lower(),
                                'data_type': col_info.get('data_type', col_info.get('DATA_TYPE', 'unknown')) if isinstance(col_info, dict) else 'unknown'
                            })

            # Second pass: find relationships based on column patterns AND data content
            logger.info(f"Analyzing {len(column_patterns)} column patterns for relationships...")

            # Also look for cross-table data relationships (even with different column names)
            relationships.extend(self._find_cross_table_relationships(tables))

            for col_pattern, occurrences in column_patterns.items():
                if len(occurrences) > 1:  # Column appears in multiple tables
                    logger.debug(f"Analyzing pattern '{col_pattern}' with {len(occurrences)} occurrences")

                    # Look for ID patterns
                    if col_pattern.endswith('id') or col_pattern == 'id':
                        # NOTE(review): every occurrence under one key shares the same
                        # lowercased name, and a name ending in 'id' always contains 'id',
                        # so 'is_identity' looks to be True for all of them here. That
                        # would leave non_identity_tables empty and this branch without
                        # output - confirm the intended identity test.
                        identity_tables = [occ for occ in occurrences if occ['is_identity']]
                        non_identity_tables = [occ for occ in occurrences if not occ['is_identity']]

                        # Create relationships from non-identity to identity tables with data validation
                        for identity_table in identity_tables:
                            for non_identity_table in non_identity_tables:
                                if identity_table['table'] != non_identity_table['table']:
                                    # Validate relationship with data sampling
                                    confidence = self._validate_relationship_with_data(
                                        non_identity_table['table'], non_identity_table['column'],
                                        identity_table['table'], identity_table['column']
                                    )

                                    if confidence > 0.1:  # Only include if data shows some relationship
                                        relationship = {
                                            'from_table': non_identity_table['table'],
                                            'from_column': non_identity_table['column'],
                                            'to_table': identity_table['table'],
                                            'to_column': identity_table['column'],
                                            'type': 'foreign_key',
                                            'confidence': confidence,
                                            'detection_method': 'column_pattern_and_data'
                                        }
                                        relationships.append(relationship)
                                        logger.info(f"Found relationship: {non_identity_table['table']}.{non_identity_table['column']} → {identity_table['table']}.{identity_table['column']} (confidence: {confidence:.2f})")

                    # Look for common naming patterns like TableName + ID with data validation
                    elif 'id' in col_pattern:
                        # Derive a candidate table name by deleting every 'id' substring
                        # and every underscore from the lowercased column name
                        potential_table_name = col_pattern.replace('id', '').replace('_', '').strip()

                        # Find matching table names (substring match in either direction)
                        for table_name in all_tables.keys():
                            table_lower = table_name.lower().replace('_', '')
                            if (potential_table_name in table_lower or
                                table_lower in potential_table_name or
                                potential_table_name == table_lower):

                                # Find ID column in target table
                                target_table_info = all_tables[table_name]
                                target_id_columns = [col for col in target_table_info.columns
                                                   if isinstance(col, dict) and
                                                   'id' in col.get('name', '').lower()]

                                if target_id_columns:
                                    # The first column whose name contains 'id' is the target
                                    target_column = target_id_columns[0].get('name', 'Id')

                                    # Test all occurrences of this pattern
                                    for occ in occurrences:
                                        if occ['table'] != table_name:
                                            # Validate with data sampling
                                            confidence = self._validate_relationship_with_data(
                                                occ['table'], occ['column'],
                                                table_name, target_column
                                            )

                                            if confidence > 0.1:
                                                relationship = {
                                                    'from_table': occ['table'],
                                                    'from_column': occ['column'],
                                                    'to_table': table_name,
                                                    'to_column': target_column,
                                                    'type': 'foreign_key',
                                                    'confidence': confidence,
                                                    'detection_method': 'table_name_pattern_and_data'
                                                }
                                                relationships.append(relationship)

            # Third pass: Enhanced underscore pattern detection for missed relationships
            logger.info("Analyzing underscore patterns for foreign key relationships...")
            underscore_relationships = self._discover_underscore_relationships(tables)
            relationships.extend(underscore_relationships)

            logger.info(f"Discovered {len(relationships)} potential relationships")
            return relationships

        except Exception as e:
            logger.error(f"Error discovering relationships: {str(e)}")
            # Everything gathered so far is dropped on error
            return []
    
    def _validate_relationship_with_data(self, from_table: str, from_column: str, 
                                       to_table: str, to_column: str) -> float:
        """Validate potential relationship by analyzing actual data content"""
        try:
            # Use larger sample and get more strategic sampling
            sample_size = 500
            
            # Get non-zero values from source table (many FKs have 0 for NULL)
            from_query = f"""
            SELECT TOP {sample_size} [{from_column}] 
            FROM [{from_table}] 
            WHERE [{from_column}] IS NOT NULL 
                AND [{from_column}] != 0
            ORDER BY NEWID()
            """
            
            # Get all unique values from target table (typically smaller set)
            to_query = f"""
            SELECT DISTINCT [{to_column}] 
            FROM [{to_table}] 
            WHERE [{to_column}] IS NOT NULL
            """
            
            from_df = self.data_processor.execute_query(from_query)
            to_df = self.data_processor.execute_query(to_query)
            
            if from_df.empty or to_df.empty:
                return 0.0
            
            # Get unique values from both columns
            from_values = set(from_df[from_column].dropna().astype(str))
            to_values = set(to_df[to_column].dropna().astype(str))
            
            if not from_values or not to_values:
                return 0.0
            
            # Calculate overlap percentage
            intersection = from_values.intersection(to_values)
            overlap_ratio = len(intersection) / len(from_values) if from_values else 0
            
            # Check if target values are subset of from values (reverse relationship)
            reverse_overlap = len(intersection) / len(to_values) if to_values else 0
            
            # Use the better overlap ratio
            best_overlap = max(overlap_ratio, reverse_overlap)
            confidence = best_overlap
            
            # Boost confidence based on overlap quality
            if best_overlap > 0.5:
                confidence = min(0.95, best_overlap + 0.2)
            elif best_overlap > 0.2:
                confidence = min(0.8, best_overlap + 0.15)
            elif best_overlap > 0.05:
                confidence = min(0.6, best_overlap + 0.1)
            
            # Extra boost for known LIMS table patterns
            from_lower = from_table.lower()
            to_lower = to_table.lower()
            
            if (('sample' in from_lower and 'request' in to_lower) or
                ('result' in from_lower and 'sample' in to_lower) or
                ('result' in from_lower and 'analys' in to_lower)):
                confidence = min(0.95, confidence + 0.1)
            
            # Penalize if no meaningful overlap found
            if len(intersection) == 0:
                confidence = 0.0
            elif len(intersection) < 3:  # Very few matches might be coincidental
                confidence = confidence * 0.5
            
            logger.debug(f"Data validation {from_table}.{from_column} → {to_table}.{to_column}: "
                        f"{len(intersection)} matches, {overlap_ratio:.2f} forward, {reverse_overlap:.2f} reverse, confidence: {confidence:.2f}")
            
            return confidence
            
        except Exception as e:
            logger.warning(f"Could not validate relationship {from_table}.{from_column} → {to_table}.{to_column}: {e}")
            return 0.0
    
    def _find_cross_table_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
        """Find relationships by analyzing data content across tables, even with different column names"""
        relationships = []
        
        try:
            # Focus on high-volume tables that are likely to contain main data
            main_tables = [t for t in tables if t.row_count > 1000]
            logger.info(f"Analyzing cross-table relationships for {len(main_tables)} high-volume tables...")
            
            # Known LIMS table patterns to prioritize
            lims_patterns = {
                'requests': ['request', 'req'],
                'samples': ['sample', 'samp'],
                'results': ['result', 'res'],
                'analyses': ['analys', 'analysis', 'test']
            }
            
            # Find tables matching LIMS patterns
            lims_tables = {}
            for category, patterns in lims_patterns.items():
                for table in main_tables:
                    table_lower = table.name.lower()
                    if any(pattern in table_lower for pattern in patterns):
                        if category not in lims_tables:
                            lims_tables[category] = []
                        lims_tables[category].append(table)
            
            # Test known LIMS relationships with data validation
            test_relationships = [
                ('requests', 'samples', ['id'], ['sample_request', 'request_id', 'requestid']),
                ('samples', 'results', ['id'], ['result_sample', 'sample_id', 'sampleid']),
                ('results', 'analyses', ['result_analysis', 'analysis_id'], ['id']),
            ]
            
            for from_category, to_category, from_cols, to_cols in test_relationships:
                if from_category in lims_tables and to_category in lims_tables:
                    for from_table in lims_tables[from_category]:
                        for to_table in lims_tables[to_category]:
                            # Try different column combinations
                            for from_col_pattern in from_cols:
                                for to_col_pattern in to_cols:
                                    from_col = self._find_matching_column(from_table, from_col_pattern)
                                    to_col = self._find_matching_column(to_table, to_col_pattern)
                                    
                                    if from_col and to_col:
                                        confidence = self._validate_relationship_with_data(
                                            from_table.name, from_col,
                                            to_table.name, to_col
                                        )
                                        
                                        if confidence > 0.2:  # Higher threshold for cross-table discovery
                                            relationships.append({
                                                'from_table': from_table.name,
                                                'from_column': from_col,
                                                'to_table': to_table.name,
                                                'to_column': to_col,
                                                'type': 'foreign_key',
                                                'confidence': confidence,
                                                'detection_method': 'cross_table_data_analysis'
                                            })
                                            logger.info(f"Cross-table relationship: {from_table.name}.{from_col} → {to_table.name}.{to_col} (confidence: {confidence:.2f})")
            
            return relationships
            
        except Exception as e:
            logger.error(f"Error in cross-table relationship discovery: {e}")
            return []
    
    def _find_matching_column(self, table: TableInfo, pattern: str) -> str:
        """Find a column in the table that matches the given pattern"""
        pattern_lower = pattern.lower()
        
        for col_info in table.columns:
            if isinstance(col_info, dict):
                col_name = col_info.get('name', '')
                if col_name and pattern_lower in col_name.lower():
                    return col_name
        
        return None
    
    def _discover_underscore_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
        """Discover relationships with underscore patterns like Sample_SampleType → SampleType"""
        relationships = []
        
        try:
            # Convert tables to dictionary for easier lookup
            tables_dict = {table.name: table for table in tables}
            
            logger.info("Analyzing underscore patterns for foreign key relationships...")
            
            for table in tables:
                for col_info in table.columns:
                    if isinstance(col_info, dict):
                        col_name = col_info.get('name', '')
                        
                        # Look for underscore patterns: TablePrefix_TargetTable
                        if '_' in col_name:
                            parts = col_name.split('_')
                            if len(parts) >= 2:
                                # Try different combinations
                                potential_targets = []
                                
                                # Pattern 1: Sample_SampleType → SampleType
                                if len(parts) == 2:
                                    potential_targets.append(parts[1])  # SampleType from Sample_SampleType
                                
                                # Pattern 2: Sample_SpeciesType → SpeciesType
                                for i in range(1, len(parts)):
                                    potential_target = '_'.join(parts[i:])  # Join remaining parts
                                    potential_targets.append(potential_target)
                                
                                # Pattern 3: Remove common suffixes and try plural forms
                                for target in potential_targets[:]:  # Copy list to avoid modification during iteration
                                    # Try singular/plural variations
                                    if target.endswith('s'):
                                        potential_targets.append(target[:-1])  # Remove 's'
                                    else:
                                        potential_targets.append(target + 's')  # Add 's'
                                
                                # Look for matching tables
                                for potential_target in potential_targets:
                                    # Try exact match first
                                    if potential_target in tables_dict:
                                        target_table = tables_dict[potential_target]
                                        # Find ID column in target table
                                        target_id_col = self._find_id_column(target_table)
                                        
                                        if target_id_col:
                                            # Validate with data
                                            confidence = self._validate_relationship_with_data(
                                                table.name, col_name,
                                                target_table.name, target_id_col
                                            )
                                            
                                            if confidence > 0.1:  # Lower threshold for underscore patterns
                                                relationship = {
                                                    'from_table': table.name,
                                                    'from_column': col_name,
                                                    'to_table': target_table.name,
                                                    'to_column': target_id_col,
                                                    'type': 'foreign_key',
                                                    'confidence': confidence,
                                                    'detection_method': 'underscore_pattern_analysis'
                                                }
                                                relationships.append(relationship)
                                                logger.info(f"Underscore relationship: {table.name}.{col_name} → {target_table.name}.{target_id_col} (confidence: {confidence:.2f})")
                                    
                                    # Try case-insensitive match
                                    else:
                                        for table_name, target_table in tables_dict.items():
                                            if table_name.lower() == potential_target.lower():
                                                target_id_col = self._find_id_column(target_table)
                                                
                                                if target_id_col:
                                                    confidence = self._validate_relationship_with_data(
                                                        table.name, col_name,
                                                        target_table.name, target_id_col
                                                    )
                                                    
                                                    if confidence > 0.1:
                                                        relationship = {
                                                            'from_table': table.name,
                                                            'from_column': col_name,
                                                            'to_table': target_table.name,
                                                            'to_column': target_id_col,
                                                            'type': 'foreign_key',
                                                            'confidence': confidence,
                                                            'detection_method': 'underscore_pattern_analysis'
                                                        }
                                                        relationships.append(relationship)
                                                        logger.info(f"Underscore relationship (case-insensitive): {table.name}.{col_name} → {target_table.name}.{target_id_col} (confidence: {confidence:.2f})")
                                                break
            
            logger.info(f"Discovered {len(relationships)} underscore pattern relationships")
            return relationships
            
        except Exception as e:
            logger.error(f"Error in underscore relationship discovery: {e}")
            return []
    
    def _find_id_column(self, table: TableInfo) -> str:
        """Find the ID column in a table"""
        for col_info in table.columns:
            if isinstance(col_info, dict):
                col_name = col_info.get('name', '')
                # Look for common ID column patterns
                if col_name.lower() in ['id', 'pk', f'{table.name.lower()}_id', f'{table.name.lower()}id']:
                    return col_name
                # Check for identity columns or primary keys
                if col_info.get('is_primary_key', False) or col_info.get('is_identity', False):
                    return col_name
        
        # Fallback: return 'Id' if nothing found
        return 'Id'
    
    def _has_valid_cache(self) -> bool:
        """Check if we have a valid cached schema with LLM descriptions when statistical agent is available"""
        if not self.cache_file.exists():
            return False
        
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            
            cache_time = datetime.fromisoformat(cache_data.get('discovery_timestamp', '2000-01-01'))
            hours_old = (datetime.now() - cache_time).total_seconds() / 3600
            
            # Check if cache is too old
            if hours_old >= self.cache_duration_hours:
                return False
            
            # If we have a statistical agent, check if cached tables have LLM descriptions
            if self.statistical_agent is not None:
                tables = cache_data.get('tables', [])
                for table_data in tables:
                    # Check if table has an LLM-generated description
                    description = table_data.get('description', '')
                    if not description or description.startswith('Table containing'):
                        # Missing or has fallback description - need to refresh cache
                        logger.info(f"Cache invalid: Table {table_data.get('name', 'unknown')} missing LLM description")
                        return False
            
            return True
            
        except Exception as e:
            logger.warning(f"Error checking cache validity: {str(e)}")
            return False
    
    def _load_cached_schema(self) -> DatabaseInfo:
        """Rebuild a ``DatabaseInfo`` from the JSON file at ``self.cache_file``.

        Each entry under ``tables`` becomes a ``TableInfo``. ``description``
        and ``data_quality_score`` are optional per table and ``relationships``
        is optional at the top level (defaulting to an empty list); every
        other key is required, so a missing one raises ``KeyError``.

        Returns:
            The ``DatabaseInfo`` described by the cache file.
        """
        with open(self.cache_file, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)

        def build_table(entry):
            """Turn one cached table record into a TableInfo."""
            return TableInfo(
                name=entry['name'],
                schema=entry['schema'],
                table_type=entry['table_type'],
                row_count=entry['row_count'],
                columns=entry['columns'],
                primary_keys=entry['primary_keys'],
                foreign_keys=entry['foreign_keys'],
                description=entry.get('description'),
                data_quality_score=entry.get('data_quality_score')
            )

        restored_tables = [build_table(entry) for entry in payload.get('tables', [])]

        return DatabaseInfo(
            database_name=payload['database_name'],
            server_name=payload['server_name'],
            discovery_timestamp=payload['discovery_timestamp'],
            total_tables=payload['total_tables'],
            total_columns=payload['total_columns'],
            total_rows=payload['total_rows'],
            tables=restored_tables,
            relationships=payload.get('relationships', [])
        )
    
    def _calculate_simple_quality_score(self, row_count: int, columns: List[Dict[str, Any]]) -> int:
        """Calculate a simple quality score for a table based on basic metrics"""
        try:
            score = 0
            
            # Row count scoring (30 points max)
            if row_count > 10000:
                score += 30
            elif row_count > 1000:
                score += 25
            elif row_count > 100:
                score += 20
            elif row_count > 10:
                score += 15
            elif row_count > 0:
                score += 10
            
            # Column count scoring (20 points max)
            column_count = len(columns)
            if column_count > 10:
                score += 20
            elif column_count > 5:
                score += 15
            elif column_count > 2:
                score += 10
            elif column_count > 0:
                score += 5
            
            # Data type diversity scoring (25 points max)
            data_types = set()
            nullable_columns = 0
            for col in columns:
                if isinstance(col, dict):
                    data_types.add(col.get('data_type', '').lower())
                    if col.get('is_nullable') == 'YES':
                        nullable_columns += 1
            
            # Diversity bonus
            type_diversity = len(data_types)
            if type_diversity > 5:
                score += 25
            elif type_diversity > 3:
                score += 20
            elif type_diversity > 1:
                score += 15
            else:
                score += 5
            
            # Completeness penalty (25 points max)
            if column_count > 0:
                nullable_ratio = nullable_columns / column_count
                completeness_score = int((1 - nullable_ratio) * 25)
                score += completeness_score
            
            return min(score, 100)  # Cap at 100
            
        except Exception as e:
            logger.warning(f"Error calculating quality score: {e}")
            return 50  # Default moderate score
    
    def _get_cached_table_description(self, table_name: str) -> Optional[str]:
        """Get cached table description if available and valid"""
        if not self.descriptions_cache_file.exists():
            return None
        
        try:
            with open(self.descriptions_cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            
            # Check if cache is still valid
            cache_time = datetime.fromisoformat(cache_data.get('cache_timestamp', '2000-01-01'))
            hours_old = (datetime.now() - cache_time).total_seconds() / 3600
            
            if hours_old >= self.descriptions_cache_duration_hours:
                return None
            
            # Get table description
            table_hash = self._get_table_hash(table_name)
            descriptions = cache_data.get('descriptions', {})
            
            return descriptions.get(table_hash)
            
        except Exception as e:
            logger.warning(f"Error reading table descriptions cache: {e}")
            return None
    
    def _cache_table_description(self, table_name: str, description: str):
        """Cache a table description"""
        try:
            # Load existing cache or create new
            cache_data = {'descriptions': {}, 'cache_timestamp': datetime.now().isoformat()}
            
            if self.descriptions_cache_file.exists():
                try:
                    with open(self.descriptions_cache_file, 'r', encoding='utf-8') as f:
                        cache_data = json.load(f)
                except:
                    pass  # Use new cache_data if loading fails
            
            # Update cache
            table_hash = self._get_table_hash(table_name)
            cache_data['descriptions'][table_hash] = description
            cache_data['cache_timestamp'] = datetime.now().isoformat()
            
            # Save cache
            with open(self.descriptions_cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
                
        except Exception as e:
            logger.error(f"Error caching table description for {table_name}: {e}")
    
    def _get_table_hash(self, table_name: str) -> str:
        """Generate a hash for table name to use as cache key"""
        return hashlib.md5(table_name.lower().encode()).hexdigest()[:12]

    def _cache_schema(self, schema_info: DatabaseInfo):
        """Cache the discovered schema to file"""
        try:
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(schema_info.to_dict(), f, indent=2, ensure_ascii=False)
            logger.info(f"Schema cached to {self.cache_file}")
        except Exception as e:
            logger.error(f"Error caching schema: {str(e)}")
    
    def get_schema_summary(self, schema_info: DatabaseInfo) -> str:
        """Generate a human-readable summary of the discovered schema"""
        summary = f"""
šŸ—„ļø  DATABASE SCHEMA SUMMARY
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

šŸ“Š Database: {schema_info.database_name} on {schema_info.server_name}
šŸ“… Discovered: {schema_info.discovery_timestamp}
šŸ“ˆ Statistics: {schema_info.total_tables} tables, {schema_info.total_columns} columns, {schema_info.total_rows:,} total rows

šŸ”— KEY RELATIONSHIPS ({len(schema_info.relationships)} found):
"""
        
        # Show key relationships
        core_tables = ['Requests', 'Samples', 'Results', 'Analyses', 'Companies']
        shown_relationships = 0
        for rel in schema_info.relationships:
            # Handle both old and new relationship formats
            from_table = rel.get('from_table', rel.get('source_table', ''))
            from_column = rel.get('from_column', rel.get('source_column', ''))
            to_table = rel.get('to_table', rel.get('target_table', ''))
            to_column = rel.get('to_column', rel.get('target_column', ''))
            
            if (from_table in core_tables or to_table in core_tables) and shown_relationships < 10:
                summary += f"   {from_table}.{from_column} → {to_table}.{to_column}\n"
                shown_relationships += 1
        
        if shown_relationships < len(schema_info.relationships):
            summary += f"   ... and {len(schema_info.relationships) - shown_relationships} more relationships\n"
        
        summary += f"\nšŸ“‹ CORE TABLES (showing largest by row count):\n"
        
        # Show tables sorted by row count
        sorted_tables = sorted(schema_info.tables, key=lambda t: t.row_count, reverse=True)[:10]
        for table in sorted_tables:
            col_count = len(table.columns)
            pk_info = f" (PK: {', '.join(table.primary_keys)})" if table.primary_keys else ""
            summary += f"   {table.name}: {table.row_count:,} rows, {col_count} columns{pk_info}\n"
        
        return summary

Parameters

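Name Type Default Kind
data_processor - - positional_or_keyword
statistical_agent - None positional_or_keyword

Parameter Details

data_processor: Object that holds the database connection; the class runs its SQL through data_processor.execute_query().
statistical_agent: Optional LLM agent (defaults to None). When it is set, a cached schema only counts as valid if every cached table has an LLM-generated description.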

Return Value

Instantiating the class returns a DynamicSchemaDiscovery instance.

Class Interface

Methods

__init__(self, data_processor, statistical_agent)

Purpose: Initialize with a data processor that has database connection and optional LLM agent

Parameters:

  • data_processor: Parameter
  • statistical_agent: Parameter

Returns: None

discover_schema(self, force_refresh) -> DatabaseInfo

Purpose: Discover complete database schema from live connection

Parameters:

  • force_refresh: Type: bool

Returns: Returns DatabaseInfo

_get_database_info(self) -> Dict[str, str]

Purpose: Get basic database information

Returns: Returns Dict[str, str]

_discover_tables(self) -> List[TableInfo]

Purpose: Discover all tables and their detailed information

Returns: Returns List[TableInfo]

_get_table_columns(self, table_name, schema_name) -> List[Dict[str, Any]]

Purpose: Get detailed column information for a table

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns List[Dict[str, Any]]

_get_primary_keys(self, table_name, schema_name) -> List[str]

Purpose: Get primary key columns for a table

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns List[str]

_get_foreign_keys(self, table_name, schema_name) -> List[Dict[str, str]]

Purpose: Get foreign key relationships for a table

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns List[Dict[str, str]]

_get_table_description(self, table_name, schema_name) -> Optional[str]

Purpose: Get table description using LLM analysis with schema and data samples

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns Optional[str]

_generate_llm_table_description(self, table_name, schema_name) -> Optional[str]

Purpose: Generate table description using LLM with schema and data sample

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns Optional[str]

_get_table_schema_details(self, table_name, schema_name) -> Optional[Dict]

Purpose: Get detailed schema information for a table

Parameters:

  • table_name: Type: str
  • schema_name: Type: str

Returns: Returns Optional[Dict]

_get_table_data_sample(self, table_name, schema_name, sample_size) -> Optional[pd.DataFrame]

Purpose: Get a sample of data from the table

Parameters:

  • table_name: Type: str
  • schema_name: Type: str
  • sample_size: Type: int

Returns: Returns Optional[pd.DataFrame]

_create_table_description_prompt(self, table_name, table_schema, data_sample) -> str

Purpose: Create a prompt for LLM to generate table description

Parameters:

  • table_name: Type: str
  • table_schema: Type: Dict
  • data_sample: Type: Optional[pd.DataFrame]

Returns: Returns str

_extract_description_from_response(self, response) -> str

Purpose: Extract and clean the description from LLM response

Parameters:

  • response: Type: str

Returns: Returns str

_analyze_table_business_logic(self, table_name) -> str

Purpose: Analyze table's business purpose based on name and categorize it

Parameters:

  • table_name: Type: str

Returns: Returns str

_discover_relationships(self, tables) -> List[Dict[str, Any]]

Purpose: Discover relationships using column name patterns and data analysis

Parameters:

  • tables: Type: List[TableInfo]

Returns: Returns List[Dict[str, Any]]

_validate_relationship_with_data(self, from_table, from_column, to_table, to_column) -> float

Purpose: Validate potential relationship by analyzing actual data content

Parameters:

  • from_table: Type: str
  • from_column: Type: str
  • to_table: Type: str
  • to_column: Type: str

Returns: Returns float

_find_cross_table_relationships(self, tables) -> List[Dict[str, Any]]

Purpose: Find relationships by analyzing data content across tables, even with different column names

Parameters:

  • tables: Type: List[TableInfo]

Returns: Returns List[Dict[str, Any]]

_find_matching_column(self, table, pattern) -> str

Purpose: Find a column in the table that matches the given pattern

Parameters:

  • table: Type: TableInfo
  • pattern: Type: str

Returns: Returns str, or None when no column name contains the pattern

_discover_underscore_relationships(self, tables) -> List[Dict[str, Any]]

Purpose: Discover relationships with underscore patterns like Sample_SampleType → SampleType

Parameters:

  • tables: Type: List[TableInfo]

Returns: Returns List[Dict[str, Any]]

_find_id_column(self, table) -> str

Purpose: Find the ID column in a table

Parameters:

  • table: Type: TableInfo

Returns: Returns str

_has_valid_cache(self) -> bool

Purpose: Check if we have a valid cached schema with LLM descriptions when statistical agent is available

Returns: Returns bool

_load_cached_schema(self) -> DatabaseInfo

Purpose: Load schema from cache file

Returns: Returns DatabaseInfo

_calculate_simple_quality_score(self, row_count, columns) -> int

Purpose: Calculate a simple quality score for a table based on basic metrics

Parameters:

  • row_count: Type: int
  • columns: Type: List[Dict[str, Any]]

Returns: Returns int

_get_cached_table_description(self, table_name) -> Optional[str]

Purpose: Get cached table description if available and valid

Parameters:

  • table_name: Type: str

Returns: Returns Optional[str]

_cache_table_description(self, table_name, description)

Purpose: Cache a table description

Parameters:

  • table_name: Type: str
  • description: Type: str

Returns: None

_get_table_hash(self, table_name) -> str

Purpose: Generate a hash for table name to use as cache key

Parameters:

  • table_name: Type: str

Returns: Returns str

_cache_schema(self, schema_info)

Purpose: Cache the discovered schema to file

Parameters:

  • schema_info: Type: DatabaseInfo

Returns: None

get_schema_summary(self, schema_info) -> str

Purpose: Generate a human-readable summary of the discovered schema

Parameters:

  • schema_info: Type: DatabaseInfo

Returns: Returns str

Required Imports

import json
import logging
import pandas as pd
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

Usage Example

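# Example usage:
# discovery = DynamicSchemaDiscovery(data_processor, statistical_agent=None)
# schema_info = discovery.discover_schema(force_refresh=False)
# print(discovery.get_schema_summary(schema_info))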

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DatabaseSchema 56.4% similar

    A dataclass that represents comprehensive database schema information, including table structures, columns, relationships, and categorizations for SQL database introspection and query generation.

    From: /tf/active/vicechatdev/full_smartstat/sql_query_generator.py
  • function refresh_database_schema 54.2% similar

    Flask route endpoint that forces a refresh of the database schema by invoking the schema discovery service with LLM-generated descriptions.

    From: /tf/active/vicechatdev/full_smartstat/app.py
  • class DatabaseSchema_v1 52.9% similar

    A dataclass that represents database schema information, including table categories, relationships, and system architecture. Provides functionality to load schema from JSON files.

    From: /tf/active/vicechatdev/smartstat/sql_query_generator.py
  • class DatabaseInfo 49.2% similar

    A dataclass that encapsulates complete database schema information including tables, columns, relationships, and metadata for a specific database instance.

    From: /tf/active/vicechatdev/full_smartstat/dynamic_schema_discovery.py
  • function load_database_schema 48.1% similar

    Loads a database schema from a JSON file and returns it as a DatabaseSchema object.

    From: /tf/active/vicechatdev/full_smartstat/sql_query_generator.py
← Back to Browse