class DynamicSchemaDiscovery
Discovers database schema from live database connection
File: /tf/active/vicechatdev/full_smartstat/dynamic_schema_discovery.py
Lines: 69 - 1251
Complexity: moderate
Purpose
Discovers the complete schema of a live SQL Server database (tables, columns, primary keys, and inferred relationships), caches the results to JSON, and generates per-table descriptions using an optional LLM agent with a fallback to name-based heuristics.
Source Code
class DynamicSchemaDiscovery:
"""Discovers database schema from live database connection"""
def __init__(self, data_processor, statistical_agent=None):
"""Initialize with a data processor that has database connection and optional LLM agent"""
self.data_processor = data_processor
self.statistical_agent = statistical_agent
self.cache_file = Path(__file__).parent / 'discovered_schema_cache.json'
self.descriptions_cache_file = Path(__file__).parent / 'table_descriptions_cache.json'
self.cache_duration_hours = 24 # Refresh cache every 24 hours for more stability
self.descriptions_cache_duration_hours = 72 # Table descriptions cache for 3 days
def discover_schema(self, force_refresh: bool = False) -> DatabaseInfo:
"""Discover complete database schema from live connection"""
# Check if we have a recent cached version
if not force_refresh and self._has_valid_cache():
logger.info("Using cached schema discovery results")
return self._load_cached_schema()
logger.info("Starting dynamic schema discovery from live database...")
try:
# Get database info
db_info = self._get_database_info()
# Discover all tables
tables = self._discover_tables()
# Discover relationships
relationships = self._discover_relationships(tables)
# Calculate totals
total_columns = sum(len(table.columns) for table in tables)
total_rows = sum(table.row_count for table in tables)
schema_info = DatabaseInfo(
database_name=db_info['database_name'],
server_name=db_info['server_name'],
discovery_timestamp=datetime.now().isoformat(),
total_tables=len(tables),
total_columns=total_columns,
total_rows=total_rows,
tables=tables,
relationships=relationships
)
# Cache the results
self._cache_schema(schema_info)
logger.info(f"Schema discovery completed: {len(tables)} tables, {total_columns} columns, {total_rows} total rows")
return schema_info
except Exception as e:
logger.error(f"Error during schema discovery: {str(e)}")
# If discovery fails but we have cached data, use it
if self.cache_file.exists():
logger.warning("Using cached schema due to discovery error")
return self._load_cached_schema()
raise
def _get_database_info(self) -> Dict[str, str]:
"""Get basic database information"""
query = """
SELECT
DB_NAME() as database_name,
@@SERVERNAME as server_name
"""
result = self.data_processor.execute_query(query)
if result is not None and len(result) > 0:
return {
'database_name': result.iloc[0]['database_name'],
'server_name': result.iloc[0]['server_name']
}
return {'database_name': 'Unknown', 'server_name': 'Unknown'}
def _discover_tables(self) -> List[TableInfo]:
"""Discover all tables and their detailed information"""
tables = []
# Get list of all tables
tables_query = """
SELECT
t.TABLE_SCHEMA,
t.TABLE_NAME,
t.TABLE_TYPE,
ISNULL(p.rows, 0) as row_count
FROM INFORMATION_SCHEMA.TABLES t
LEFT JOIN (
SELECT
s.name as schema_name,
o.name as table_name,
SUM(p.rows) as rows
FROM sys.objects o
JOIN sys.schemas s ON o.schema_id = s.schema_id
JOIN sys.partitions p ON o.object_id = p.object_id
WHERE o.type = 'U'
GROUP BY s.name, o.name
) p ON t.TABLE_SCHEMA = p.schema_name AND t.TABLE_NAME = p.table_name
WHERE t.TABLE_TYPE = 'BASE TABLE'
ORDER BY t.TABLE_NAME
"""
tables_df = self.data_processor.execute_query(tables_query)
if tables_df is None:
logger.error("Could not retrieve table list")
return []
logger.info(f"Discovered {len(tables_df)} tables, getting detailed information...")
for _, table_row in tables_df.iterrows():
table_name = table_row['TABLE_NAME']
table_schema = table_row['TABLE_SCHEMA']
table_type = table_row['TABLE_TYPE']
row_count = int(table_row['row_count']) if pd.notna(table_row['row_count']) else 0
# Get columns for this table
columns = self._get_table_columns(table_name, table_schema)
# Get primary keys
primary_keys = self._get_primary_keys(table_name, table_schema)
# Get foreign keys
foreign_keys = self._get_foreign_keys(table_name, table_schema)
# Get table description if available
description = self._get_table_description(table_name, table_schema)
# Calculate a simple quality score based on available data
quality_score = self._calculate_simple_quality_score(row_count, columns)
table_info = TableInfo(
name=table_name,
schema=table_schema,
table_type=table_type,
row_count=row_count,
columns=columns,
primary_keys=primary_keys,
foreign_keys=foreign_keys,
description=description,
data_quality_score=quality_score
)
tables.append(table_info)
if len(tables) % 10 == 0:
logger.info(f"Processed {len(tables)}/{len(tables_df)} tables...")
return tables
def _get_table_columns(self, table_name: str, schema_name: str) -> List[Dict[str, Any]]:
"""Get detailed column information for a table"""
columns_query = f"""
SELECT
c.COLUMN_NAME,
c.DATA_TYPE,
c.IS_NULLABLE,
c.COLUMN_DEFAULT,
c.CHARACTER_MAXIMUM_LENGTH,
c.NUMERIC_PRECISION,
c.NUMERIC_SCALE,
c.ORDINAL_POSITION
FROM INFORMATION_SCHEMA.COLUMNS c
WHERE c.TABLE_NAME = '{table_name}' AND c.TABLE_SCHEMA = '{schema_name}'
ORDER BY c.ORDINAL_POSITION
"""
try:
columns_df = self.data_processor.execute_query(columns_query)
if columns_df is None:
return []
columns = []
for _, col_row in columns_df.iterrows():
column_info = {
'name': col_row['COLUMN_NAME'],
'data_type': col_row['DATA_TYPE'],
'is_nullable': col_row['IS_NULLABLE'] == 'YES',
'default_value': col_row['COLUMN_DEFAULT'],
'max_length': int(col_row['CHARACTER_MAXIMUM_LENGTH']) if pd.notna(col_row['CHARACTER_MAXIMUM_LENGTH']) else None,
'precision': int(col_row['NUMERIC_PRECISION']) if pd.notna(col_row['NUMERIC_PRECISION']) else None,
'scale': int(col_row['NUMERIC_SCALE']) if pd.notna(col_row['NUMERIC_SCALE']) else None,
'position': int(col_row['ORDINAL_POSITION']),
'is_primary_key': False, # Will be updated separately
'is_foreign_key': False, # Will be updated separately
'referenced_table': None,
'referenced_column': None
}
columns.append(column_info)
return columns
except Exception as e:
logger.error(f"Error getting columns for table {table_name}: {str(e)}")
return []
def _get_primary_keys(self, table_name: str, schema_name: str) -> List[str]:
"""Get primary key columns for a table"""
pk_query = f"""
SELECT ku.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku
ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_NAME = '{table_name}' AND tc.TABLE_SCHEMA = '{schema_name}'
ORDER BY ku.ORDINAL_POSITION
"""
try:
pk_df = self.data_processor.execute_query(pk_query)
if pk_df is not None and len(pk_df) > 0:
return pk_df['COLUMN_NAME'].tolist()
return []
except Exception as e:
logger.error(f"Error getting primary keys for table {table_name}: {str(e)}")
return []
def _get_foreign_keys(self, table_name: str, schema_name: str) -> List[Dict[str, str]]:
"""Get foreign key relationships for a table"""
# Simplified approach - just get basic foreign key info without complex joins
try:
# For now, return empty list - we'll get relationships at the database level
return []
except Exception as e:
logger.error(f"Error getting foreign keys for table {table_name}: {str(e)}")
return []
def _get_table_description(self, table_name: str, schema_name: str) -> Optional[str]:
"""Get table description using LLM analysis with schema and data samples"""
try:
# Check cache first
cached_description = self._get_cached_table_description(table_name)
if cached_description:
return cached_description
# Generate new description using LLM if available
if self.statistical_agent:
description = self._generate_llm_table_description(table_name, schema_name)
if description:
# Cache the generated description
self._cache_table_description(table_name, description)
return description
# Fallback to business logic analysis
description = self._analyze_table_business_logic(table_name)
return description
except Exception as e:
logger.error(f"Error getting table description for {table_name}: {e}")
# Default fallback
return f"Database table: {table_name}"
def _generate_llm_table_description(self, table_name: str, schema_name: str) -> Optional[str]:
"""Generate table description using LLM with schema and data sample"""
try:
# Get table schema details
table_schema = self._get_table_schema_details(table_name, schema_name)
if not table_schema:
return None
# Get data sample (first 50 rows)
data_sample = self._get_table_data_sample(table_name, schema_name, sample_size=50)
# Create LLM prompt
prompt = self._create_table_description_prompt(table_name, table_schema, data_sample)
# Query LLM for description
logger.info(f"Generating LLM-based description for table: {table_name}")
response = self.statistical_agent.query_llm(prompt, model='gpt-4o-mini') # Use smaller model for efficiency
# Extract and clean the description
description = self._extract_description_from_response(response)
logger.info(f"Generated LLM description for {table_name}: {description[:100]}...")
return description
except Exception as e:
logger.error(f"Error generating LLM description for table {table_name}: {e}")
return None
def _get_table_schema_details(self, table_name: str, schema_name: str) -> Optional[Dict]:
"""Get detailed schema information for a table"""
try:
schema_query = """
SELECT
c.COLUMN_NAME,
c.DATA_TYPE,
c.IS_NULLABLE,
c.COLUMN_DEFAULT,
c.CHARACTER_MAXIMUM_LENGTH,
c.NUMERIC_PRECISION,
c.NUMERIC_SCALE,
CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as IS_PRIMARY_KEY,
CASE WHEN fk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as IS_FOREIGN_KEY
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE pk
ON c.TABLE_NAME = pk.TABLE_NAME
AND c.COLUMN_NAME = pk.COLUMN_NAME
AND pk.CONSTRAINT_NAME LIKE 'PK_%'
LEFT JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk
ON c.TABLE_NAME = fk.TABLE_NAME
AND c.COLUMN_NAME = fk.COLUMN_NAME
AND fk.CONSTRAINT_NAME LIKE 'FK_%'
WHERE c.TABLE_NAME = :table_name AND c.TABLE_SCHEMA = :schema_name
ORDER BY c.ORDINAL_POSITION
"""
result = self.data_processor.execute_query(schema_query, params={'table_name': table_name, 'schema_name': schema_name})
if result is not None and len(result) > 0:
return {
'columns': result.to_dict('records'),
'total_columns': len(result)
}
return None
except Exception as e:
logger.error(f"Error getting schema details for {table_name}: {e}")
return None
def _get_table_data_sample(self, table_name: str, schema_name: str, sample_size: int = 50) -> Optional[pd.DataFrame]:
"""Get a sample of data from the table"""
try:
# Use a safe approach to get sample data
sample_query = f"""
SELECT TOP {sample_size} *
FROM [{schema_name}].[{table_name}]
"""
result = self.data_processor.execute_query(sample_query)
if result is not None and len(result) > 0:
return result
return None
except Exception as e:
logger.error(f"Error getting data sample for {table_name}: {e}")
return None
def _create_table_description_prompt(self, table_name: str, table_schema: Dict, data_sample: Optional[pd.DataFrame]) -> str:
"""Create a prompt for LLM to generate table description"""
# Build schema information
schema_info = f"Table: {table_name}\n"
schema_info += "Columns:\n"
for col in table_schema['columns']:
column_info = f"- {col['COLUMN_NAME']} ({col['DATA_TYPE']}"
            if pd.notna(col['CHARACTER_MAXIMUM_LENGTH']):
                column_info += f"({col['CHARACTER_MAXIMUM_LENGTH']})"
            elif pd.notna(col['NUMERIC_PRECISION']):
                column_info += f"({col['NUMERIC_PRECISION']},{col['NUMERIC_SCALE'] if pd.notna(col['NUMERIC_SCALE']) else 0})"
