class DataProcessor_v1
Handles data loading, validation, and preprocessing
File: /tf/active/vicechatdev/smartstat/data_processor.py
Lines: 25 - 516
Complexity: moderate
Purpose
Handles data loading, validation, and preprocessing
Source Code
class DataProcessor:
    """Handles data loading, validation, and preprocessing"""

    def __init__(self, config: Config):
        self.config = config
        self.max_rows = config.MAX_DATASET_ROWS
        self.max_columns = config.MAX_COLUMNS
        self.sql_query_generator = None  # Will be initialized when needed
    def load_data_from_sql_workflow(self, user_query: str, schema_file: str = None,
                                    connection_config: str = None, statistical_agent=None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        New SQL workflow: Generate SQL query based on user request and database schema,
        then execute query to get dataset
        """
        try:
            # Use defaults if not provided
            if not schema_file:
                from sql_query_generator import get_default_schema_file
                schema_file = get_default_schema_file()
            if not connection_config:
                from sql_query_generator import get_default_connection_config
                conn_config = get_default_connection_config()
            else:
                conn_config = ConnectionConfig.from_config_file(connection_config)

            # Load database schema
            schema = DatabaseSchema.from_json(schema_file)

            # Initialize SQL query generator with enhanced capabilities
            query_generator = SQLQueryGenerator(schema, statistical_agent)

            # Generate SQL query based on user request
            sql_query, query_metadata = query_generator.generate_sql_query(user_query, self.max_rows)
            logger.info(f"Generated SQL query for user request: '{user_query}'")
            logger.info(f"SQL Query: {sql_query}")

            # Create data source for SQL execution
            data_source = DataSource(
                source_type=DataSourceType.SQL_QUERY,
                sql_connection=conn_config.to_connection_string(),
                sql_query=sql_query,
                parameters={}
            )

            # Execute SQL query to get data
            df = self._load_from_sql(data_source)

            # Prepare comprehensive metadata
            metadata = {
                'source_type': 'sql_workflow',
                'loaded_at': datetime.now().isoformat(),
                'original_shape': df.shape,
                'user_query': user_query,
                'database_name': schema.database_name,
                'database_description': schema.description,
                'sql_query': sql_query,
                'query_explanation': query_metadata.get('explanation', ''),
                'connection_server': conn_config.server,
                'connection_database': conn_config.database,
                'schema_file_used': schema_file,
                'is_poulpharm_optimized': schema.database_name.lower() == 'poulpharm',
                'sampling_applied': False,
                'warnings': []
            }

            # Validate and process data (same as regular workflow)
            df, processing_info = self._validate_and_process(df)
            metadata.update(processing_info)
            metadata['processed_shape'] = df.shape

            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL workflow")
            return df, metadata

        except Exception as e:
            logger.error(f"Error in SQL workflow: {str(e)}")
            raise
    def load_data(self, data_source: DataSource) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Load data from various sources
        Returns: (dataframe, metadata)
        """
        metadata = {
            'source_type': data_source.source_type.value,
            'loaded_at': datetime.now().isoformat(),
            'original_shape': None,
            'processed_shape': None,
            'sampling_applied': False,
            'warnings': []
        }

        try:
            if data_source.source_type == DataSourceType.FILE_UPLOAD:
                df = self._load_from_file(data_source.file_path)
                metadata['file_path'] = data_source.file_path
            elif data_source.source_type == DataSourceType.SQL_QUERY:
                df = self._load_from_sql(data_source)
                metadata['sql_query'] = data_source.sql_query
                metadata['connection'] = data_source.sql_connection
            else:
                raise ValueError(f"Unsupported data source type: {data_source.source_type}")

            metadata['original_shape'] = df.shape

            # Validate and process data
            df, processing_info = self._validate_and_process(df)
            metadata.update(processing_info)
            metadata['processed_shape'] = df.shape

            return df, metadata

        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise
    def _load_from_file(self, file_path: str) -> pd.DataFrame:
        """Load data from uploaded file"""
        if not file_path or not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        file_ext = Path(file_path).suffix.lower()

        try:
            if file_ext == '.csv':
                # First, try to detect the separator by reading a sample
                best_df = None
                best_shape = (0, 0)
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in [',', ';', '\t', '|']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            # Prefer the combination that gives us the most columns and sensible data
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                if best_df is None:
                    raise ValueError("Unable to parse CSV file with any standard separator or encoding")
                df = best_df
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path, engine='openpyxl' if file_ext == '.xlsx' else 'xlrd')
            elif file_ext == '.tsv':
                df = pd.read_csv(file_path, sep='\t')
            elif file_ext == '.txt':
                # Use the same robust separator detection as CSV
                best_df = None
                best_shape = (0, 0)
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in ['\t', ',', ';', '|', ' ']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            # Prefer the combination that gives us the most columns
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                if best_df is None:
                    raise ValueError("Unable to parse TXT file with any standard separator or encoding")
                df = best_df
            else:
                raise ValueError(f"Unsupported file format: {file_ext}")

            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from {file_path}")
            return df

        except Exception as e:
            logger.error(f"Error loading file {file_path}: {str(e)}")
            raise
    def _load_from_sql(self, data_source: DataSource) -> pd.DataFrame:
        """Load data from SQL Server"""
        try:
            # Parse connection string or build from components
            if data_source.sql_connection:
                # Use provided connection string
                conn_str = data_source.sql_connection
            else:
                # Build connection string from parameters
                params = data_source.parameters or {}
                server = params.get('server', 'localhost')
                database = params.get('database', '')
                username = params.get('username', '')
                password = params.get('password', '')
                trusted_connection = params.get('trusted_connection', True)

                if trusted_connection:
                    conn_str = f"mssql+pyodbc://{server}/{database}?driver={self.config.MSSQL_DRIVER}&trusted_connection=yes"
                else:
                    conn_str = f"mssql+pyodbc://{username}:{password}@{server}/{database}?driver={self.config.MSSQL_DRIVER}"

            # Create engine and execute query
            engine = create_engine(conn_str, connect_args={"timeout": self.config.MSSQL_TIMEOUT})
            with engine.connect() as connection:
                df = pd.read_sql(text(data_source.sql_query), connection)

            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL query")
            return df

        except Exception as e:
            logger.error(f"Error executing SQL query: {str(e)}")
            raise
    def _validate_and_process(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Validate and preprocess the dataframe"""
        processing_info = {
            'warnings': [],
            'sampling_applied': False,
            'columns_renamed': [],
            'missing_data_summary': {},
            'data_types': {}
        }

        # Check size limits
        if df.shape[0] > self.max_rows:
            # Sample data if too large
            df = df.sample(n=self.max_rows, random_state=42)
            processing_info['sampling_applied'] = True
            processing_info['warnings'].append(f"Dataset sampled to {self.max_rows} rows due to size limits")

        if df.shape[1] > self.max_columns:
            processing_info['warnings'].append(f"Dataset has {df.shape[1]} columns, which exceeds recommended limit of {self.max_columns}")

        # Clean column names
        original_columns = df.columns.tolist()
        df.columns = df.columns.str.strip().str.replace(r'[^\w\s]', '_', regex=True).str.replace(r'\s+', '_', regex=True)
        renamed_columns = [(orig, new) for orig, new in zip(original_columns, df.columns) if orig != new]
        if renamed_columns:
            processing_info['columns_renamed'] = renamed_columns

        # Detect and convert data types
        for col in df.columns:
            # Try to convert to numeric if possible
            if df[col].dtype == 'object':
                # Check if it's numeric (handle European decimal format with commas)
                try:
                    # First try normal numeric conversion
                    pd.to_numeric(df[col], errors='raise')
                    df[col] = pd.to_numeric(df[col])
                except:
                    # Try converting European decimal format (comma as decimal separator)
                    try:
                        # Check if column contains comma-decimal numbers
                        sample_vals = df[col].dropna().astype(str).head(10)
                        if any(',' in val for val in sample_vals):
                            # Convert commas to periods and try numeric conversion
                            df_converted = df[col].astype(str).str.replace(',', '.', regex=False)
                            pd.to_numeric(df_converted, errors='raise')
                            df[col] = pd.to_numeric(df_converted)
                        else:
                            raise ValueError("Not numeric")
                    except:
                        # Check if it's datetime - only if it looks like date strings
                        try:
                            # Only try datetime conversion if values look like dates
                            sample_vals = df[col].dropna().astype(str).head(5)
                            if any(any(sep in val for sep in ['/', '-', '.']) and
                                   any(val.count(sep) >= 2 for sep in ['/', '-', '.'])
                                   for val in sample_vals):
                                # Try different date formats commonly used in CSV files
                                for date_format in ['%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%Y/%m/%d']:
                                    try:
                                        df[col] = pd.to_datetime(df[col], format=date_format, errors='raise')
                                        break
                                    except (ValueError, TypeError):
                                        continue
                                else:
                                    # If no explicit format worked, try with dayfirst=True for European dates
                                    try:
                                        df[col] = pd.to_datetime(df[col], dayfirst=True, errors='raise')
                                    except (ValueError, TypeError):
                                        # Try general inference as fallback
                                        pd.to_datetime(df[col], errors='raise')
                                        df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
                            else:
                                raise ValueError("Not datetime")
                        except:
                            # Keep as string/categorical
                            pass

        # Record final data types
        processing_info['data_types'] = {col: str(dtype) for col, dtype in df.dtypes.items()}

        # Missing data summary
        missing_counts = df.isnull().sum()
        processing_info['missing_data_summary'] = {
            col: {'count': int(count), 'percentage': float(count / len(df) * 100)}
            for col, count in missing_counts.items() if count > 0
        }

        # Basic data quality checks
        if df.empty:
            raise ValueError("Dataset is empty")
        if df.shape[1] == 0:
            raise ValueError("Dataset has no columns")

        # Check for completely missing columns
        completely_missing = [col for col in df.columns if df[col].isnull().all()]
        if completely_missing:
            processing_info['warnings'].append(f"Columns with all missing values: {completely_missing}")

        # Check for duplicate rows
        duplicates = df.duplicated().sum()
        if duplicates > 0:
            processing_info['warnings'].append(f"Found {duplicates} duplicate rows")

        return df, processing_info
    def get_data_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate comprehensive data summary"""
        summary = {
            'shape': df.shape,
            'column_info': {},
            'numeric_summary': {},
            'categorical_summary': {},
            'missing_data': {},
            'data_quality': {}
        }

        # Column information
        for col in df.columns:
            col_info = {
                'dtype': str(df[col].dtype),
                'non_null_count': int(df[col].count()),
                'null_count': int(df[col].isnull().sum()),
                'null_percentage': float(df[col].isnull().sum() / len(df) * 100)
            }

            if pd.api.types.is_numeric_dtype(df[col]):
                col_info.update({
                    'mean': float(df[col].mean()) if not df[col].empty else None,
                    'std': float(df[col].std()) if not df[col].empty else None,
                    'min': float(df[col].min()) if not df[col].empty else None,
                    'max': float(df[col].max()) if not df[col].empty else None,
                    'unique_count': int(df[col].nunique())
                })
            else:
                col_info.update({
                    'unique_count': int(df[col].nunique()),
                    'most_frequent': str(df[col].mode().iloc[0]) if not df[col].mode().empty else None
                })

            summary['column_info'][col] = col_info

        # Numeric columns summary
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary['numeric_summary'] = df[numeric_cols].describe().to_dict()

        # Categorical columns summary
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            cat_summary = {}
            for col in categorical_cols:
                value_counts = df[col].value_counts().head(10)
                cat_summary[col] = {
                    'unique_count': int(df[col].nunique()),
                    'top_values': value_counts.to_dict()
                }
            summary['categorical_summary'] = cat_summary

        # Missing data overview
        missing_counts = df.isnull().sum()
        summary['missing_data'] = {
            'total_missing': int(missing_counts.sum()),
            'columns_with_missing': len(missing_counts[missing_counts > 0]),
            'missing_by_column': missing_counts[missing_counts > 0].to_dict()
        }

        # Data quality metrics
        summary['data_quality'] = {
            'duplicate_rows': int(df.duplicated().sum()),
            'completely_missing_columns': [col for col in df.columns if df[col].isnull().all()],
            'constant_columns': [col for col in df.columns if df[col].nunique() <= 1],
            'high_cardinality_columns': [col for col in categorical_cols if df[col].nunique() > len(df) * 0.9]
        }

        return summary
    def validate_columns_for_analysis(self, df: pd.DataFrame,
                                      target_vars: List[str] = None,
                                      grouping_vars: List[str] = None) -> Dict[str, Any]:
        """Validate that specified columns exist and are appropriate for analysis"""
        validation = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'column_types': {}
        }

        all_columns = set(df.columns)

        # Check target variables
        if target_vars:
            for var in target_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Target variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': int(df[var].nunique()),
                        'missing_count': int(df[var].isnull().sum())
                    }

        # Check grouping variables
        if grouping_vars:
            for var in grouping_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Grouping variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    unique_count = df[var].nunique()
                    if unique_count > 20:
                        validation['warnings'].append(f"Grouping variable '{var}' has {unique_count} unique values, which may be too many for effective grouping")
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': unique_count,
                        'missing_count': int(df[var].isnull().sum())
                    }

        return validation
    def suggest_analysis_variables(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Suggest appropriate variables for different types of analysis"""
        suggestions = {
            'numeric_continuous': [],
            'numeric_discrete': [],
            'categorical': [],
            'binary': [],
            'datetime': [],
            'id_columns': []
        }

        for col in df.columns:
            unique_count = df[col].nunique()
            total_count = len(df)

            if pd.api.types.is_datetime64_any_dtype(df[col]):
                suggestions['datetime'].append(col)
            elif pd.api.types.is_numeric_dtype(df[col]):
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                elif unique_count <= 20:
                    suggestions['numeric_discrete'].append(col)
                else:
                    suggestions['numeric_continuous'].append(col)
            else:
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                else:
                    suggestions['categorical'].append(col)

        return suggestions
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | Config | - | positional |
Parameter Details
config: Application configuration object; the processor reads MAX_DATASET_ROWS, MAX_COLUMNS, MSSQL_DRIVER, and MSSQL_TIMEOUT from it.
Return Value
Instantiating the class returns a DataProcessor object.
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the processor with the row and column limits defined in the application configuration
Parameters:
config: Type: Config
Returns: None
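A minimal construction sketch. The import paths for Config and DataProcessor are assumptions about the project layout, not confirmed by the source:
# Hypothetical import paths; adjust to the actual project layout.
from config import Config
from data_processor import DataProcessor

config = Config()                  # must expose MAX_DATASET_ROWS, MAX_COLUMNS, MSSQL_DRIVER, MSSQL_TIMEOUT
processor = DataProcessor(config)
print(processor.max_rows, processor.max_columns)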
load_data_from_sql_workflow(self, user_query, schema_file, connection_config, statistical_agent) -> Tuple[pd.DataFrame, Dict[str, Any]]
Purpose: New SQL workflow: Generate SQL query based on user request and database schema, then execute query to get dataset
Parameters:
user_query: Type: str
schema_file: Type: str (optional; defaults to the schema file from sql_query_generator)
connection_config: Type: str (optional; defaults to the connection config from sql_query_generator)
statistical_agent: optional agent forwarded to SQLQueryGenerator
Returns: Tuple[pd.DataFrame, Dict[str, Any]]
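A hedged call sketch; the import paths and the example query text are illustrative assumptions. Omitting schema_file and connection_config falls back to the defaults provided by sql_query_generator:
# Sketch only: import paths and query text are assumptions.
from config import Config
from data_processor import DataProcessor

processor = DataProcessor(Config())
df, metadata = processor.load_data_from_sql_workflow(
    "average titer per flock in 2023"   # free-text user request
)
print(metadata["sql_query"])
print(metadata["processed_shape"], metadata["warnings"])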
load_data(self, data_source) -> Tuple[pd.DataFrame, Dict[str, Any]]
Purpose: Load data from a supported source (file upload or SQL query); returns (dataframe, metadata)
Parameters:
data_source: Type: DataSource
Returns: Tuple[pd.DataFrame, Dict[str, Any]]
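A file-upload sketch. The module that defines DataSource and DataSourceType, and whether file_path is a constructor argument, are assumptions based on how load_data reads those attributes:
# Sketch only: DataSource/DataSourceType import location is an assumption.
from config import Config
from data_processor import DataProcessor
from models import DataSource, DataSourceType   # hypothetical module

processor = DataProcessor(Config())
source = DataSource(
    source_type=DataSourceType.FILE_UPLOAD,
    file_path="measurements.csv",               # hypothetical file
)
df, metadata = processor.load_data(source)
print(metadata["original_shape"], metadata["processed_shape"])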
_load_from_file(self, file_path) -> pd.DataFrame
Purpose: Load data from uploaded file
Parameters:
file_path: Type: str
Returns: pd.DataFrame
_load_from_sql(self, data_source) -> pd.DataFrame
Purpose: Load data from SQL Server
Parameters:
data_source: Type: DataSource
Returns: pd.DataFrame
_validate_and_process(self, df) -> Tuple[pd.DataFrame, Dict[str, Any]]
Purpose: Validate and preprocess the dataframe
Parameters:
df: Type: pd.DataFrame
Returns: Tuple[pd.DataFrame, Dict[str, Any]]
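The type detection is the least obvious part of this method. The standalone sketch below reproduces its European decimal handling with plain pandas; the column name and values are illustrative only:
import pandas as pd

# Object columns whose values use ',' as the decimal separator are converted
# by swapping ',' for '.' before pd.to_numeric, mirroring _validate_and_process.
df = pd.DataFrame({"weight": ["1,5", "2,25", "3,0"]})

sample_vals = df["weight"].dropna().astype(str).head(10)
if any("," in val for val in sample_vals):
    converted = df["weight"].astype(str).str.replace(",", ".", regex=False)
    df["weight"] = pd.to_numeric(converted)

print(df["weight"].dtype)  # float64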
get_data_summary(self, df) -> Dict[str, Any]
Purpose: Generate comprehensive data summary
Parameters:
df: Type: pd.DataFrame
Returns: Dict[str, Any]
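A short sketch of the summary output; import paths and the toy frame are assumptions:
import pandas as pd
from config import Config            # assumed import path
from data_processor import DataProcessor

processor = DataProcessor(Config())
df = pd.DataFrame({"dose": [0.5, 1.0, 1.0, None], "group": ["a", "b", "b", "a"]})
summary = processor.get_data_summary(df)
print(summary["shape"])                           # (4, 2)
print(summary["missing_data"]["total_missing"])   # 1
print(summary["column_info"]["dose"]["dtype"])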
validate_columns_for_analysis(self, df, target_vars, grouping_vars) -> Dict[str, Any]
Purpose: Validate that specified columns exist and are appropriate for analysis
Parameters:
df: Type: pd.DataFrame
target_vars: Type: List[str]
grouping_vars: Type: List[str]
Returns: Dict[str, Any]
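A sketch of column validation before an analysis run; import paths and the toy data are assumptions:
import pandas as pd
from config import Config            # assumed import path
from data_processor import DataProcessor

processor = DataProcessor(Config())
df = pd.DataFrame({"titer": [2.1, 3.4, 2.8], "farm": ["A", "B", "A"]})

validation = processor.validate_columns_for_analysis(
    df, target_vars=["titer"], grouping_vars=["farm", "missing_col"]
)
print(validation["valid"])    # False: 'missing_col' is not in the dataset
print(validation["errors"])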
suggest_analysis_variables(self, df) -> Dict[str, List[str]]
Purpose: Suggest appropriate variables for different types of analysis
Parameters:
df: Type: pd.DataFrame
Returns: Dict[str, List[str]]
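A sketch of the variable-role suggestions; import paths and the toy data are assumptions:
import pandas as pd
from config import Config            # assumed import path
from data_processor import DataProcessor

processor = DataProcessor(Config())
df = pd.DataFrame({
    "sample_id": [1, 2, 3, 4, 5],                # unique per row -> id_columns
    "dose": [0.5, 1.0, 1.0, 1.5, 0.5],           # few unique values -> numeric_discrete
    "treated": ["yes", "no", "yes", "no", "no"]  # two levels -> binary
})
print(processor.suggest_analysis_variables(df))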
Required Imports
import os
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple
import pandas as pd
import numpy as np
import pyodbc
import sqlalchemy
from sqlalchemy import create_engine, text
Usage Example
# Example usage:
# processor = DataProcessor(config)          # config: an application Config instance
# df, metadata = processor.load_data(data_source)
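For orientation, a hedged end-to-end sketch combining the file workflow with the summary helpers; import paths, module layout, and the file name are assumptions:
from config import Config                    # assumed import path
from data_processor import DataProcessor
from models import DataSource, DataSourceType  # hypothetical module

processor = DataProcessor(Config())

# 1. Load and preprocess an uploaded file
source = DataSource(source_type=DataSourceType.FILE_UPLOAD, file_path="trial_results.csv")
df, metadata = processor.load_data(source)

# 2. Summarize and pick candidate variables
summary = processor.get_data_summary(df)
roles = processor.suggest_analysis_variables(df)

# 3. Validate the chosen columns before running statistics
validation = processor.validate_columns_for_analysis(
    df,
    target_vars=roles["numeric_continuous"][:1],
    grouping_vars=roles["categorical"][:1],
)
print(validation["valid"], metadata["warnings"])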
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DataProcessor (97.2% similar)
- class DocumentProcessor_v6 (58.0% similar)
- class DocumentProcessor_v7 (57.7% similar)
- class DocumentProcessor (55.4% similar)
- class DocumentProcessor_v5 (54.5% similar)