class DatabaseManager

Maturity: 48

SQLite database manager for SmartStat application that handles persistence of statistical analysis sessions, steps, and results.

File: /tf/active/vicechatdev/smartstat/models.py
Lines: 198 - 576
Complexity: moderate

Purpose

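DatabaseManager provides the data persistence layer for the SmartStat statistical analysis application. It manages three entities: sessions (analysis workflows), analysis_steps (individual computation steps), and analysis_results (outputs from steps). The class handles database initialization and create, read, update, and delete operations for sessions; steps and results can be added, listed, and deleted, but not updated in place. The schema declares foreign keys from steps and results to their parent records, but the class never issues PRAGMA foreign_keys = ON, so SQLite does not enforce them by default and callers must maintain referential integrity themselves. The class follows the vice_ai patterns for database management. Error handling varies by method: create_session logs errors with a full traceback and re-raises, the delete methods log errors and return False, and the remaining methods let database exceptions propagate.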

Source Code

class DatabaseManager:
    """Database manager for SmartStat following vice_ai patterns"""
    
    def __init__(self, db_url: str = "smartstat.db"):
        # Parse database URL to get actual path
        if db_url.startswith('sqlite:///'):
            self.db_path = db_url[10:]  # Remove 'sqlite:///' prefix
        elif db_url.startswith('sqlite://'):
            self.db_path = db_url[9:]   # Remove 'sqlite://' prefix
        else:
            self.db_path = db_url
        
        # Ensure parent directory exists
        from pathlib import Path
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        
        self.init_database()
    
    def init_database(self):
        """Initialize database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    title TEXT,
                    description TEXT,
                    data_source_json TEXT,
                    analysis_config_json TEXT,
                    status TEXT
                )
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_steps (
                    step_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_number INTEGER,
                    step_type TEXT,
                    input_data_json TEXT,
                    generated_script TEXT,
                    execution_output TEXT,
                    execution_error TEXT,
                    execution_success BOOLEAN,
                    created_at TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id)
                )
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_results (
                    result_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_id TEXT,
                    result_type TEXT,
                    result_data_json TEXT,
                    file_paths_json TEXT,
                    metadata_json TEXT,
                    created_at TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id),
                    FOREIGN KEY (step_id) REFERENCES analysis_steps (step_id)
                )
            ''')
            
            conn.commit()
    
    def create_session(self, session: StatisticalSession) -> str:
        """Create new analysis session"""
        try:
            import logging
            logger = logging.getLogger(__name__)
            logger.info(f"DatabaseManager.create_session called for session: {session.session_id}")
            
            with sqlite3.connect(self.db_path) as conn:
                logger.info(f"Database connection established")
                
                # Prepare data for insertion
                data_source_json = json.dumps(session.data_source.to_dict()) if session.data_source else None
                analysis_config_json = json.dumps(session.analysis_config.to_dict()) if session.analysis_config else None
                status_value = session.status.value
                
                logger.info(f"Prepared data: status={status_value}, data_source={data_source_json is not None}, analysis_config={analysis_config_json is not None}")
                
                try:
                    conn.execute('''
                        INSERT INTO sessions 
                        (session_id, user_id, created_at, updated_at, title, description, 
                         data_source_json, analysis_config_json, status)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        session.session_id,
                        session.user_id,
                        session.created_at,
                        session.updated_at,
                        session.title,
                        session.description,
                        data_source_json,
                        analysis_config_json,
                        status_value
                    ))
                    conn.commit()
                    logger.info(f"Session {session.session_id} inserted successfully")
                except Exception as insert_error:
                    logger.error(f"Database insert error: {str(insert_error)}")
                    raise insert_error
                
            return session.session_id
            
        except Exception as e:
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"Error in create_session: {str(e)}")
            logger.error(f"Exception type: {type(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise e
    
    def get_session(self, session_id: str) -> Optional[StatisticalSession]:
        """Retrieve session by ID"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE session_id = ?
            ''', (session_id,))
            row = cursor.fetchone()
            
            if row:
                return StatisticalSession(
                    session_id=row['session_id'],
                    user_id=row['user_id'],
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None,
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
                    title=row['title'] or "",
                    description=row['description'] or "",
                    data_source=DataSource.from_dict(json.loads(row['data_source_json'])) if row['data_source_json'] and row['data_source_json'] != '{}' else None,
                    analysis_config=AnalysisConfiguration.from_dict(json.loads(row['analysis_config_json'])) if row['analysis_config_json'] and row['analysis_config_json'] != '{}' else None,
                    status=AnalysisStatus(row['status'])
                )
        return None
    
    def update_session_status(self, session_id: str, status: AnalysisStatus):
        """Update session status"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE sessions SET status = ?, updated_at = ? WHERE session_id = ?
            ''', (status.value, datetime.now(), session_id))
            conn.commit()
    
    def update_session(self, session: StatisticalSession):
        """Update session data"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE sessions SET 
                    updated_at = ?, title = ?, description = ?, 
                    data_source_json = ?, analysis_config_json = ?, status = ?
                WHERE session_id = ?
            ''', (
                session.updated_at,
                session.title,
                session.description,
                json.dumps(session.data_source.to_dict()) if session.data_source else "{}",
                json.dumps(session.analysis_config.to_dict()) if session.analysis_config else "{}",
                session.status.value,
                session.session_id
            ))
            conn.commit()
    
    def add_analysis_step(self, step: AnalysisStep) -> str:
        """Add analysis step"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_steps 
                (step_id, session_id, step_number, step_type, input_data_json,
                 generated_script, execution_output, execution_error, execution_success, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                step.step_id,
                step.session_id,
                step.step_number,
                step.step_type,
                json.dumps(step.input_data),
                step.generated_script,
                step.execution_output,
                step.execution_error,
                step.execution_success,
                step.created_at
            ))
            conn.commit()
        return step.step_id
    
    def get_session_steps(self, session_id: str) -> List[AnalysisStep]:
        """Get all steps for a session"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM analysis_steps WHERE session_id = ? ORDER BY step_number
            ''', (session_id,))
            
            steps = []
            for row in cursor.fetchall():
                steps.append(AnalysisStep(
                    step_id=row['step_id'],
                    session_id=row['session_id'],
                    step_number=row['step_number'],
                    step_type=row['step_type'],
                    input_data=json.loads(row['input_data_json']) if row['input_data_json'] else {},
                    generated_script=row['generated_script'] or "",
                    execution_output=row['execution_output'] or "",
                    execution_error=row['execution_error'] or "",
                    execution_success=bool(row['execution_success']),
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None
                ))
            return steps
    
    def add_analysis_result(self, result: AnalysisResult) -> str:
        """Add analysis result"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_results 
                (result_id, session_id, step_id, result_type, result_data_json,
                 file_paths_json, metadata_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                result.result_id,
                result.session_id,
                result.step_id,
                result.result_type,
                json.dumps(result.result_data),
                json.dumps(result.file_paths),
                json.dumps(result.metadata),
                result.created_at
            ))
            conn.commit()
        return result.result_id
    
    def get_session_results(self, session_id: str) -> List[AnalysisResult]:
        """Get all results for a session"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM analysis_results WHERE session_id = ? ORDER BY created_at
            ''', (session_id,))
            
            results = []
            for row in cursor.fetchall():
                results.append(AnalysisResult(
                    result_id=row['result_id'],
                    session_id=row['session_id'],
                    step_id=row['step_id'],
                    result_type=row['result_type'],
                    result_data=json.loads(row['result_data_json']) if row['result_data_json'] else {},
                    file_paths=json.loads(row['file_paths_json']) if row['file_paths_json'] else [],
                    metadata=json.loads(row['metadata_json']) if row['metadata_json'] else {},
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None
                ))
            return results
    
    def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[StatisticalSession]:
        """Get recent sessions for a user"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ? 
                ORDER BY updated_at DESC LIMIT ?
            ''', (user_id, limit))
            
            sessions = []
            for row in cursor.fetchall():
                # Handle analysis_config loading with proper error handling
                analysis_config = None
                if row['analysis_config_json']:
                    try:
                        config_data = json.loads(row['analysis_config_json'])
                        # Ensure analysis_type is present and valid
                        if 'analysis_type' in config_data and config_data['analysis_type']:
                            analysis_config = AnalysisConfiguration.from_dict(config_data)
                    except Exception as e:
                        logger.warning(f"Could not load analysis config for session {row['session_id']}: {e}")
                        
                session = StatisticalSession(
                    session_id=row['session_id'],
                    title=row['title'],
                    description=row['description'],
                    user_id=row['user_id'],
                    status=AnalysisStatus(row['status']) if row['status'] else AnalysisStatus.CREATED,
                    analysis_config=analysis_config,
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else datetime.now(),
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else datetime.now()
                )
                sessions.append(session)
                
            return sessions
    
    def get_all_sessions(self, user_id: str = "default") -> List[StatisticalSession]:
        """Get all sessions for a user"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ? 
                ORDER BY created_at DESC
            ''', (user_id,))
            
            sessions = []
            for row in cursor.fetchall():
                # Handle analysis_config loading with proper error handling
                analysis_config = None
                if row['analysis_config_json']:
                    try:
                        config_data = json.loads(row['analysis_config_json'])
                        # Ensure analysis_type is present and valid
                        if 'analysis_type' in config_data and config_data['analysis_type']:
                            analysis_config = AnalysisConfiguration.from_dict(config_data)
                    except Exception as e:
                        logger.warning(f"Could not load analysis config for session {row['session_id']}: {e}")
                
                sessions.append(StatisticalSession(
                    session_id=row['session_id'],
                    user_id=row['user_id'],
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None,
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
                    title=row['title'] or "",
                    description=row['description'] or "",
                    data_source=DataSource(**json.loads(row['data_source_json'])) if row['data_source_json'] else None,
                    analysis_config=analysis_config,
                    status=AnalysisStatus(row['status'])
                ))
            return sessions

    def delete_session(self, session_id: str) -> bool:
        """Delete analysis session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting session {session_id}: {str(e)}")
            return False

    def delete_session_steps(self, session_id: str) -> bool:
        """Delete all analysis steps for a session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_steps WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logger.error(f"Error deleting session steps {session_id}: {str(e)}")
            return False

    def delete_session_results(self, session_id: str) -> bool:
        """Delete all analysis results for a session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_results WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logger.error(f"Error deleting session results {session_id}: {str(e)}")
            return False

    def delete_step(self, step_id: str) -> bool:
        """Delete a specific analysis step"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_steps WHERE step_id = ?', (step_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting step {step_id}: {str(e)}")
            return False

    def delete_result(self, result_id: str) -> bool:
        """Delete a specific analysis result"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_results WHERE result_id = ?', (result_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting result {result_id}: {str(e)}")
            return False

Parameters

Name Type Default Kind
db_url str 'smartstat.db' positional or keyword

Parameter Details

db_url: Database connection string or file path. Accepts 'sqlite:///' or 'sqlite://' prefixed URLs or plain file paths. Defaults to 'smartstat.db' in the current directory. The parent directory is automatically created if it doesn't exist.
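
The prefix handling described above can be sketched as a standalone function. This is a minimal illustration, not the class's own code: resolve_db_path is a hypothetical name, and the logic simply mirrors the slicing done in __init__. Note that three slashes yield a relative path, while a fourth slash yields an absolute one.

```python
def resolve_db_path(db_url: str) -> str:
    """Hypothetical helper mirroring the prefix handling in __init__."""
    # Check the longer prefix first, because 'sqlite:///' also starts with 'sqlite://'.
    for prefix in ("sqlite:///", "sqlite://"):
        if db_url.startswith(prefix):
            return db_url[len(prefix):]
    # Anything without a recognized prefix is treated as a plain file path.
    return db_url


relative_path = resolve_db_path("sqlite:///data/smartstat.db")
absolute_path = resolve_db_path("sqlite:////var/lib/smartstat.db")
plain_path = resolve_db_path("smartstat.db")
```

Here relative_path is 'data/smartstat.db', absolute_path is '/var/lib/smartstat.db', and plain_path is returned unchanged.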

Return Value

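The constructor returns a DatabaseManager instance after creating the schema; no connection is kept open, since each method opens its own. Method return values: create_session, add_analysis_step, and add_analysis_result return the string ID of the inserted record; get_session returns Optional[StatisticalSession]; get_session_steps, get_session_results, get_recent_sessions, and get_all_sessions return lists; the update methods return None; delete_session, delete_step, and delete_result return True only if a row was actually removed, while delete_session_steps and delete_session_results return True whenever no error occurred.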

Class Interface

Methods

__init__(self, db_url: str = 'smartstat.db')

Purpose: Initialize database manager, parse URL, create parent directories, and initialize database schema

Parameters:

  • db_url: Database connection string or file path, defaults to 'smartstat.db'

Returns: None (constructor)

init_database(self)

Purpose: Create database tables (sessions, analysis_steps, analysis_results) if they don't exist

Returns: None

create_session(self, session: StatisticalSession) -> str

Purpose: Insert a new statistical analysis session into the database with comprehensive error logging

Parameters:

  • session: StatisticalSession object containing all session data including user_id, title, description, data_source, analysis_config, and status

Returns: String session_id of the created session

get_session(self, session_id: str) -> Optional[StatisticalSession]

Purpose: Retrieve a single session by its ID, reconstructing all nested objects from JSON

Parameters:

  • session_id: Unique identifier for the session to retrieve

Returns: StatisticalSession object if found, None otherwise
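
The JSON round trip for nested objects can be sketched as follows. This is an illustration under stated assumptions: ExampleSource is a hypothetical stand-in for DataSource (whose real fields are not shown in this page), and dump_nested and load_nested are hypothetical helper names. The '{}' check mirrors the placeholder handling in get_session.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ExampleSource:
    """Hypothetical stand-in for DataSource; the real fields may differ."""
    source_type: str
    file_path: str

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "ExampleSource":
        return cls(**data)


def dump_nested(obj: Optional[ExampleSource]) -> Optional[str]:
    # Store NULL when there is no nested object, as create_session does.
    return json.dumps(obj.to_dict()) if obj else None


def load_nested(raw: Optional[str]) -> Optional[ExampleSource]:
    # Treat NULL, an empty string, and the '{}' placeholder as "no object".
    if not raw or raw == "{}":
        return None
    return ExampleSource.from_dict(json.loads(raw))


original = ExampleSource(source_type="csv", file_path="data.csv")
restored = load_nested(dump_nested(original))
```

After the round trip, restored compares equal to original, and both None and '{}' load as None.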

update_session_status(self, session_id: str, status: AnalysisStatus)

Purpose: Update only the status field of a session and set updated_at timestamp

Parameters:

  • session_id: ID of session to update
  • status: New AnalysisStatus enum value

Returns: None
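
update_session_status passes datetime.now() directly to sqlite3, which relies on the module's default datetime adapter (deprecated as of Python 3.12). A minimal sketch of the same update with the timestamp serialized explicitly is shown below. The reduced table layout, the fixed timestamp, and the status strings 'created' and 'running' are assumptions for illustration only.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions (session_id TEXT PRIMARY KEY, status TEXT, updated_at TIMESTAMP)"
)
conn.execute("INSERT INTO sessions VALUES (?, ?, ?)", ("sess_123", "created", None))

# Serialize the timestamp explicitly instead of passing a datetime object.
now = datetime(2024, 1, 15, 10, 30, 0)  # fixed value so the example is reproducible
conn.execute(
    "UPDATE sessions SET status = ?, updated_at = ? WHERE session_id = ?",
    ("running", now.isoformat(), "sess_123"),
)
conn.commit()

# Without detect_types, the column comes back as the stored string.
stored = conn.execute(
    "SELECT updated_at FROM sessions WHERE session_id = ?", ("sess_123",)
).fetchone()[0]
restored = datetime.fromisoformat(stored)
conn.close()
```

The stored value is an ISO 8601 string that datetime.fromisoformat() parses back to the original datetime, which is the same parsing step the get_* methods perform.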

update_session(self, session: StatisticalSession)

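Purpose: Update the mutable fields of an existing session: updated_at, title, description, data_source, analysis_config, and status. user_id and created_at are left unchanged, updated_at is taken from the session object rather than set automatically, and a missing data_source or analysis_config is written as the placeholder '{}'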

Parameters:

  • session: StatisticalSession object with updated data

Returns: None

add_analysis_step(self, step: AnalysisStep) -> str

Purpose: Insert a new analysis step record including generated script, execution output, and success status

Parameters:

  • step: AnalysisStep object containing step_id, session_id, step_number, step_type, input_data, generated_script, execution details

Returns: String step_id of the created step

get_session_steps(self, session_id: str) -> List[AnalysisStep]

Purpose: Retrieve all analysis steps for a session ordered by step_number

Parameters:

  • session_id: ID of session whose steps to retrieve

Returns: List of AnalysisStep objects ordered by step_number

add_analysis_result(self, result: AnalysisResult) -> str

Purpose: Insert a new analysis result record with result data, file paths, and metadata

Parameters:

  • result: AnalysisResult object containing result_id, session_id, step_id, result_type, result_data, file_paths, metadata

Returns: String result_id of the created result

get_session_results(self, session_id: str) -> List[AnalysisResult]

Purpose: Retrieve all analysis results for a session ordered by created_at timestamp

Parameters:

  • session_id: ID of session whose results to retrieve

Returns: List of AnalysisResult objects ordered by creation time

get_recent_sessions(self, user_id: str = 'default', limit: int = 10) -> List[StatisticalSession]

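Purpose: Retrieve the most recently updated sessions for a user, capped by limit (there is no offset, so this is not full pagination). A malformed analysis_config is logged and skipped rather than raised; data_source is not loaded by this method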

Parameters:

  • user_id: User identifier to filter sessions, defaults to 'default'
  • limit: Maximum number of sessions to return, defaults to 10

Returns: List of StatisticalSession objects ordered by updated_at descending
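
Since get_recent_sessions only accepts a limit, paging through older sessions would need an offset as well. The following is a minimal sketch of how such a query could look. fetch_page is a hypothetical function that is not part of DatabaseManager, and the reduced table layout and sample rows are for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE sessions (session_id TEXT PRIMARY KEY, user_id TEXT, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO sessions VALUES (?, ?, ?)",
    [(f"sess_{i}", "user_001", f"2024-01-{i:02d}T00:00:00") for i in range(1, 8)],
)


def fetch_page(conn, user_id, limit=10, offset=0):
    """Hypothetical paged variant of the get_recent_sessions query."""
    rows = conn.execute(
        "SELECT session_id FROM sessions WHERE user_id = ? "
        "ORDER BY updated_at DESC LIMIT ? OFFSET ?",
        (user_id, limit, offset),
    ).fetchall()
    return [row["session_id"] for row in rows]


first_page = fetch_page(conn, "user_001", limit=3, offset=0)
second_page = fetch_page(conn, "user_001", limit=3, offset=3)
conn.close()
```

With seven sample sessions, first_page holds the three most recently updated IDs and second_page the next three.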

get_all_sessions(self, user_id: str = 'default') -> List[StatisticalSession]

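Purpose: Retrieve all sessions for a user ordered by creation date. A malformed analysis_config is logged and skipped rather than raised. Unlike get_session, this method passes the parsed data_source JSON straight to the DataSource constructor and does not treat the '{}' placeholder written by update_session as empty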

Parameters:

  • user_id: User identifier to filter sessions, defaults to 'default'

Returns: List of all StatisticalSession objects for the user ordered by created_at descending

delete_session(self, session_id: str) -> bool

Purpose: Delete a session record from the database

Parameters:

  • session_id: ID of session to delete

Returns: True if session was deleted (rowcount > 0), False otherwise or on error

delete_session_steps(self, session_id: str) -> bool

Purpose: Delete all analysis steps associated with a session

Parameters:

  • session_id: ID of session whose steps to delete

Returns: True if operation succeeded, False on error

delete_session_results(self, session_id: str) -> bool

Purpose: Delete all analysis results associated with a session

Parameters:

  • session_id: ID of session whose results to delete

Returns: True if operation succeeded, False on error

delete_step(self, step_id: str) -> bool

Purpose: Delete a specific analysis step by its ID

Parameters:

  • step_id: ID of step to delete

Returns: True if step was deleted (rowcount > 0), False otherwise or on error

delete_result(self, result_id: str) -> bool

Purpose: Delete a specific analysis result by its ID

Parameters:

  • result_id: ID of result to delete

Returns: True if result was deleted (rowcount > 0), False otherwise or on error
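
The delete methods each open their own connection, so removing a session together with its steps and results takes three separate transactions. One possible alternative is to run the three deletes in a single transaction, sketched below. delete_session_cascade is a hypothetical helper that is not part of the class, and the table layouts are reduced to the columns the example needs.

```python
import sqlite3


def delete_session_cascade(conn: sqlite3.Connection, session_id: str) -> bool:
    """Hypothetical helper: remove a session and its children atomically."""
    # 'with conn' commits if the block succeeds and rolls back on an exception.
    with conn:
        conn.execute("DELETE FROM analysis_results WHERE session_id = ?", (session_id,))
        conn.execute("DELETE FROM analysis_steps WHERE session_id = ?", (session_id,))
        cursor = conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    # Same convention as delete_session: True only if a session row was removed.
    return cursor.rowcount > 0


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sessions (session_id TEXT PRIMARY KEY);
    CREATE TABLE analysis_steps (step_id TEXT PRIMARY KEY, session_id TEXT);
    CREATE TABLE analysis_results (result_id TEXT PRIMARY KEY, session_id TEXT, step_id TEXT);
    INSERT INTO sessions VALUES ('sess_123');
    INSERT INTO analysis_steps VALUES ('step_001', 'sess_123');
    INSERT INTO analysis_results VALUES ('res_001', 'sess_123', 'step_001');
""")

deleted = delete_session_cascade(conn, "sess_123")
missing = delete_session_cascade(conn, "sess_999")
remaining_steps = conn.execute("SELECT COUNT(*) FROM analysis_steps").fetchone()[0]
conn.close()
```

Deleting children before the parent keeps the order valid even when foreign key enforcement is switched on.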

Attributes

Name Type Description Scope
db_path str Parsed file system path to the SQLite database file, extracted from db_url parameter instance

Dependencies

  • sqlite3
  • json
  • logging
  • datetime
  • typing
  • pathlib
  • uuid
  • dataclasses
  • enum
  • traceback

Required Imports

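import sqlite3
import json
import logging
from datetime import datetime
from typing import List, Dict, Optional, Any
from pathlib import Path

# The get_* and delete_* methods reference a module-level logger,
# so one is assumed to be defined alongside these imports:
logger = logging.getLogger(__name__)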

Conditional/Optional Imports

These imports are performed inside methods rather than at module level:

from pathlib import Path

Condition: imported in __init__ to ensure the parent directory exists (Required, conditional)

import traceback

Condition: imported in create_session error handling for detailed logging (Required, conditional)

Usage Example

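# Assumes DatabaseManager, StatisticalSession, AnalysisStep, AnalysisResult,
# and AnalysisStatus have been imported from the SmartStat models module.

# Initialize database manager (creates the data/ directory if needed)
db = DatabaseManager('sqlite:///data/smartstat.db')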

# Create a new session
session = StatisticalSession(
    session_id='sess_123',
    user_id='user_001',
    title='My Analysis',
    description='Testing analysis',
    status=AnalysisStatus.CREATED
)
session_id = db.create_session(session)

# Retrieve session
retrieved = db.get_session(session_id)

# Update session status (RUNNING is assumed to be a member of AnalysisStatus)
db.update_session_status(session_id, AnalysisStatus.RUNNING)

# Add analysis step
step = AnalysisStep(
    step_id='step_001',
    session_id=session_id,
    step_number=1,
    step_type='data_load',
    input_data={'file': 'data.csv'},
    execution_success=True
)
db.add_analysis_step(step)

# Get all steps for session
steps = db.get_session_steps(session_id)

# Add result
result = AnalysisResult(
    result_id='res_001',
    session_id=session_id,
    step_id='step_001',
    result_type='summary',
    result_data={'mean': 42.5}
)
db.add_analysis_result(result)

# Get recent sessions
recent = db.get_recent_sessions(user_id='user_001', limit=5)

# Delete session and related data (children first; delete_session does not cascade)
db.delete_session_results(session_id)
db.delete_session_steps(session_id)
db.delete_session(session_id)

Best Practices

  • init_database() runs automatically in __init__ and is idempotent (CREATE TABLE IF NOT EXISTS), so it does not need to be called manually
  • Every method opens its own connection in a with statement; note that sqlite3's connection context manager commits or rolls back the transaction but does not close the connection
  • Handle Optional return types from get_session() - it returns None if session not found
  • Delete related data in the correct order: results first, then steps, then the session. delete_session() removes only the sessions row and does not cascade to child records
  • The class automatically creates parent directories for the database file
  • JSON serialization is handled internally for complex objects (DataSource, AnalysisConfiguration)
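  • Timestamps are passed to SQLite as datetime objects, which sqlite3's default adapter stores as ISO-format strings; they are parsed back with datetime.fromisoformat() on retrieval. The default adapter is deprecated as of Python 3.12
  • Detailed error logging with traceback information is limited to create_session(); the delete methods log the error and return False, and the other methods let exceptions propagate
  • The schema declares foreign key constraints, but SQLite enforces them only when PRAGMA foreign_keys = ON is set on the connection, which this class does not do; ensure parent records exist before creating child records
  • Use get_recent_sessions() with the limit parameter to cap the number of sessions loaded instead of calling get_all_sessions(); there is no offset parameter, so true pagination is not supported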
  • The class handles both 'sqlite:///' and 'sqlite://' URL prefixes as well as plain file paths
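
The foreign key point above can be checked with a short experiment. This sketch is independent of DatabaseManager: insert_orphan_step is a hypothetical function, and the tables are cut down to the columns involved. It tries to insert a step that references a session which does not exist, once without and once with PRAGMA foreign_keys = ON.

```python
import sqlite3


def insert_orphan_step(enforce: bool) -> bool:
    """Return True if a step referencing a missing session was accepted."""
    conn = sqlite3.connect(":memory:")
    try:
        if enforce:
            # Must be issued on every new connection; the setting is not persisted.
            conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("CREATE TABLE sessions (session_id TEXT PRIMARY KEY)")
        conn.execute(
            "CREATE TABLE analysis_steps ("
            "step_id TEXT PRIMARY KEY, session_id TEXT, "
            "FOREIGN KEY (session_id) REFERENCES sessions (session_id))"
        )
        conn.execute(
            "INSERT INTO analysis_steps VALUES (?, ?)", ("step_001", "no_such_session")
        )
        return True
    except sqlite3.IntegrityError:
        # Raised only when foreign key enforcement is active.
        return False
    finally:
        conn.close()


accepted_by_default = insert_orphan_step(enforce=False)
accepted_when_enforced = insert_orphan_step(enforce=True)
```

With enforcement on, the orphan insert is rejected with an IntegrityError. On a default SQLite build the same insert is accepted silently when the pragma is omitted, which is the situation the class is in today.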

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DatabaseManager_v1 79.6% similar

    Database manager for SmartStat following vice_ai patterns

    From: /tf/active/vicechatdev/vice_ai/smartstat_models.py
  • class DatabaseManager_v1 73.9% similar

    SQLite database manager for persistent storage

    From: /tf/active/vicechatdev/vice_ai/models.py
  • class SmartStatSession 63.8% similar

    A session management class that encapsulates a SmartStat statistical analysis session, tracking data, analysis history, plots, and reports for a specific data section.

    From: /tf/active/vicechatdev/vice_ai/smartstat_service.py
  • function inspect_database 55.4% similar

    Inspects a SQLite database at a hardcoded path, examining table structure, row counts, and identifying potentially corrupted chat_session_id values in the text_sections table.

    From: /tf/active/vicechatdev/vice_ai/database_inspector.py
  • function main_v62 55.2% similar

    Demonstrates the SmartStat SQL Workflow by loading a database schema, initializing a SQL query generator, and generating SQL queries from natural language requests for various laboratory data analysis scenarios.

    From: /tf/active/vicechatdev/smartstat/demo_sql_workflow.py