column_info += ")"
if col['IS_PRIMARY_KEY']:
column_info += " [PRIMARY KEY]"
if col['IS_FOREIGN_KEY']:
column_info += " [FOREIGN KEY]"
if col['IS_NULLABLE'] == 'NO':
column_info += " [NOT NULL]"
schema_info += column_info + "\n"
# Build data sample information
data_info = ""
if data_sample is not None and len(data_sample) > 0:
data_info = f"\nSample Data (first {len(data_sample)} rows):\n"
# Show representative data for key columns (limit to prevent huge prompts)
key_columns = []
for col in table_schema['columns']:
if col['IS_PRIMARY_KEY'] or 'name' in col['COLUMN_NAME'].lower() or 'id' in col['COLUMN_NAME'].lower():
key_columns.append(col['COLUMN_NAME'])
# Limit to max 8 columns for the sample
display_columns = key_columns[:4] + [col for col in data_sample.columns if col not in key_columns][:4]
for col in display_columns:
if col in data_sample.columns:
unique_values = data_sample[col].dropna().unique()[:10] # Show max 10 unique values
data_info += f"- {col}: {', '.join(str(v) for v in unique_values)}\n"
# Create the prompt
prompt = f"""
You are analyzing a database table from a Laboratory Information Management System (LIMS).
This system manages laboratory testing data, customer information, sample tracking, and result reporting for a veterinary/medical diagnostic laboratory.
Please analyze the following table and provide a concise, informative description of what this table contains and its purpose in the laboratory workflow.
{schema_info}
{data_info}
CONTEXT: This is a laboratory management database that handles:
- Test requests from customers (veterinarians, clinics)
- Sample tracking and management
- Laboratory analysis and testing
- Result reporting and validation
- Customer and billing information
- Quality control and compliance
Please provide a single paragraph (2-4 sentences) describing:
1. What type of data this table stores
2. Its role in the laboratory workflow
3. Key relationships or dependencies with other data
Keep the description professional, concise, and focused on the business purpose.
"""
return prompt
def _extract_description_from_response(self, response: str) -> str:
"""Extract and clean the description from LLM response"""
# Clean up the response
description = response.strip()
# Remove any markdown formatting
description = description.replace('**', '').replace('*', '')
# Remove any prompt echoes or prefixes
common_prefixes = [
"This table",
"The table",
"Based on the analysis",
"Description:",
"Table description:"
]
for prefix in common_prefixes:
if description.lower().startswith(prefix.lower()):
description = description[len(prefix):].strip()
# Ensure it starts with a capital letter
if description and not description[0].isupper():
description = description[0].upper() + description[1:]
# Limit length to reasonable size
if len(description) > 500:
description = description[:500] + "..."
return description
def _analyze_table_business_logic(self, table_name: str) -> str:
"""Analyze table's business purpose based on name and categorize it"""
table_lower = table_name.lower()
# Core Laboratory Operations
if table_lower == 'requests':
return "Laboratory test requests from customers/veterinarians - entry point for all testing"
elif table_lower == 'samples':
return "Individual samples linked to requests - physical specimens for testing"
elif table_lower == 'results':
return "Laboratory test results with validation workflow - final analytical outcomes"
elif table_lower == 'analyses':
return "Types of laboratory analyses/tests available - defines what can be tested"
elif table_lower == 'sampleanalysegroups':
return "Groups of analyses to be performed on samples - organizes testing workflow"
elif table_lower == 'analysisgroups':
return "Categories/groups of related analyses - organizes test types"
# Customer Management
elif table_lower in ['companies', 'accounts']:
return "Customer/company information - entities that submit testing requests"
elif 'dierenarts' in table_lower:
return "Veterinarian information - medical professionals submitting samples"
elif 'praktijk' in table_lower:
return "Veterinary practice information - clinic/office details"
# Analysis Configuration
elif table_lower == 'analysiscategories':
return "Categories for organizing different types of analyses"
elif table_lower == 'sampletypes':
return "Types of samples that can be tested (blood, urine, tissue, etc.)"
elif table_lower == 'posnegs':
return "Positive/negative result definitions and interpretations"
# Specialized Testing Modules
elif any(x in table_lower for x in ['bact', 'bacteriology']):
return "Bacteriology testing results and bacterial identification data"
elif 'pcr' in table_lower:
return "PCR (molecular) testing results and genetic analysis data"
elif 'serolog' in table_lower:
return "Serology testing results - antibody and immune response analysis"
elif 'antibiogram' in table_lower:
return "Antibiotic sensitivity testing results - bacterial resistance patterns"
elif 'parasitolog' in table_lower:
return "Parasitology testing results - parasite identification and analysis"
# Result Processing
elif 'resultaat' in table_lower or 'result' in table_lower:
return "Laboratory result data - contains analytical findings and measurements"
elif 'uitsla' in table_lower or 'uitslag' in table_lower:
return "Test outcome/result reporting - formatted results for customers"
# System Management
elif any(x in table_lower for x in ['user', 'gebruiker']):
return "System user accounts and authentication information"
elif any(x in table_lower for x in ['role', 'permission', 'autoris']):
return "User roles and system permissions - access control"
elif any(x in table_lower for x in ['audit', 'log']):
return "System audit trail and logging information"
elif 'configuratie' in table_lower or 'config' in table_lower:
return "System configuration settings and parameters"
# Reporting
elif 'report' in table_lower or 'rapport' in table_lower:
return "Report templates and formatting configurations"
elif 'template' in table_lower or 'sjabloon' in table_lower:
return "Document and report templates"
# Quality Control
elif any(x in table_lower for x in ['qc', 'quality', 'kwaliteit']):
return "Quality control data and measurements"
elif 'validat' in table_lower:
return "Validation workflow and approval tracking"
# Inventory/Equipment
elif any(x in table_lower for x in ['apparaat', 'device', 'equipment']):
return "Laboratory equipment and device information"
elif any(x in table_lower for x in ['reagens', 'reagent', 'kit']):
return "Laboratory reagents and testing kit information"
# Administrative
elif any(x in table_lower for x in ['invoice', 'factuur', 'billing']):
return "Billing and invoicing information"
elif any(x in table_lower for x in ['admin', 'beheer']):
return "Administrative data and system management"
# Lookup/Reference Tables
elif table_lower.startswith('tbl') or table_lower.startswith('lut') or 'lookup' in table_lower:
return f"Reference/lookup table - provides standardized values for {table_name.replace('tbl', '').replace('lut', '')}"
# Default categorization
else:
# Try to infer from common patterns
if any(x in table_lower for x in ['type', 'category', 'groep', 'group']):
return f"Classification/categorization table - defines types or groups for {table_name}"
elif table_lower.endswith('s') and len(table_lower) > 3:
return f"Data table containing {table_lower} records"
else:
return f"Database table: {table_name} - purpose needs investigation"
def _discover_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
"""Discover relationships using column name patterns and data analysis"""
relationships = []
try:
logger.info("Starting intelligent relationship discovery...")
# Load manual relationships first
manual_manager = get_manual_relationship_manager()
manual_relationships = manual_manager.get_all_relationships()
if manual_relationships:
logger.info(f"Loading {len(manual_relationships)} manual relationships")
relationships.extend(manual_relationships)
else:
logger.info("No manual relationships found")
# Convert tables to dictionary for easier lookup
all_tables = {table.name: table for table in tables}
column_patterns = {}
# First pass: catalog all columns and identify potential keys
for table in tables:
table_name = table.name
# Use existing table column information
if table.columns:
# Identify potential key columns
for col_info in table.columns:
if isinstance(col_info, dict):
col_name = col_info.get('name', col_info.get('COLUMN_NAME', ''))
else:
col_name = str(col_info)
if col_name:
col_lower = col_name.lower()
# Track column patterns for relationship detection
if col_lower not in column_patterns:
column_patterns[col_lower] = []
column_patterns[col_lower].append({
'table': table_name,
'column': col_name,
'is_identity': col_name.lower() == 'id' or 'id' in col_name.lower(),
'data_type': col_info.get('data_type', col_info.get('DATA_TYPE', 'unknown')) if isinstance(col_info, dict) else 'unknown'
})
# Second pass: find relationships based on column patterns AND data content
logger.info(f"Analyzing {len(column_patterns)} column patterns for relationships...")
# Also look for cross-table data relationships (even with different column names)
relationships.extend(self._find_cross_table_relationships(tables))
for col_pattern, occurrences in column_patterns.items():
if len(occurrences) > 1: # Column appears in multiple tables
logger.debug(f"Analyzing pattern '{col_pattern}' with {len(occurrences)} occurrences")
# Look for ID patterns
if col_pattern.endswith('id') or col_pattern == 'id':
identity_tables = [occ for occ in occurrences if occ['is_identity']]
non_identity_tables = [occ for occ in occurrences if not occ['is_identity']]
# Create relationships from non-identity to identity tables with data validation
for identity_table in identity_tables:
for non_identity_table in non_identity_tables:
if identity_table['table'] != non_identity_table['table']:
# Validate relationship with data sampling
confidence = self._validate_relationship_with_data(
non_identity_table['table'], non_identity_table['column'],
identity_table['table'], identity_table['column']
)
if confidence > 0.1: # Only include if data shows some relationship
relationship = {
'from_table': non_identity_table['table'],
'from_column': non_identity_table['column'],
'to_table': identity_table['table'],
'to_column': identity_table['column'],
'type': 'foreign_key',
'confidence': confidence,
'detection_method': 'column_pattern_and_data'
}
relationships.append(relationship)
logger.info(f"Found relationship: {non_identity_table['table']}.{non_identity_table['column']} ā {identity_table['table']}.{identity_table['column']} (confidence: {confidence:.2f})")
# Look for common naming patterns like TableName + ID with data validation
elif 'id' in col_pattern:
# Extract table name from column (e.g., 'sample_request' -> 'request')
potential_table_name = col_pattern.replace('id', '').replace('_', '').strip()
# Find matching table names
for table_name in all_tables.keys():
table_lower = table_name.lower().replace('_', '')
if (potential_table_name in table_lower or
table_lower in potential_table_name or
potential_table_name == table_lower):
# Find ID column in target table
target_table_info = all_tables[table_name]
target_id_columns = [col for col in target_table_info.columns
if isinstance(col, dict) and
'id' in col.get('name', '').lower()]
if target_id_columns:
target_column = target_id_columns[0].get('name', 'Id')
# Test all occurrences of this pattern
for occ in occurrences:
if occ['table'] != table_name:
# Validate with data sampling
confidence = self._validate_relationship_with_data(
occ['table'], occ['column'],
table_name, target_column
)
if confidence > 0.1:
relationship = {
'from_table': occ['table'],
'from_column': occ['column'],
'to_table': table_name,
'to_column': target_column,
'type': 'foreign_key',
'confidence': confidence,
'detection_method': 'table_name_pattern_and_data'
}
relationships.append(relationship)
# Third pass: Enhanced underscore pattern detection for missed relationships
logger.info("Analyzing underscore patterns for foreign key relationships...")
underscore_relationships = self._discover_underscore_relationships(tables)
relationships.extend(underscore_relationships)
logger.info(f"Discovered {len(relationships)} potential relationships")
return relationships
except Exception as e:
logger.error(f"Error discovering relationships: {str(e)}")
return []
def _validate_relationship_with_data(self, from_table: str, from_column: str,
to_table: str, to_column: str) -> float:
"""Validate potential relationship by analyzing actual data content"""
try:
# Use larger sample and get more strategic sampling
sample_size = 500
# Get non-zero values from source table (many FKs have 0 for NULL)
from_query = f"""
SELECT TOP {sample_size} [{from_column}]
FROM [{from_table}]
WHERE [{from_column}] IS NOT NULL
AND [{from_column}] != 0
ORDER BY NEWID()
"""
# Get all unique values from target table (typically smaller set)
to_query = f"""
SELECT DISTINCT [{to_column}]
FROM [{to_table}]
WHERE [{to_column}] IS NOT NULL
"""
from_df = self.data_processor.execute_query(from_query)
to_df = self.data_processor.execute_query(to_query)
            if from_df is None or to_df is None or from_df.empty or to_df.empty:
