🔍 Code Extractor

class DataProcessor_v1

Maturity: 26

Handles data loading, validation, and preprocessing

File: /tf/active/vicechatdev/smartstat/data_processor.py
Lines: 25 - 516
Complexity: moderate

Purpose

Handles data loading, validation, and preprocessing

Source Code

class DataProcessor:
    """Handles data loading, validation, and preprocessing"""
    
    def __init__(self, config: Config):
        self.config = config
        self.max_rows = config.MAX_DATASET_ROWS
        self.max_columns = config.MAX_COLUMNS
        self.sql_query_generator = None  # Will be initialized when needed
        
    def load_data_from_sql_workflow(self, user_query: str, schema_file: str = None, 
                                   connection_config: str = None, statistical_agent=None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        New SQL workflow: Generate SQL query based on user request and database schema,
        then execute query to get dataset
        """
        try:
            # Use defaults if not provided
            if not schema_file:
                from sql_query_generator import get_default_schema_file
                schema_file = get_default_schema_file()
                
            if not connection_config:
                from sql_query_generator import get_default_connection_config
                conn_config = get_default_connection_config()
            else:
                conn_config = ConnectionConfig.from_config_file(connection_config)
            
            # Load database schema
            schema = DatabaseSchema.from_json(schema_file)
            
            # Initialize SQL query generator with enhanced capabilities
            query_generator = SQLQueryGenerator(schema, statistical_agent)
            
            # Generate SQL query based on user request
            sql_query, query_metadata = query_generator.generate_sql_query(user_query, self.max_rows)
            
            logger.info(f"Generated SQL query for user request: '{user_query}'")
            logger.info(f"SQL Query: {sql_query}")
            
            # Create data source for SQL execution
            data_source = DataSource(
                source_type=DataSourceType.SQL_QUERY,
                sql_connection=conn_config.to_connection_string(),
                sql_query=sql_query,
                parameters={}
            )
            
            # Execute SQL query to get data
            df = self._load_from_sql(data_source)
            
            # Prepare comprehensive metadata
            metadata = {
                'source_type': 'sql_workflow',
                'loaded_at': datetime.now().isoformat(),
                'original_shape': df.shape,
                'user_query': user_query,
                'database_name': schema.database_name,
                'database_description': schema.description,
                'sql_query': sql_query,
                'query_explanation': query_metadata.get('explanation', ''),
                'connection_server': conn_config.server,
                'connection_database': conn_config.database,
                'schema_file_used': schema_file,
                'is_poulpharm_optimized': schema.database_name.lower() == 'poulpharm',
                'sampling_applied': False,
                'warnings': []
            }
            
            # Validate and process data (same as regular workflow)
            df, processing_info = self._validate_and_process(df)
            metadata.update(processing_info)
            metadata['processed_shape'] = df.shape
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL workflow")
            
            return df, metadata
            
        except Exception as e:
            logger.error(f"Error in SQL workflow: {str(e)}")
            raise
        
    def load_data(self, data_source: DataSource) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Load data from various sources
        Returns: (dataframe, metadata)
        """
        metadata = {
            'source_type': data_source.source_type.value,
            'loaded_at': datetime.now().isoformat(),
            'original_shape': None,
            'processed_shape': None,
            'sampling_applied': False,
            'warnings': []
        }
        
        try:
            if data_source.source_type == DataSourceType.FILE_UPLOAD:
                df = self._load_from_file(data_source.file_path)
                metadata['file_path'] = data_source.file_path
                
            elif data_source.source_type == DataSourceType.SQL_QUERY:
                df = self._load_from_sql(data_source)
                metadata['sql_query'] = data_source.sql_query
                metadata['connection'] = data_source.sql_connection
                
            else:
                raise ValueError(f"Unsupported data source type: {data_source.source_type}")
            
            metadata['original_shape'] = df.shape
            
            # Validate and process data
            df, processing_info = self._validate_and_process(df)
            metadata.update(processing_info)
            metadata['processed_shape'] = df.shape
            
            return df, metadata
            
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise
    
    def _load_from_file(self, file_path: str) -> pd.DataFrame:
        """Load data from uploaded file"""
        if not file_path or not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        file_ext = Path(file_path).suffix.lower()
        
        try:
            if file_ext == '.csv':
                # First, try to detect the separator by reading a sample
                best_df = None
                best_shape = (0, 0)
                
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in [',', ';', '\t', '|']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            
                            # Prefer the combination that gives us the most columns and sensible data
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                                
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                
                if best_df is None:
                    raise ValueError("Unable to parse CSV file with any standard separator or encoding")
                
                df = best_df
                        
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path, engine='openpyxl' if file_ext == '.xlsx' else 'xlrd')
                
            elif file_ext == '.tsv':
                df = pd.read_csv(file_path, sep='\t')
                
            elif file_ext == '.txt':
                # Use the same robust separator detection as CSV
                best_df = None
                best_shape = (0, 0)
                
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in ['\t', ',', ';', '|', ' ']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            
                            # Prefer the combination that gives us the most columns
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                                
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                
                if best_df is None:
                    raise ValueError("Unable to parse TXT file with any standard separator or encoding")
                
                df = best_df
                
            else:
                raise ValueError(f"Unsupported file format: {file_ext}")
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from {file_path}")
            return df
            
        except Exception as e:
            logger.error(f"Error loading file {file_path}: {str(e)}")
            raise
    
    def _load_from_sql(self, data_source: DataSource) -> pd.DataFrame:
        """Load data from SQL Server"""
        try:
            # Parse connection string or build from components
            if data_source.sql_connection:
                # Use provided connection string
                conn_str = data_source.sql_connection
            else:
                # Build connection string from parameters
                params = data_source.parameters or {}
                server = params.get('server', 'localhost')
                database = params.get('database', '')
                username = params.get('username', '')
                password = params.get('password', '')
                trusted_connection = params.get('trusted_connection', True)
                
                if trusted_connection:
                    conn_str = f"mssql+pyodbc://{server}/{database}?driver={self.config.MSSQL_DRIVER}&trusted_connection=yes"
                else:
                    conn_str = f"mssql+pyodbc://{username}:{password}@{server}/{database}?driver={self.config.MSSQL_DRIVER}"
            
            # Create engine and execute query
            engine = create_engine(conn_str, connect_args={"timeout": self.config.MSSQL_TIMEOUT})
            
            with engine.connect() as connection:
                df = pd.read_sql(text(data_source.sql_query), connection)
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL query")
            return df
            
        except Exception as e:
            logger.error(f"Error executing SQL query: {str(e)}")
            raise
    
    def _validate_and_process(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Validate and preprocess the dataframe"""
        processing_info = {
            'warnings': [],
            'sampling_applied': False,
            'columns_renamed': [],
            'missing_data_summary': {},
            'data_types': {}
        }
        
        # Check size limits
        if df.shape[0] > self.max_rows:
            # Sample data if too large
            df = df.sample(n=self.max_rows, random_state=42)
            processing_info['sampling_applied'] = True
            processing_info['warnings'].append(f"Dataset sampled to {self.max_rows} rows due to size limits")
        
        if df.shape[1] > self.max_columns:
            processing_info['warnings'].append(f"Dataset has {df.shape[1]} columns, which exceeds recommended limit of {self.max_columns}")
        
        # Clean column names
        original_columns = df.columns.tolist()
        df.columns = df.columns.str.strip().str.replace(r'[^\w\s]', '_', regex=True).str.replace(r'\s+', '_', regex=True)
        renamed_columns = [(orig, new) for orig, new in zip(original_columns, df.columns) if orig != new]
        if renamed_columns:
            processing_info['columns_renamed'] = renamed_columns
        
        # Detect and convert data types
        for col in df.columns:
            # Try to convert to numeric if possible
            if df[col].dtype == 'object':
                # Check if it's numeric (handle European decimal format with commas)
                try:
                    # First try normal numeric conversion
                    pd.to_numeric(df[col], errors='raise')
                    df[col] = pd.to_numeric(df[col])
                except:
                    # Try converting European decimal format (comma as decimal separator)
                    try:
                        # Check if column contains comma-decimal numbers
                        sample_vals = df[col].dropna().astype(str).head(10)
                        if any(',' in val for val in sample_vals):
                            # Convert commas to periods and try numeric conversion
                            df_converted = df[col].astype(str).str.replace(',', '.', regex=False)
                            pd.to_numeric(df_converted, errors='raise')
                            df[col] = pd.to_numeric(df_converted)
                        else:
                            raise ValueError("Not numeric")
                    except:
                        # Check if it's datetime - only if it looks like date strings
                        try:
                            # Only try datetime conversion if values look like dates
                            sample_vals = df[col].dropna().astype(str).head(5)
                            if any(any(sep in val for sep in ['/', '-', '.']) and 
                                  any(val.count(sep) >= 2 for sep in ['/', '-', '.'])
                                  for val in sample_vals):
                                # Try different date formats commonly used in CSV files
                                for date_format in ['%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%Y/%m/%d']:
                                    try:
                                        df[col] = pd.to_datetime(df[col], format=date_format, errors='raise')
                                        break
                                    except (ValueError, TypeError):
                                        continue
                                else:
                                    # If no explicit format worked, try with dayfirst=True for European dates
                                    try:
                                        df[col] = pd.to_datetime(df[col], dayfirst=True, errors='raise')
                                    except (ValueError, TypeError):
                                        # Try general inference as fallback
                                        pd.to_datetime(df[col], errors='raise')
                                        df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
                            else:
                                raise ValueError("Not datetime")
                        except:
                            # Keep as string/categorical
                            pass
        
        # Record final data types
        processing_info['data_types'] = {col: str(dtype) for col, dtype in df.dtypes.items()}
        
        # Missing data summary
        missing_counts = df.isnull().sum()
        processing_info['missing_data_summary'] = {
            col: {'count': int(count), 'percentage': float(count / len(df) * 100)}
            for col, count in missing_counts.items() if count > 0
        }
        
        # Basic data quality checks
        if df.empty:
            raise ValueError("Dataset is empty")
        
        if df.shape[1] == 0:
            raise ValueError("Dataset has no columns")
        
        # Check for completely missing columns
        completely_missing = [col for col in df.columns if df[col].isnull().all()]
        if completely_missing:
            processing_info['warnings'].append(f"Columns with all missing values: {completely_missing}")
        
        # Check for duplicate rows
        duplicates = df.duplicated().sum()
        if duplicates > 0:
            processing_info['warnings'].append(f"Found {duplicates} duplicate rows")
        
        return df, processing_info
    
    def get_data_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate comprehensive data summary"""
        summary = {
            'shape': df.shape,
            'column_info': {},
            'numeric_summary': {},
            'categorical_summary': {},
            'missing_data': {},
            'data_quality': {}
        }
        
        # Column information
        for col in df.columns:
            col_info = {
                'dtype': str(df[col].dtype),
                'non_null_count': int(df[col].count()),
                'null_count': int(df[col].isnull().sum()),
                'null_percentage': float(df[col].isnull().sum() / len(df) * 100)
            }
            
            if pd.api.types.is_numeric_dtype(df[col]):
                col_info.update({
                    'mean': float(df[col].mean()) if not df[col].empty else None,
                    'std': float(df[col].std()) if not df[col].empty else None,
                    'min': float(df[col].min()) if not df[col].empty else None,
                    'max': float(df[col].max()) if not df[col].empty else None,
                    'unique_count': int(df[col].nunique())
                })
            else:
                col_info.update({
                    'unique_count': int(df[col].nunique()),
                    'most_frequent': str(df[col].mode().iloc[0]) if not df[col].mode().empty else None
                })
            
            summary['column_info'][col] = col_info
        
        # Numeric columns summary
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary['numeric_summary'] = df[numeric_cols].describe().to_dict()
        
        # Categorical columns summary
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            cat_summary = {}
            for col in categorical_cols:
                value_counts = df[col].value_counts().head(10)
                cat_summary[col] = {
                    'unique_count': int(df[col].nunique()),
                    'top_values': value_counts.to_dict()
                }
            summary['categorical_summary'] = cat_summary
        
        # Missing data overview
        missing_counts = df.isnull().sum()
        summary['missing_data'] = {
            'total_missing': int(missing_counts.sum()),
            'columns_with_missing': len(missing_counts[missing_counts > 0]),
            'missing_by_column': missing_counts[missing_counts > 0].to_dict()
        }
        
        # Data quality metrics
        summary['data_quality'] = {
            'duplicate_rows': int(df.duplicated().sum()),
            'completely_missing_columns': [col for col in df.columns if df[col].isnull().all()],
            'constant_columns': [col for col in df.columns if df[col].nunique() <= 1],
            'high_cardinality_columns': [col for col in categorical_cols if df[col].nunique() > len(df) * 0.9]
        }
        
        return summary
    
    def validate_columns_for_analysis(self, df: pd.DataFrame, 
                                    target_vars: List[str] = None,
                                    grouping_vars: List[str] = None) -> Dict[str, Any]:
        """Validate that specified columns exist and are appropriate for analysis"""
        validation = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'column_types': {}
        }
        
        all_columns = set(df.columns)
        
        # Check target variables
        if target_vars:
            for var in target_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Target variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': int(df[var].nunique()),
                        'missing_count': int(df[var].isnull().sum())
                    }
        
        # Check grouping variables
        if grouping_vars:
            for var in grouping_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Grouping variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    unique_count = df[var].nunique()
                    if unique_count > 20:
                        validation['warnings'].append(f"Grouping variable '{var}' has {unique_count} unique values, which may be too many for effective grouping")
                    
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': unique_count,
                        'missing_count': int(df[var].isnull().sum())
                    }
        
        return validation
    
    def suggest_analysis_variables(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Suggest appropriate variables for different types of analysis"""
        suggestions = {
            'numeric_continuous': [],
            'numeric_discrete': [],
            'categorical': [],
            'binary': [],
            'datetime': [],
            'id_columns': []
        }
        
        for col in df.columns:
            unique_count = df[col].nunique()
            total_count = len(df)
            
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                suggestions['datetime'].append(col)
            elif pd.api.types.is_numeric_dtype(df[col]):
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                elif unique_count <= 20:
                    suggestions['numeric_discrete'].append(col)
                else:
                    suggestions['numeric_continuous'].append(col)
            else:
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                else:
                    suggestions['categorical'].append(col)
        
        return suggestions

Parameters

Name    Type    Default  Kind
config  Config  -        positional

Parameter Details

config: Config instance supplying the processing limits (MAX_DATASET_ROWS, MAX_COLUMNS) and SQL settings (MSSQL_DRIVER, MSSQL_TIMEOUT) used by this class.

Return Value

The constructor returns None; instantiation yields a configured DataProcessor.

Class Interface

Methods

__init__(self, config)

Purpose: Initialize the processor with the given configuration; stores the row limit (MAX_DATASET_ROWS) and column limit (MAX_COLUMNS) for later validation.

Parameters:

  • config: Type: Config

Returns: None

load_data_from_sql_workflow(self, user_query, schema_file, connection_config, statistical_agent) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: New SQL workflow: Generate SQL query based on user request and database schema, then execute query to get dataset

Parameters:

  • user_query: Type: str
  • schema_file: Type: str, Default: None (falls back to get_default_schema_file())
  • connection_config: Type: str, Default: None (falls back to get_default_connection_config())
  • statistical_agent: Optional agent forwarded to SQLQueryGenerator, Default: None

Returns: Tuple[pd.DataFrame, Dict[str, Any]]
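
A minimal call sketch (not from the source): it assumes a Config instance named config and uses a placeholder user query; when schema_file and connection_config are omitted, the method falls back to the defaults from sql_query_generator, as shown in the source above.

processor = DataProcessor(config)
df, metadata = processor.load_data_from_sql_workflow(
    user_query="average weight per treatment group",  # placeholder request
    schema_file=None,          # falls back to get_default_schema_file()
    connection_config=None,    # falls back to get_default_connection_config()
)
print(metadata['sql_query'])        # the generated SQL statement
print(metadata['processed_shape'])  # shape after validation and preprocessing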

load_data(self, data_source) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: Load data from various sources. Returns: (dataframe, metadata)

Parameters:

  • data_source: Type: DataSource

Returns: Tuple[pd.DataFrame, Dict[str, Any]]
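
A sketch of the file-upload path, assuming DataSource exposes a file_path field (the source above reads data_source.file_path) and that processor is a constructed DataProcessor:

source = DataSource(
    source_type=DataSourceType.FILE_UPLOAD,
    file_path="/path/to/measurements.csv",  # placeholder path
)
df, metadata = processor.load_data(source)
print(metadata['original_shape'], metadata['processed_shape'])
print(metadata['warnings'])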

_load_from_file(self, file_path) -> pd.DataFrame

Purpose: Load data from uploaded file

Parameters:

  • file_path: Type: str

Returns: pd.DataFrame
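
The method's separator detection can be illustrated in isolation; this simplified sketch mirrors the strategy in the source above (sample a few rows per encoding/separator pair and keep the combination that yields the most columns):

import pandas as pd

def detect_csv_dialect(path):
    best, best_cols = None, 0
    for encoding in ['utf-8', 'latin-1', 'cp1252']:
        for sep in [',', ';', '\t', '|']:
            try:
                sample = pd.read_csv(path, sep=sep, encoding=encoding, nrows=5)
            except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                continue
            if sample.shape[1] > best_cols:
                best, best_cols = (sep, encoding), sample.shape[1]
    return best  # (separator, encoding) or None if nothing parsed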

_load_from_sql(self, data_source) -> pd.DataFrame

Purpose: Load data from SQL Server

Parameters:

  • data_source: Type: DataSource

Returns: pd.DataFrame
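
The connection string built above follows SQLAlchemy's mssql+pyodbc URL scheme. A minimal standalone sketch with placeholder server, database, driver, and timeout values (in the class these come from the DataSource parameters and Config):

from sqlalchemy import create_engine, text
import pandas as pd

conn_str = (
    "mssql+pyodbc://my-server/my_database"
    "?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
)  # placeholders; the driver name must match an installed ODBC driver
engine = create_engine(conn_str, connect_args={"timeout": 30})
with engine.connect() as connection:
    df = pd.read_sql(text("SELECT TOP 100 * FROM some_table"), connection)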

_validate_and_process(self, df) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: Validate and preprocess the dataframe

Parameters:

  • df: Type: pd.DataFrame

Returns: Tuple[pd.DataFrame, Dict[str, Any]]
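
One detail worth isolating is the European decimal handling: object columns whose values use a comma as the decimal separator are converted by swapping commas for periods before pd.to_numeric. A standalone sketch of that step:

import pandas as pd

s = pd.Series(['1,5', '2,75', '3,0'])   # comma-decimal strings
converted = pd.to_numeric(s.str.replace(',', '.', regex=False))
print(converted.dtype)                  # float64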

get_data_summary(self, df) -> Dict[str, Any]

Purpose: Generate comprehensive data summary

Parameters:

  • df: Type: pd.DataFrame

Returns: Dict[str, Any]
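
A usage sketch, assuming df is a loaded dataframe; the keys shown are the ones populated in the source above:

summary = processor.get_data_summary(df)
print(summary['shape'])
print(summary['missing_data']['total_missing'])
print(summary['data_quality']['duplicate_rows'])
print(summary['data_quality']['constant_columns'])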

validate_columns_for_analysis(self, df, target_vars, grouping_vars) -> Dict[str, Any]

Purpose: Validate that specified columns exist and are appropriate for analysis

Parameters:

  • df: Type: pd.DataFrame
  • target_vars: Type: List[str], Default: None
  • grouping_vars: Type: List[str], Default: None

Returns: Dict[str, Any]
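
A sketch with placeholder column names; a missing column marks the result invalid, while a high-cardinality grouping variable only produces a warning:

validation = processor.validate_columns_for_analysis(
    df,
    target_vars=['weight'],        # placeholder column names
    grouping_vars=['treatment'],
)
if not validation['valid']:
    print(validation['errors'])
print(validation['warnings'])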

suggest_analysis_variables(self, df) -> Dict[str, List[str]]

Purpose: Suggest appropriate variables for different types of analysis

Parameters:

  • df: Type: pd.DataFrame

Returns: Dict[str, List[str]]
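
A sketch, assuming df is a loaded dataframe; each returned list contains column names from df:

suggestions = processor.suggest_analysis_variables(df)
print(suggestions['numeric_continuous'])
print(suggestions['categorical'])
print(suggestions['id_columns'])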

Required Imports

import os
import logging
from pathlib import Path
from datetime import datetime
from typing import Tuple, Dict, Any, List
import pandas as pd
import numpy as np
import pyodbc
from sqlalchemy import create_engine, text

Also required: the project-local Config, DataSource, DataSourceType, DatabaseSchema, ConnectionConfig, and SQLQueryGenerator definitions (the sql_query_generator module is imported inside load_data_from_sql_workflow).

Usage Example

# Example usage:
# processor = DataProcessor(config)               # config: a Config instance
# df, metadata = processor.load_data(data_source)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DataProcessor 97.2% similar

    Handles data loading, validation, and preprocessing

    From: /tf/active/vicechatdev/full_smartstat/data_processor.py
  • class DocumentProcessor_v6 58.0% similar

    Lightweight document processor for chat upload functionality

    From: /tf/active/vicechatdev/vice_ai/document_processor.py
  • class DocumentProcessor_v7 57.7% similar

    Process different document types for indexing

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class DocumentProcessor 55.4% similar

    A comprehensive document processing class that converts documents to PDF, adds audit trails, applies security features (watermarks, signatures, hashing), and optionally converts to PDF/A format with document protection.

    From: /tf/active/vicechatdev/document_auditor/src/document_processor.py
  • class DocumentProcessor_v5 54.5% similar

    Process different document types for RAG context extraction

    From: /tf/active/vicechatdev/offline_docstore_multi.py