🔍 Code Extractor

class DataProcessor

Maturity: 26

Handles data loading, validation, and preprocessing

File: /tf/active/vicechatdev/full_smartstat/data_processor.py
Lines: 25 - 637
Complexity: moderate

Purpose

Handles data loading, validation, and preprocessing

Source Code

class DataProcessor:
    """Handles data loading, validation, and preprocessing"""
    
    def __init__(self, config: Config):
        self.config = config
        self.max_rows = config.MAX_DATASET_ROWS
        self.max_columns = config.MAX_COLUMNS
        self.sql_query_generator = None  # Will be initialized when needed
        
    def load_data_from_sql_workflow(self, user_query: str, schema_file: str = None, 
                                   connection_config: str = None, statistical_agent=None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        New SQL workflow: Generate SQL query based on user request and database schema,
        then execute query to get dataset
        """
        try:
            # Use defaults if not provided
            if not schema_file:
                from sql_query_generator import get_default_schema_file
                schema_file = get_default_schema_file()
                
            if not connection_config:
                from sql_query_generator import get_default_connection_config
                conn_config = get_default_connection_config()
            else:
                conn_config = ConnectionConfig.from_config_file(connection_config)
            
            # Load database schema
            schema = DatabaseSchema.from_json(schema_file)
            
            # Initialize SQL query generator with enhanced capabilities
            query_generator = SQLQueryGenerator(schema, statistical_agent)
            
            # Generate SQL query based on user request
            sql_query, query_metadata = query_generator.generate_sql_query(user_query, self.max_rows)
            
            logger.info(f"Generated SQL query for user request: '{user_query}'")
            logger.info(f"SQL Query: {sql_query}")
            
            # Create data source for SQL execution
            data_source = DataSource(
                source_type=DataSourceType.SQL_QUERY,
                sql_connection=conn_config.to_connection_string(),
                sql_query=sql_query,
                parameters={}
            )
            
            # Execute SQL query to get data
            df = self._load_from_sql(data_source)
            
            # Handle empty results with diagnostic information
            diagnostic_info = {}
            if df.empty:
                logger.warning(f"Main query returned no results. Running diagnostic query...")
                
                # Generate and run diagnostic query
                diagnostic_sql, diagnostic_explanation = query_generator.generate_diagnostic_query(
                    user_query, query_metadata.get('detected_intent', {})
                )
                
                diagnostic_data_source = DataSource(
                    source_type=DataSourceType.SQL_QUERY,
                    sql_connection=conn_config.to_connection_string(),
                    sql_query=diagnostic_sql,
                    parameters={}
                )
                
                try:
                    diagnostic_df = self._load_from_sql(diagnostic_data_source)
                    diagnostic_info = {
                        'diagnostic_sql': diagnostic_sql,
                        'diagnostic_explanation': diagnostic_explanation,
                        'diagnostic_results': diagnostic_df.to_dict('records') if not diagnostic_df.empty else []
                    }
                    logger.info(f"Diagnostic query returned {len(diagnostic_info['diagnostic_results'])} rows")
                except Exception as e:
                    logger.error(f"Diagnostic query failed: {str(e)}")
                    diagnostic_info = {'diagnostic_error': str(e)}
            
            # Prepare comprehensive metadata
            metadata = {
                'source_type': 'sql_workflow',
                'loaded_at': datetime.now().isoformat(),
                'original_shape': df.shape,
                'user_query': user_query,
                'database_name': schema.database_name,
                'database_description': schema.description,
                'sql_query': sql_query,
                'query_explanation': query_metadata.get('explanation', ''),
                'connection_server': conn_config.server,
                'connection_database': conn_config.database,
                'schema_file_used': schema_file,
                'is_poulpharm_optimized': schema.database_name.lower() == 'poulpharm',
                'sampling_applied': False,
                'warnings': [],
                'query_metadata': query_metadata,
                **diagnostic_info
            }
            
            # Validate and process data (same as regular workflow)
            # Special handling for empty datasets - don't fail completely
            if df.empty and df.shape[1] > 0:
                # We have column structure but no data - this is informative
                logger.warning(f"Query returned no data rows but has {df.shape[1]} columns")
                metadata['warnings'].append("Query executed successfully but returned no data rows. This may indicate no matching records for the specified criteria.")
                
                # Create a minimal processing info for empty datasets
                processing_info = {
                    'warnings': metadata['warnings'],
                    'sampling_applied': False,
                    'columns_renamed': [],
                    'missing_data_summary': {},
                    'data_types': {col: str(dtype) for col, dtype in df.dtypes.items()},
                    'empty_dataset': True
                }
                metadata.update(processing_info)
            else:
                # Normal validation for non-empty datasets
                df, processing_info = self._validate_and_process(df)
                metadata.update(processing_info)
            
            metadata['processed_shape'] = df.shape
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL workflow")
            
            return df, metadata
            
        except Exception as e:
            logger.error(f"Error in SQL workflow: {str(e)}")
            raise
        
    def load_data(self, data_source: DataSource) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Load data from various sources
        Returns: (dataframe, metadata)
        """
        metadata = {
            'source_type': data_source.source_type.value,
            'loaded_at': datetime.now().isoformat(),
            'original_shape': None,
            'processed_shape': None,
            'sampling_applied': False,
            'warnings': []
        }
        
        try:
            if data_source.source_type == DataSourceType.FILE_UPLOAD:
                df = self._load_from_file(data_source.file_path)
                metadata['file_path'] = data_source.file_path
                
            elif data_source.source_type == DataSourceType.SQL_QUERY:
                df = self._load_from_sql(data_source)
                metadata['sql_query'] = data_source.sql_query
                metadata['connection'] = data_source.sql_connection
                
            else:
                raise ValueError(f"Unsupported data source type: {data_source.source_type}")
            
            metadata['original_shape'] = df.shape
            
            # Validate and process data
            df, processing_info = self._validate_and_process(df)
            metadata.update(processing_info)
            metadata['processed_shape'] = df.shape
            
            return df, metadata
            
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise
    
    def _load_from_file(self, file_path: str) -> pd.DataFrame:
        """Load data from uploaded file"""
        if not file_path or not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        file_ext = Path(file_path).suffix.lower()
        
        try:
            if file_ext == '.csv':
                # First, try to detect the separator by reading a sample
                best_df = None
                best_shape = (0, 0)
                
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in [',', ';', '\t', '|']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            
                            # Prefer the combination that gives us the most columns and sensible data
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                                
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                
                if best_df is None:
                    raise ValueError("Unable to parse CSV file with any standard separator or encoding")
                
                df = best_df
                        
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path, engine='openpyxl' if file_ext == '.xlsx' else 'xlrd')
                
            elif file_ext == '.tsv':
                df = pd.read_csv(file_path, sep='\t')
                
            elif file_ext == '.txt':
                # Use the same robust separator detection as CSV
                best_df = None
                best_shape = (0, 0)
                
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    for sep in ['\t', ',', ';', '|', ' ']:
                        try:
                            # Read just the first few rows to detect structure
                            sample_df = pd.read_csv(file_path, sep=sep, encoding=encoding, nrows=5)
                            
                            # Prefer the combination that gives us the most columns
                            if sample_df.shape[1] > best_shape[1]:
                                # Read the full file with the detected separator and encoding
                                df = pd.read_csv(file_path, sep=sep, encoding=encoding)
                                best_df = df
                                best_shape = df.shape
                                
                        except (UnicodeDecodeError, pd.errors.EmptyDataError, pd.errors.ParserError):
                            continue
                
                if best_df is None:
                    raise ValueError("Unable to parse TXT file with any standard separator or encoding")
                
                df = best_df
                
            else:
                raise ValueError(f"Unsupported file format: {file_ext}")
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from {file_path}")
            return df
            
        except Exception as e:
            logger.error(f"Error loading file {file_path}: {str(e)}")
            raise
    
    def execute_query(self, query: str, params: List = None) -> Optional[pd.DataFrame]:
        """Execute a SQL query and return results as DataFrame"""
        try:
            from sql_query_generator import get_default_connection_config
            conn_config = get_default_connection_config()
            conn_str = conn_config.to_connection_string()
            
            engine = create_engine(conn_str, connect_args={"timeout": self.config.MSSQL_TIMEOUT})
            
            with engine.connect() as connection:
                if params:
                    df = pd.read_sql(text(query), connection, params=params)
                else:
                    df = pd.read_sql(text(query), connection)
            
            logger.info(f"Query executed successfully: {df.shape[0]} rows, {df.shape[1]} columns")
            return df
            
        except Exception as e:
            logger.error(f"Error executing query: {str(e)}")
            return None
    
    def get_connection_string(self) -> str:
        """Get the database connection string"""
        from sql_query_generator import get_default_connection_config
        conn_config = get_default_connection_config()
        return conn_config.to_connection_string()

    def _load_from_sql(self, data_source: DataSource) -> pd.DataFrame:
        """Load data from SQL Server"""
        try:
            # Parse connection string or build from components
            if data_source.sql_connection:
                # Use provided connection string
                conn_str = data_source.sql_connection
            else:
                # Build connection string from parameters
                params = data_source.parameters or {}
                server = params.get('server', 'localhost')
                database = params.get('database', '')
                username = params.get('username', '')
                password = params.get('password', '')
                trusted_connection = params.get('trusted_connection', True)
                
                if trusted_connection:
                    conn_str = f"mssql+pyodbc://{server}/{database}?driver={self.config.MSSQL_DRIVER}&trusted_connection=yes"
                else:
                    conn_str = f"mssql+pyodbc://{username}:{password}@{server}/{database}?driver={self.config.MSSQL_DRIVER}"
            
            # Create engine and execute query
            engine = create_engine(conn_str, connect_args={"timeout": self.config.MSSQL_TIMEOUT})
            
            with engine.connect() as connection:
                df = pd.read_sql(text(data_source.sql_query), connection)
            
            logger.info(f"Successfully loaded {df.shape[0]} rows and {df.shape[1]} columns from SQL query")
            return df
            
        except Exception as e:
            logger.error(f"Error executing SQL query: {str(e)}")
            raise
    
    def _validate_and_process(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Validate and preprocess the dataframe"""
        processing_info = {
            'warnings': [],
            'sampling_applied': False,
            'columns_renamed': [],
            'missing_data_summary': {},
            'data_types': {}
        }
        
        # Check size limits
        if df.shape[0] > self.max_rows:
            # Sample data if too large
            df = df.sample(n=self.max_rows, random_state=42)
            processing_info['sampling_applied'] = True
            processing_info['warnings'].append(f"Dataset sampled to {self.max_rows} rows due to size limits")
        
        if df.shape[1] > self.max_columns:
            processing_info['warnings'].append(f"Dataset has {df.shape[1]} columns, which exceeds recommended limit of {self.max_columns}")
        
        # Clean column names
        original_columns = df.columns.tolist()
        df.columns = df.columns.str.strip().str.replace(r'[^\w\s]', '_', regex=True).str.replace(r'\s+', '_', regex=True)
        renamed_columns = [(orig, new) for orig, new in zip(original_columns, df.columns) if orig != new]
        if renamed_columns:
            processing_info['columns_renamed'] = renamed_columns
        
        # Detect and convert data types
        for col in df.columns:
            # Try to convert to numeric if possible
            if df[col].dtype == 'object':
                # Check if it's numeric (handle European decimal format with commas)
                try:
                    # First try normal numeric conversion
                    pd.to_numeric(df[col], errors='raise')
                    df[col] = pd.to_numeric(df[col])
                except:
                    # Try converting European decimal format (comma as decimal separator)
                    try:
                        # Check if column contains comma-decimal numbers
                        sample_vals = df[col].dropna().astype(str).head(10)
                        if any(',' in val for val in sample_vals):
                            # Convert commas to periods and try numeric conversion
                            df_converted = df[col].astype(str).str.replace(',', '.', regex=False)
                            pd.to_numeric(df_converted, errors='raise')
                            df[col] = pd.to_numeric(df_converted)
                        else:
                            raise ValueError("Not numeric")
                    except:
                        # Check if it's datetime - only if it looks like date strings
                        try:
                            # Only try datetime conversion if values look like dates
                            sample_vals = df[col].dropna().astype(str).head(5)
                            if any(any(sep in val for sep in ['/', '-', '.']) and 
                                  any(val.count(sep) >= 2 for sep in ['/', '-', '.'])
                                  for val in sample_vals):
                                # Try different date formats commonly used in SQL Server and CSV files
                                date_formats = [
                                    '%Y-%m-%d %H:%M:%S.%f',  # SQL Server datetime format
                                    '%Y-%m-%d %H:%M:%S',     # SQL Server datetime without microseconds
                                    '%Y-%m-%d',              # ISO date format
                                    '%d/%m/%Y %H:%M:%S',     # European datetime
                                    '%d/%m/%Y',              # European date
                                    '%m/%d/%Y %H:%M:%S',     # US datetime
                                    '%m/%d/%Y',              # US date
                                    '%Y/%m/%d',              # Alternative date format
                                ]
                                for date_format in date_formats:
                                    try:
                                        df[col] = pd.to_datetime(df[col], format=date_format, errors='raise')
                                        break
                                    except (ValueError, TypeError):
                                        continue
                                else:
                                    # If no explicit format worked, try with dayfirst=True for European dates
                                    try:
                                        # Suppress warning by specifying format=None for mixed formats
                                        import warnings
                                        with warnings.catch_warnings():
                                            warnings.simplefilter("ignore", UserWarning)
                                            df[col] = pd.to_datetime(df[col], dayfirst=True, errors='raise')
                                    except (ValueError, TypeError):
                                        # Try general inference as fallback
                                        try:
                                            with warnings.catch_warnings():
                                                warnings.simplefilter("ignore", UserWarning)
                                                pd.to_datetime(df[col], errors='raise')
                                                df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
                                        except:
                                            # Keep as original type if all datetime parsing fails
                                            pass
                            else:
                                raise ValueError("Not datetime")
                        except:
                            # Keep as string/categorical
                            pass
        
        # Record final data types
        processing_info['data_types'] = {col: str(dtype) for col, dtype in df.dtypes.items()}
        
        # Missing data summary
        missing_counts = df.isnull().sum()
        processing_info['missing_data_summary'] = {
            col: {'count': int(count), 'percentage': float(count / len(df) * 100)}
            for col, count in missing_counts.items() if count > 0
        }
        
        # Basic data quality checks
        if df.empty:
            # Don't raise error immediately - provide diagnostic information
            processing_info['warnings'].append("Query returned no results. Check diagnostic information for details.")
            # Still validate that we have the expected column structure
            if df.shape[1] == 0:
                raise ValueError("Dataset has no columns - query may have syntax errors")
        
        if df.shape[1] == 0:
            raise ValueError("Dataset has no columns")
        
        # Check for completely missing columns
        completely_missing = [col for col in df.columns if df[col].isnull().all()]
        if completely_missing:
            processing_info['warnings'].append(f"Columns with all missing values: {completely_missing}")
        
        # Check for duplicate rows
        duplicates = df.duplicated().sum()
        if duplicates > 0:
            processing_info['warnings'].append(f"Found {duplicates} duplicate rows")
        
        return df, processing_info
    
    def get_data_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate comprehensive data summary"""
        summary = {
            'shape': df.shape,
            'column_info': {},
            'numeric_summary': {},
            'categorical_summary': {},
            'missing_data': {},
            'data_quality': {}
        }
        
        def clean_nan_values(obj):
            """Recursively clean NaN values from nested dictionaries and lists"""
            if isinstance(obj, dict):
                return {k: clean_nan_values(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [clean_nan_values(item) for item in obj]
            elif pd.isna(obj):
                return None
            elif isinstance(obj, (np.floating, np.integer)):
                return float(obj) if not pd.isna(obj) else None
            else:
                return obj
        
        # Column information
        for col in df.columns:
            col_info = {
                'dtype': str(df[col].dtype),
                'non_null_count': int(df[col].count()),
                'null_count': int(df[col].isnull().sum()),
                'null_percentage': float(df[col].isnull().sum() / len(df) * 100)
            }
            
            if pd.api.types.is_numeric_dtype(df[col]):
                mean_val = df[col].mean()
                std_val = df[col].std()
                min_val = df[col].min()
                max_val = df[col].max()
                
                col_info.update({
                    'mean': float(mean_val) if not pd.isna(mean_val) else None,
                    'std': float(std_val) if not pd.isna(std_val) else None,
                    'min': float(min_val) if not pd.isna(min_val) else None,
                    'max': float(max_val) if not pd.isna(max_val) else None,
                    'unique_count': int(df[col].nunique())
                })
            else:
                col_info.update({
                    'unique_count': int(df[col].nunique()),
                    'most_frequent': str(df[col].mode().iloc[0]) if not df[col].mode().empty else None
                })
            
            summary['column_info'][col] = col_info
        
        # Numeric columns summary
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary['numeric_summary'] = clean_nan_values(df[numeric_cols].describe().to_dict())
        
        # Categorical columns summary
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            cat_summary = {}
            for col in categorical_cols:
                value_counts = df[col].value_counts().head(10)
                cat_summary[col] = {
                    'unique_count': int(df[col].nunique()),
                    'top_values': value_counts.to_dict()
                }
            summary['categorical_summary'] = cat_summary
        
        # Missing data overview
        missing_counts = df.isnull().sum()
        summary['missing_data'] = {
            'total_missing': int(missing_counts.sum()),
            'columns_with_missing': len(missing_counts[missing_counts > 0]),
            'missing_by_column': missing_counts[missing_counts > 0].to_dict()
        }
        
        # Data quality metrics
        summary['data_quality'] = {
            'duplicate_rows': int(df.duplicated().sum()),
            'completely_missing_columns': [col for col in df.columns if df[col].isnull().all()],
            'constant_columns': [col for col in df.columns if df[col].nunique() <= 1],
            'high_cardinality_columns': [col for col in categorical_cols if df[col].nunique() > len(df) * 0.9]
        }
        
        # Clean any remaining NaN values before returning
        return clean_nan_values(summary)
    
    def validate_columns_for_analysis(self, df: pd.DataFrame, 
                                    target_vars: List[str] = None,
                                    grouping_vars: List[str] = None) -> Dict[str, Any]:
        """Validate that specified columns exist and are appropriate for analysis"""
        validation = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'column_types': {}
        }
        
        all_columns = set(df.columns)
        
        # Check target variables
        if target_vars:
            for var in target_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Target variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': int(df[var].nunique()),
                        'missing_count': int(df[var].isnull().sum())
                    }
        
        # Check grouping variables
        if grouping_vars:
            for var in grouping_vars:
                if var not in all_columns:
                    validation['errors'].append(f"Grouping variable '{var}' not found in dataset")
                    validation['valid'] = False
                else:
                    unique_count = df[var].nunique()
                    if unique_count > 20:
                        validation['warnings'].append(f"Grouping variable '{var}' has {unique_count} unique values, which may be too many for effective grouping")
                    
                    validation['column_types'][var] = {
                        'dtype': str(df[var].dtype),
                        'is_numeric': pd.api.types.is_numeric_dtype(df[var]),
                        'unique_count': unique_count,
                        'missing_count': int(df[var].isnull().sum())
                    }
        
        return validation
    
    def suggest_analysis_variables(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Suggest appropriate variables for different types of analysis"""
        suggestions = {
            'numeric_continuous': [],
            'numeric_discrete': [],
            'categorical': [],
            'binary': [],
            'datetime': [],
            'id_columns': []
        }
        
        for col in df.columns:
            unique_count = df[col].nunique()
            total_count = len(df)
            
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                suggestions['datetime'].append(col)
            elif pd.api.types.is_numeric_dtype(df[col]):
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                elif unique_count <= 20:
                    suggestions['numeric_discrete'].append(col)
                else:
                    suggestions['numeric_continuous'].append(col)
            else:
                # Check if it's likely an ID column
                if unique_count == total_count or col.lower() in ['id', 'index', 'key']:
                    suggestions['id_columns'].append(col)
                elif unique_count == 2:
                    suggestions['binary'].append(col)
                else:
                    suggestions['categorical'].append(col)
        
        return suggestions

Parameters

Name Type Default Kind
config Config - positional

Parameter Details

config: Application configuration object; supplies MAX_DATASET_ROWS, MAX_COLUMNS, MSSQL_TIMEOUT, and MSSQL_DRIVER, which the class uses for size limits and SQL Server connections.

Return Value

Instantiating the class returns a DataProcessor; per-method return types are listed under Class Interface below.

Class Interface

Methods

__init__(self, config)

Purpose: Initialize the processor with a Config object; stores the configuration and reads the MAX_DATASET_ROWS and MAX_COLUMNS limits from it.

Parameters:

  • config: Type: Config

Returns: None
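
A minimal construction sketch, assuming a Config object that exposes the attributes the class reads (the class and values below are an illustrative stand-in, not the project's actual Config):

class Config:  # illustrative stand-in for the project's Config
    MAX_DATASET_ROWS = 100_000
    MAX_COLUMNS = 200
    MSSQL_TIMEOUT = 30
    MSSQL_DRIVER = "ODBC Driver 17 for SQL Server"

processor = DataProcessor(Config())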

load_data_from_sql_workflow(self, user_query, schema_file, connection_config, statistical_agent) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: SQL workflow entry point: generate a SQL query from the user's natural-language request and the database schema, execute it, and return the resulting dataset together with metadata (including diagnostic information when the query returns no rows).

Parameters:

  • user_query: Type: str — natural-language description of the data the user wants
  • schema_file: Type: str — path to a JSON schema file; the default schema is used when omitted
  • connection_config: Type: str — path to a connection configuration file; the default connection is used when omitted
  • statistical_agent: optional agent passed to SQLQueryGenerator for enhanced query generation

Returns: Tuple[pd.DataFrame, Dict[str, Any]] — the query result set and a metadata dictionary (generated SQL, query explanation, warnings, diagnostics).
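
A usage sketch, with processor a DataProcessor instance; the request text and file paths are placeholders, and schema_file / connection_config may be omitted to fall back on the defaults:

# Natural-language request; defaults resolve the schema and connection
df, metadata = processor.load_data_from_sql_workflow(
    user_query="number of samples per farm in 2023",
)
print(metadata["sql_query"])          # SQL generated for the request
print(metadata.get("warnings", []))   # notes e.g. when the query returned no rows

# Explicit schema and connection files (hypothetical paths)
df, metadata = processor.load_data_from_sql_workflow(
    user_query="number of samples per farm in 2023",
    schema_file="/path/to/schema.json",
    connection_config="/path/to/connection_config.json",
)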

load_data(self, data_source) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: Load data from a supported source (file upload or SQL query) and return the DataFrame together with load metadata.

Parameters:

  • data_source: Type: DataSource

Returns: Tuple[pd.DataFrame, Dict[str, Any]] — the processed DataFrame and metadata describing the load, validation, and preprocessing steps.
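
A sketch of both source types; it assumes DataSource accepts these fields as keyword arguments (as the attribute access in the source suggests), and the file path and table name are placeholders:

# File upload
file_source = DataSource(
    source_type=DataSourceType.FILE_UPLOAD,
    file_path="/path/to/measurements.csv",
)
df, metadata = processor.load_data(file_source)

# Direct SQL query
sql_source = DataSource(
    source_type=DataSourceType.SQL_QUERY,
    sql_connection=processor.get_connection_string(),
    sql_query="SELECT TOP 100 * FROM samples",
    parameters={},
)
df, metadata = processor.load_data(sql_source)
print(metadata["original_shape"], metadata["processed_shape"])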

_load_from_file(self, file_path) -> pd.DataFrame

Purpose: Load data from an uploaded file (CSV, XLSX/XLS, TSV, or TXT), detecting the separator and encoding for delimited text files.

Parameters:

  • file_path: Type: str

Returns: pd.DataFrame containing the file contents.

execute_query(self, query, params) -> Optional[pd.DataFrame]

Purpose: Execute a SQL query against the default connection and return the results as a DataFrame.

Parameters:

  • query: Type: str
  • params: Type: List

Returns: Optional[pd.DataFrame] — the query results, or None if execution fails (the error is logged).
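
A brief sketch; the table and column names are illustrative. Because the method catches exceptions and returns None, callers should check for None rather than expecting an exception:

df = processor.execute_query("SELECT COUNT(*) AS n FROM samples")
if df is not None:
    print(int(df.loc[0, "n"]))
else:
    print("query failed; see the log for details")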

get_connection_string(self) -> str

Purpose: Get the database connection string

Returns: str — the SQLAlchemy connection string built from the default connection configuration.

_load_from_sql(self, data_source) -> pd.DataFrame

Purpose: Load data from SQL Server

Parameters:

  • data_source: Type: DataSource

Returns: pd.DataFrame with the query results.

_validate_and_process(self, df) -> Tuple[pd.DataFrame, Dict[str, Any]]

Purpose: Validate and preprocess the dataframe

Parameters:

  • df: Type: pd.DataFrame

Returns: Tuple[pd.DataFrame, Dict[str, Any]] — the cleaned DataFrame and a processing-info dictionary (warnings, sampling flag, renamed columns, data types, missing-data summary).

get_data_summary(self, df) -> Dict[str, Any]

Purpose: Generate comprehensive data summary

Parameters:

  • df: Type: pd.DataFrame

Returns: Dict[str, Any] — shape, per-column info, numeric and categorical summaries, a missing-data overview, and data-quality metrics, with NaN values cleaned to None.
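
A small sketch of how the summary might be consumed; the columns are illustrative:

import pandas as pd

df = pd.DataFrame({
    "weight": [1.2, 1.5, None, 1.4],
    "group": ["A", "A", "B", "B"],
})
summary = processor.get_data_summary(df)

print(summary["shape"])                                # (4, 2)
print(summary["column_info"]["weight"]["null_count"])  # 1
print(summary["missing_data"]["total_missing"])        # 1
print(summary["data_quality"]["duplicate_rows"])       # 0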

validate_columns_for_analysis(self, df, target_vars, grouping_vars) -> Dict[str, Any]

Purpose: Validate that specified columns exist and are appropriate for analysis

Parameters:

  • df: Type: pd.DataFrame
  • target_vars: Type: List[str]
  • grouping_vars: Type: List[str]

Returns: Dict[str, Any] — a validation result with 'valid', 'errors', 'warnings', and per-column 'column_types'.
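
A short sketch reusing the small df from the get_data_summary example above; the column names are illustrative:

validation = processor.validate_columns_for_analysis(
    df,
    target_vars=["weight"],
    grouping_vars=["group"],
)
if not validation["valid"]:
    print(validation["errors"])       # e.g. a misspelled column name
else:
    print(validation["column_types"]["weight"]["is_numeric"])
    print(validation["warnings"])     # e.g. a grouping variable with more than 20 levels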

suggest_analysis_variables(self, df) -> Dict[str, List[str]]

Purpose: Suggest appropriate variables for different types of analysis

Parameters:

  • df: Type: pd.DataFrame

Returns: Dict[str, List[str]] — column names bucketed as numeric_continuous, numeric_discrete, categorical, binary, datetime, or id_columns.
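
A short sketch on the same df; the comments describe how the buckets are typically used rather than anything enforced by the method:

suggestions = processor.suggest_analysis_variables(df)
print(suggestions["numeric_continuous"])  # candidates for means, correlations, regression
print(suggestions["categorical"])         # candidates for grouping variables
print(suggestions["id_columns"])          # usually excluded from analysis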

Required Imports

import os
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
import pyodbc
import sqlalchemy
from sqlalchemy import create_engine, text

The source also relies on a module-level logger and on project-local definitions of Config, DataSource, DataSourceType, DatabaseSchema, ConnectionConfig, SQLQueryGenerator, and the sql_query_generator helper functions.

Usage Example

# Example usage:
# processor = DataProcessor(config)                  # config: a Config instance
# df, metadata = processor.load_data(data_source)    # data_source: a DataSource
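
A fuller end-to-end sketch; the import paths are assumptions based on the file location, and the request text is a placeholder:

from config import Config                  # hypothetical module path
from data_processor import DataProcessor

processor = DataProcessor(Config())

# SQL workflow from a natural-language request
df, metadata = processor.load_data_from_sql_workflow("number of samples per farm in 2023")

# Inspect the data and plan the analysis
summary = processor.get_data_summary(df)
suggestions = processor.suggest_analysis_variables(df)
validation = processor.validate_columns_for_analysis(
    df, target_vars=suggestions["numeric_continuous"][:1]
)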

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DataProcessor_v1 97.2% similar

    Handles data loading, validation, and preprocessing

    From: /tf/active/vicechatdev/smartstat/data_processor.py
  • class DocumentProcessor 57.1% similar

    A comprehensive document processing class that converts documents to PDF, adds audit trails, applies security features (watermarks, signatures, hashing), and optionally converts to PDF/A format with document protection.

    From: /tf/active/vicechatdev/document_auditor/src/document_processor.py
  • class DocumentProcessor_v7 57.0% similar

    Process different document types for indexing

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class DocumentProcessor_v6 56.7% similar

    Lightweight document processor for chat upload functionality

    From: /tf/active/vicechatdev/vice_ai/document_processor.py
  • class SimpleDataHandle 54.7% similar

    A data handler class that manages multiple data sources with different types (dataframes, vector stores, databases) and their associated processing configurations.

    From: /tf/active/vicechatdev/OneCo_hybrid_RAG copy.py