return 0.0
# Get unique values from both columns
from_values = set(from_df[from_column].dropna().astype(str))
to_values = set(to_df[to_column].dropna().astype(str))
if not from_values or not to_values:
return 0.0
# Calculate overlap percentage
intersection = from_values.intersection(to_values)
overlap_ratio = len(intersection) / len(from_values) if from_values else 0
# Check if target values are subset of from values (reverse relationship)
reverse_overlap = len(intersection) / len(to_values) if to_values else 0
# Use the better overlap ratio
best_overlap = max(overlap_ratio, reverse_overlap)
confidence = best_overlap
# Boost confidence based on overlap quality
if best_overlap > 0.5:
confidence = min(0.95, best_overlap + 0.2)
elif best_overlap > 0.2:
confidence = min(0.8, best_overlap + 0.15)
elif best_overlap > 0.05:
confidence = min(0.6, best_overlap + 0.1)
# Extra boost for known LIMS table patterns
from_lower = from_table.lower()
to_lower = to_table.lower()
if (('sample' in from_lower and 'request' in to_lower) or
('result' in from_lower and 'sample' in to_lower) or
('result' in from_lower and 'analys' in to_lower)):
confidence = min(0.95, confidence + 0.1)
# Penalize if no meaningful overlap found
if len(intersection) == 0:
confidence = 0.0
elif len(intersection) < 3: # Very few matches might be coincidental
confidence = confidence * 0.5
logger.debug(f"Data validation {from_table}.{from_column} ā {to_table}.{to_column}: "
f"{len(intersection)} matches, {overlap_ratio:.2f} forward, {reverse_overlap:.2f} reverse, confidence: {confidence:.2f}")
return confidence
except Exception as e:
logger.warning(f"Could not validate relationship {from_table}.{from_column} ā {to_table}.{to_column}: {e}")
return 0.0
def _find_cross_table_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
"""Find relationships by analyzing data content across tables, even with different column names"""
relationships = []
try:
# Focus on high-volume tables that are likely to contain main data
main_tables = [t for t in tables if t.row_count > 1000]
logger.info(f"Analyzing cross-table relationships for {len(main_tables)} high-volume tables...")
# Known LIMS table patterns to prioritize
lims_patterns = {
'requests': ['request', 'req'],
'samples': ['sample', 'samp'],
'results': ['result', 'res'],
'analyses': ['analys', 'analysis', 'test']
}
# Find tables matching LIMS patterns
lims_tables = {}
for category, patterns in lims_patterns.items():
for table in main_tables:
table_lower = table.name.lower()
if any(pattern in table_lower for pattern in patterns):
if category not in lims_tables:
lims_tables[category] = []
lims_tables[category].append(table)
# Test known LIMS relationships with data validation
test_relationships = [
('requests', 'samples', ['id'], ['sample_request', 'request_id', 'requestid']),
('samples', 'results', ['id'], ['result_sample', 'sample_id', 'sampleid']),
('results', 'analyses', ['result_analysis', 'analysis_id'], ['id']),
]
for from_category, to_category, from_cols, to_cols in test_relationships:
if from_category in lims_tables and to_category in lims_tables:
for from_table in lims_tables[from_category]:
for to_table in lims_tables[to_category]:
# Try different column combinations
for from_col_pattern in from_cols:
for to_col_pattern in to_cols:
from_col = self._find_matching_column(from_table, from_col_pattern)
to_col = self._find_matching_column(to_table, to_col_pattern)
if from_col and to_col:
confidence = self._validate_relationship_with_data(
from_table.name, from_col,
to_table.name, to_col
)
if confidence > 0.2: # Higher threshold for cross-table discovery
relationships.append({
'from_table': from_table.name,
'from_column': from_col,
'to_table': to_table.name,
'to_column': to_col,
'type': 'foreign_key',
'confidence': confidence,
'detection_method': 'cross_table_data_analysis'
})
logger.info(f"Cross-table relationship: {from_table.name}.{from_col} ā {to_table.name}.{to_col} (confidence: {confidence:.2f})")
return relationships
except Exception as e:
logger.error(f"Error in cross-table relationship discovery: {e}")
return []
def _find_matching_column(self, table: TableInfo, pattern: str) -> str:
"""Find a column in the table that matches the given pattern"""
pattern_lower = pattern.lower()
for col_info in table.columns:
if isinstance(col_info, dict):
col_name = col_info.get('name', '')
if col_name and pattern_lower in col_name.lower():
return col_name
return None
def _discover_underscore_relationships(self, tables: List[TableInfo]) -> List[Dict[str, Any]]:
"""Discover relationships with underscore patterns like Sample_SampleType ā SampleType"""
relationships = []
try:
# Convert tables to dictionary for easier lookup
tables_dict = {table.name: table for table in tables}
logger.info("Analyzing underscore patterns for foreign key relationships...")
for table in tables:
for col_info in table.columns:
if isinstance(col_info, dict):
col_name = col_info.get('name', '')
# Look for underscore patterns: TablePrefix_TargetTable
if '_' in col_name:
parts = col_name.split('_')
if len(parts) >= 2:
# Try different combinations
potential_targets = []
                                # Pattern 1: Sample_SampleType -> SampleType
if len(parts) == 2:
potential_targets.append(parts[1]) # SampleType from Sample_SampleType
                                # Pattern 2: Sample_SpeciesType -> SpeciesType
for i in range(1, len(parts)):
potential_target = '_'.join(parts[i:]) # Join remaining parts
potential_targets.append(potential_target)
# Pattern 3: Remove common suffixes and try plural forms
for target in potential_targets[:]: # Copy list to avoid modification during iteration
# Try singular/plural variations
if target.endswith('s'):
potential_targets.append(target[:-1]) # Remove 's'
else:
potential_targets.append(target + 's') # Add 's'
# Look for matching tables
for potential_target in potential_targets:
# Try exact match first
if potential_target in tables_dict:
target_table = tables_dict[potential_target]
# Find ID column in target table
target_id_col = self._find_id_column(target_table)
if target_id_col:
# Validate with data
confidence = self._validate_relationship_with_data(
table.name, col_name,
target_table.name, target_id_col
)
if confidence > 0.1: # Lower threshold for underscore patterns
relationship = {
'from_table': table.name,
'from_column': col_name,
'to_table': target_table.name,
'to_column': target_id_col,
'type': 'foreign_key',
'confidence': confidence,
'detection_method': 'underscore_pattern_analysis'
}
relationships.append(relationship)
logger.info(f"Underscore relationship: {table.name}.{col_name} ā {target_table.name}.{target_id_col} (confidence: {confidence:.2f})")
# Try case-insensitive match
else:
for table_name, target_table in tables_dict.items():
if table_name.lower() == potential_target.lower():
target_id_col = self._find_id_column(target_table)
if target_id_col:
confidence = self._validate_relationship_with_data(
table.name, col_name,
target_table.name, target_id_col
)
if confidence > 0.1:
relationship = {
'from_table': table.name,
'from_column': col_name,
'to_table': target_table.name,
'to_column': target_id_col,
'type': 'foreign_key',
'confidence': confidence,
'detection_method': 'underscore_pattern_analysis'
}
relationships.append(relationship)
logger.info(f"Underscore relationship (case-insensitive): {table.name}.{col_name} ā {target_table.name}.{target_id_col} (confidence: {confidence:.2f})")
break
logger.info(f"Discovered {len(relationships)} underscore pattern relationships")
return relationships
except Exception as e:
logger.error(f"Error in underscore relationship discovery: {e}")
return []
def _find_id_column(self, table: TableInfo) -> str:
"""Find the ID column in a table"""
for col_info in table.columns:
if isinstance(col_info, dict):
col_name = col_info.get('name', '')
# Look for common ID column patterns
if col_name.lower() in ['id', 'pk', f'{table.name.lower()}_id', f'{table.name.lower()}id']:
return col_name
# Check for identity columns or primary keys
if col_info.get('is_primary_key', False) or col_info.get('is_identity', False):
return col_name
# Fallback: return 'Id' if nothing found
return 'Id'
def _has_valid_cache(self) -> bool:
"""Check if we have a valid cached schema with LLM descriptions when statistical agent is available"""
if not self.cache_file.exists():
return False
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
cache_time = datetime.fromisoformat(cache_data.get('discovery_timestamp', '2000-01-01'))
hours_old = (datetime.now() - cache_time).total_seconds() / 3600
# Check if cache is too old
if hours_old >= self.cache_duration_hours:
return False
# If we have a statistical agent, check if cached tables have LLM descriptions
if self.statistical_agent is not None:
tables = cache_data.get('tables', [])
for table_data in tables:
# Check if table has an LLM-generated description
description = table_data.get('description', '')
                    if not description or description.startswith('Database table:'):
# Missing or has fallback description - need to refresh cache
logger.info(f"Cache invalid: Table {table_data.get('name', 'unknown')} missing LLM description")
return False
return True
except Exception as e:
logger.warning(f"Error checking cache validity: {str(e)}")
return False
def _load_cached_schema(self) -> DatabaseInfo:
"""Load schema from cache file"""
with open(self.cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
tables = []
for table_data in cache_data.get('tables', []):
table_info = TableInfo(
name=table_data['name'],
schema=table_data['schema'],
table_type=table_data['table_type'],
row_count=table_data['row_count'],
columns=table_data['columns'],
primary_keys=table_data['primary_keys'],
foreign_keys=table_data['foreign_keys'],
description=table_data.get('description'),
data_quality_score=table_data.get('data_quality_score')
)
tables.append(table_info)
return DatabaseInfo(
database_name=cache_data['database_name'],
server_name=cache_data['server_name'],
discovery_timestamp=cache_data['discovery_timestamp'],
total_tables=cache_data['total_tables'],
total_columns=cache_data['total_columns'],
total_rows=cache_data['total_rows'],
tables=tables,
relationships=cache_data.get('relationships', [])
)
def _calculate_simple_quality_score(self, row_count: int, columns: List[Dict[str, Any]]) -> int:
"""Calculate a simple quality score for a table based on basic metrics"""
try:
score = 0
# Row count scoring (30 points max)
if row_count > 10000:
score += 30
elif row_count > 1000:
score += 25
elif row_count > 100:
score += 20
elif row_count > 10:
score += 15
elif row_count > 0:
score += 10
# Column count scoring (20 points max)
column_count = len(columns)
if column_count > 10:
score += 20
elif column_count > 5:
score += 15
elif column_count > 2:
score += 10
elif column_count > 0:
score += 5
# Data type diversity scoring (25 points max)
data_types = set()
nullable_columns = 0
for col in columns:
if isinstance(col, dict):
data_types.add(col.get('data_type', '').lower())
                    if col.get('is_nullable'):  # stored as a boolean by _get_table_columns
nullable_columns += 1
# Diversity bonus
type_diversity = len(data_types)
if type_diversity > 5:
score += 25
elif type_diversity > 3:
score += 20
elif type_diversity > 1:
score += 15
else:
score += 5
# Completeness penalty (25 points max)
if column_count > 0:
nullable_ratio = nullable_columns / column_count
completeness_score = int((1 - nullable_ratio) * 25)
score += completeness_score
return min(score, 100) # Cap at 100
except Exception as e:
logger.warning(f"Error calculating quality score: {e}")
return 50 # Default moderate score
def _get_cached_table_description(self, table_name: str) -> Optional[str]:
"""Get cached table description if available and valid"""
if not self.descriptions_cache_file.exists():
return None
try:
with open(self.descriptions_cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
# Check if cache is still valid
cache_time = datetime.fromisoformat(cache_data.get('cache_timestamp', '2000-01-01'))
hours_old = (datetime.now() - cache_time).total_seconds() / 3600
if hours_old >= self.descriptions_cache_duration_hours:
return None
# Get table description
table_hash = self._get_table_hash(table_name)
descriptions = cache_data.get('descriptions', {})
return descriptions.get(table_hash)
except Exception as e:
logger.warning(f"Error reading table descriptions cache: {e}")
return None
def _cache_table_description(self, table_name: str, description: str):
"""Cache a table description"""
try:
# Load existing cache or create new
cache_data = {'descriptions': {}, 'cache_timestamp': datetime.now().isoformat()}
if self.descriptions_cache_file.exists():
try:
with open(self.descriptions_cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
                except Exception:
pass # Use new cache_data if loading fails
# Update cache
table_hash = self._get_table_hash(table_name)
cache_data['descriptions'][table_hash] = description
cache_data['cache_timestamp'] = datetime.now().isoformat()
# Save cache
with open(self.descriptions_cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Error caching table description for {table_name}: {e}")
def _get_table_hash(self, table_name: str) -> str:
"""Generate a hash for table name to use as cache key"""
return hashlib.md5(table_name.lower().encode()).hexdigest()[:12]
def _cache_schema(self, schema_info: DatabaseInfo):
"""Cache the discovered schema to file"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(schema_info.to_dict(), f, indent=2, ensure_ascii=False)
logger.info(f"Schema cached to {self.cache_file}")
except Exception as e:
logger.error(f"Error caching schema: {str(e)}")
def get_schema_summary(self, schema_info: DatabaseInfo) -> str:
"""Generate a human-readable summary of the discovered schema"""
summary = f"""
šļø DATABASE SCHEMA SUMMARY
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
š Database: {schema_info.database_name} on {schema_info.server_name}
š
Discovered: {schema_info.discovery_timestamp}
š Statistics: {schema_info.total_tables} tables, {schema_info.total_columns} columns, {schema_info.total_rows:,} total rows
š KEY RELATIONSHIPS ({len(schema_info.relationships)} found):
"""
# Show key relationships
core_tables = ['Requests', 'Samples', 'Results', 'Analyses', 'Companies']
shown_relationships = 0
for rel in schema_info.relationships:
# Handle both old and new relationship formats
from_table = rel.get('from_table', rel.get('source_table', ''))
from_column = rel.get('from_column', rel.get('source_column', ''))
to_table = rel.get('to_table', rel.get('target_table', ''))
to_column = rel.get('to_column', rel.get('target_column', ''))
if (from_table in core_tables or to_table in core_tables) and shown_relationships < 10:
summary += f" {from_table}.{from_column} ā {to_table}.{to_column}\n"
shown_relationships += 1
if shown_relationships < len(schema_info.relationships):
summary += f" ... and {len(schema_info.relationships) - shown_relationships} more relationships\n"
summary += f"\nš CORE TABLES (showing largest by row count):\n"
# Show tables sorted by row count
sorted_tables = sorted(schema_info.tables, key=lambda t: t.row_count, reverse=True)[:10]
for table in sorted_tables:
col_count = len(table.columns)
pk_info = f" (PK: {', '.join(table.primary_keys)})" if table.primary_keys else ""
summary += f" {table.name}: {table.row_count:,} rows, {col_count} columns{pk_info}\n"
return summary
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| data_processor | - | - | positional or keyword |
| statistical_agent | - | None | positional or keyword |
Parameter Details
data_processor: Object providing the database connection; must expose an execute_query method that returns a pandas DataFrame (or None on failure).
statistical_agent: Optional LLM agent used to generate table descriptions. When omitted, descriptions fall back to name-based business-logic heuristics.
Return Value
The constructor returns a DynamicSchemaDiscovery instance; discover_schema() returns a DatabaseInfo object.
Class Interface
Methods
__init__(self, data_processor, statistical_agent)
Purpose: Initialize with a data processor that has database connection and optional LLM agent
Parameters:
data_processor: data processor providing the database connection (execute_query)
statistical_agent: optional LLM agent for generating table descriptions (default None)
Returns: None
discover_schema(self, force_refresh) -> DatabaseInfo
Purpose: Discover complete database schema from live connection
Parameters:
force_refresh: Type: bool
Returns: Returns DatabaseInfo
_get_database_info(self) -> Dict[str, str]
Purpose: Get basic database information
Returns: Returns Dict[str, str]
_discover_tables(self) -> List[TableInfo]
Purpose: Discover all tables and their detailed information
Returns: Returns List[TableInfo]
_get_table_columns(self, table_name, schema_name) -> List[Dict[str, Any]]
Purpose: Get detailed column information for a table
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns List[Dict[str, Any]]
_get_primary_keys(self, table_name, schema_name) -> List[str]
Purpose: Get primary key columns for a table
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns List[str]
_get_foreign_keys(self, table_name, schema_name) -> List[Dict[str, str]]
Purpose: Get foreign key relationships for a table
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns List[Dict[str, str]]
_get_table_description(self, table_name, schema_name) -> Optional[str]
Purpose: Get table description using LLM analysis with schema and data samples
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns Optional[str]
_generate_llm_table_description(self, table_name, schema_name) -> Optional[str]
Purpose: Generate table description using LLM with schema and data sample
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns Optional[str]
_get_table_schema_details(self, table_name, schema_name) -> Optional[Dict]
Purpose: Get detailed schema information for a table
Parameters:
table_name: Type: str
schema_name: Type: str
Returns: Returns Optional[Dict]
_get_table_data_sample(self, table_name, schema_name, sample_size) -> Optional[pd.DataFrame]
Purpose: Get a sample of data from the table
Parameters:
table_name: Type: str
schema_name: Type: str
sample_size: Type: int
Returns: Returns Optional[pd.DataFrame]
_create_table_description_prompt(self, table_name, table_schema, data_sample) -> str
Purpose: Create a prompt for LLM to generate table description
Parameters:
table_name: Type: str
table_schema: Type: Dict
data_sample: Type: Optional[pd.DataFrame]
Returns: Returns str
_extract_description_from_response(self, response) -> str
Purpose: Extract and clean the description from LLM response
Parameters:
response: Type: str
Returns: Returns str
_analyze_table_business_logic(self, table_name) -> str
Purpose: Analyze table's business purpose based on name and categorize it
Parameters:
table_name: Type: str
Returns: Returns str
_discover_relationships(self, tables) -> List[Dict[str, Any]]
Purpose: Discover relationships using column name patterns and data analysis
Parameters:
tables: Type: List[TableInfo]
Returns: Returns List[Dict[str, Any]]
_validate_relationship_with_data(self, from_table, from_column, to_table, to_column) -> float
Purpose: Validate potential relationship by analyzing actual data content
Parameters:
from_table: Type: str
from_column: Type: str
to_table: Type: str
to_column: Type: str
Returns: Returns float
_find_cross_table_relationships(self, tables) -> List[Dict[str, Any]]
Purpose: Find relationships by analyzing data content across tables, even with different column names
Parameters:
tables: Type: List[TableInfo]
Returns: Returns List[Dict[str, Any]]
_find_matching_column(self, table, pattern) -> str
Purpose: Find a column in the table that matches the given pattern
Parameters:
table: Type: TableInfo
pattern: Type: str
Returns: Returns str
_discover_underscore_relationships(self, tables) -> List[Dict[str, Any]]
Purpose: Discover relationships with underscore patterns like Sample_SampleType ā SampleType
Parameters:
tables: Type: List[TableInfo]
Returns: Returns List[Dict[str, Any]]
_find_id_column(self, table) -> str
Purpose: Find the ID column in a table
Parameters:
table: Type: TableInfo
Returns: Returns str
_has_valid_cache(self) -> bool
Purpose: Check if we have a valid cached schema with LLM descriptions when statistical agent is available
Returns: Returns bool
_load_cached_schema(self) -> DatabaseInfo
Purpose: Load schema from cache file
Returns: Returns DatabaseInfo
_calculate_simple_quality_score(self, row_count, columns) -> int
Purpose: Calculate a simple quality score for a table based on basic metrics
Parameters:
row_count: Type: int
columns: Type: List[Dict[str, Any]]
Returns: Returns int
_get_cached_table_description(self, table_name) -> Optional[str]
Purpose: Get cached table description if available and valid
Parameters:
table_name: Type: str
Returns: Returns Optional[str]
_cache_table_description(self, table_name, description)
Purpose: Cache a table description
Parameters:
table_name: Type: str
description: Type: str
Returns: None
_get_table_hash(self, table_name) -> str
Purpose: Generate a hash for table name to use as cache key
Parameters:
table_name: Type: str
Returns: Returns str
_cache_schema(self, schema_info)
Purpose: Cache the discovered schema to file
Parameters:
schema_info: Type: DatabaseInfo
Returns: None
get_schema_summary(self, schema_info) -> str
Purpose: Generate a human-readable summary of the discovered schema
Parameters:
schema_info: Type: DatabaseInfo
Returns: Returns str
Required Imports
import json
import logging
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
The class also depends on TableInfo, DatabaseInfo, and get_manual_relationship_manager (defined elsewhere in the package) and on a module-level logger.
Usage Example
# Example usage:
# discovery = DynamicSchemaDiscovery(data_processor, statistical_agent=None)
# schema_info = discovery.discover_schema()
# print(discovery.get_schema_summary(schema_info))
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DatabaseSchema (56.4% similar)
- function refresh_database_schema (54.2% similar)
- class DatabaseSchema_v1 (52.9% similar)
- class DatabaseInfo (49.2% similar)
- function load_database_schema (48.1% similar)