🔍 Code Extractor

class DatabaseManager_v1

Maturity: 26

SQLite-backed persistence manager for SmartStat sessions, analysis steps, and analysis results, following vice_ai patterns.

File:
/tf/active/vicechatdev/vice_ai/smartstat_models.py
Lines:
209 - 651
Complexity:
moderate

Purpose

SQLite-backed persistence manager for SmartStat sessions, analysis steps, and analysis results, following vice_ai patterns.

Source Code

class DatabaseManager:
    """Database manager for SmartStat following vice_ai patterns"""
    
    def __init__(self, db_url: str = "smartstat.db"):
        # Parse database URL to get actual path
        if db_url.startswith('sqlite:///'):
            self.db_path = db_url[10:]  # Remove 'sqlite:///' prefix
        elif db_url.startswith('sqlite://'):
            self.db_path = db_url[9:]   # Remove 'sqlite://' prefix
        else:
            self.db_path = db_url
        
        # Ensure parent directory exists
        from pathlib import Path
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        
        self.init_database()
    
    def init_database(self):
        """Initialize database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    title TEXT,
                    description TEXT,
                    data_source_json TEXT,
                    analysis_config_json TEXT,
                    status TEXT,
                    sql_query TEXT
                )
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_steps (
                    step_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_number INTEGER,
                    step_type TEXT,
                    input_data_json TEXT,
                    generated_script TEXT,
                    execution_output TEXT,
                    execution_error TEXT,
                    execution_success BOOLEAN,
                    metadata TEXT,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id)
                )
            ''')
            
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_results (
                    result_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_id TEXT,
                    result_type TEXT,
                    result_data_json TEXT,
                    file_paths_json TEXT,
                    metadata_json TEXT,
                    created_at TIMESTAMP,
                    FOREIGN KEY (step_id) REFERENCES analysis_steps (step_id)
                )
            ''')
            
            # Migration: Add missing columns to analysis_steps table
            try:
                conn.execute('ALTER TABLE analysis_steps ADD COLUMN metadata TEXT')
            except sqlite3.OperationalError:
                pass  # Column already exists
                
            try:
                conn.execute('ALTER TABLE analysis_steps ADD COLUMN updated_at TIMESTAMP')
            except sqlite3.OperationalError:
                pass  # Column already exists
            
            # Migration: Add sql_query column if it doesn't exist
            try:
                conn.execute('ALTER TABLE sessions ADD COLUMN sql_query TEXT')
            except sqlite3.OperationalError:
                # Column already exists, ignore
                pass
            
            conn.commit()
    
    def create_session(self, session: StatisticalSession) -> str:
        """Create new analysis session"""
        try:
            import logging
            logger = logging.getLogger(__name__)
            logger.info(f"DatabaseManager.create_session called for session: {session.session_id}")
            
            with sqlite3.connect(self.db_path) as conn:
                logger.info(f"Database connection established")
                
                # Prepare data for insertion
                data_source_json = json.dumps(session.data_source.to_dict()) if session.data_source else None
                analysis_config_json = json.dumps(session.analysis_config.to_dict()) if session.analysis_config else None
                status_value = session.status.value
                
                logger.info(f"Prepared data: status={status_value}, data_source={data_source_json is not None}, analysis_config={analysis_config_json is not None}")
                
                try:
                    conn.execute('''
                        INSERT INTO sessions 
                        (session_id, user_id, created_at, updated_at, title, description, 
                         data_source_json, analysis_config_json, status, sql_query)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        session.session_id,
                        session.user_id,
                        session.created_at,
                        session.updated_at,
                        session.title,
                        session.description,
                        data_source_json,
                        analysis_config_json,
                        status_value,
                        session.sql_query
                    ))
                    conn.commit()
                    logger.info(f"Session {session.session_id} inserted successfully")
                except Exception as insert_error:
                    logger.error(f"Database insert error: {str(insert_error)}")
                    raise insert_error
                
            return session.session_id
            
        except Exception as e:
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"Error in create_session: {str(e)}")
            logger.error(f"Exception type: {type(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise e
    
    def get_session(self, session_id: str) -> Optional[StatisticalSession]:
        """Retrieve session by ID"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE session_id = ?
            ''', (session_id,))
            row = cursor.fetchone()
            
            if row:
                # Safe access to sql_query column (may not exist in older databases)
                sql_query = ""
                try:
                    sql_query = row['sql_query'] or ""
                except (KeyError, IndexError):
                    sql_query = ""
                    
                return StatisticalSession(
                    session_id=row['session_id'],
                    user_id=row['user_id'],
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None,
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
                    title=row['title'] or "",
                    description=row['description'] or "",
                    data_source=DataSource.from_dict(json.loads(row['data_source_json'])) if row['data_source_json'] and row['data_source_json'] != '{}' else None,
                    analysis_config=AnalysisConfiguration.from_dict(json.loads(row['analysis_config_json'])) if row['analysis_config_json'] and row['analysis_config_json'] != '{}' else None,
                    status=AnalysisStatus(row['status']),
                    sql_query=sql_query
                )
        return None
    
    def update_session_status(self, session_id: str, status: AnalysisStatus):
        """Update session status"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE sessions SET status = ?, updated_at = ? WHERE session_id = ?
            ''', (status.value, datetime.now(), session_id))
            conn.commit()
    
    def update_session(self, session: StatisticalSession):
        """Update session data"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE sessions SET 
                    updated_at = ?, title = ?, description = ?, 
                    data_source_json = ?, analysis_config_json = ?, status = ?, sql_query = ?
                WHERE session_id = ?
            ''', (
                session.updated_at,
                session.title,
                session.description,
                json.dumps(session.data_source.to_dict()) if session.data_source else "{}",
                json.dumps(session.analysis_config.to_dict()) if session.analysis_config else "{}",
                session.status.value,
                session.sql_query,
                session.session_id
            ))
            conn.commit()
    
    def add_analysis_step(self, step: AnalysisStep) -> str:
        """Add analysis step"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_steps 
                (step_id, session_id, step_number, step_type, input_data_json,
                 generated_script, execution_output, execution_error, execution_success, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                step.step_id,
                step.session_id,
                step.step_number,
                step.step_type,
                json.dumps(step.input_data),
                step.generated_script,
                step.execution_output,
                step.execution_error,
                step.execution_success,
                step.created_at
            ))
            conn.commit()
        return step.step_id
    
    def update_step(self, step: AnalysisStep):
        """Update analysis step with execution results and metadata"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE analysis_steps SET 
                    execution_output = ?, execution_error = ?, execution_success = ?, 
                    metadata = ?, updated_at = ?
                WHERE step_id = ?
            ''', (
                step.execution_output,
                step.execution_error,
                step.execution_success,
                json.dumps(step.metadata) if step.metadata else None,
                datetime.now(),
                step.step_id
            ))
            conn.commit()
    
    def get_session_steps(self, session_id: str) -> List[AnalysisStep]:
        """Get all steps for a session"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM analysis_steps WHERE session_id = ? ORDER BY step_number
            ''', (session_id,))
            
            steps = []
            for row in cursor.fetchall():
                steps.append(AnalysisStep(
                    step_id=row['step_id'],
                    session_id=row['session_id'],
                    step_number=row['step_number'],
                    step_type=row['step_type'],
                    input_data=json.loads(row['input_data_json']) if row['input_data_json'] else {},
                    generated_script=row['generated_script'] or "",
                    execution_output=row['execution_output'] or "",
                    execution_error=row['execution_error'] or "",
                    execution_success=bool(row['execution_success']),
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None
                ))
            return steps
    
    def add_analysis_result(self, result: AnalysisResult) -> str:
        """Add analysis result"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_results 
                (result_id, session_id, step_id, result_type, result_data_json,
                 file_paths_json, metadata_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                result.result_id,
                result.session_id,
                result.step_id,
                result.result_type,
                json.dumps(result.result_data),
                json.dumps(result.file_paths),
                json.dumps(result.metadata),
                result.created_at
            ))
            conn.commit()
        return result.result_id
    
    def get_session_results(self, session_id: str) -> List[AnalysisResult]:
        """Get all results for a session"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM analysis_results WHERE session_id = ? ORDER BY created_at
            ''', (session_id,))
            
            results = []
            for row in cursor.fetchall():
                results.append(AnalysisResult(
                    result_id=row['result_id'],
                    session_id=row['session_id'],
                    step_id=row['step_id'],
                    result_type=row['result_type'],
                    result_data=json.loads(row['result_data_json']) if row['result_data_json'] else {},
                    file_paths=json.loads(row['file_paths_json']) if row['file_paths_json'] else [],
                    metadata=json.loads(row['metadata_json']) if row['metadata_json'] else {},
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None
                ))
            return results
    
    def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[StatisticalSession]:
        """Get recent sessions for a user"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ? 
                ORDER BY updated_at DESC LIMIT ?
            ''', (user_id, limit))
            
            sessions = []
            for row in cursor.fetchall():
                # Handle analysis_config loading with proper error handling
                analysis_config = None
                if row['analysis_config_json']:
                    try:
                        config_data = json.loads(row['analysis_config_json'])
                        # Ensure analysis_type is present and valid
                        if 'analysis_type' in config_data and config_data['analysis_type']:
                            analysis_config = AnalysisConfiguration.from_dict(config_data)
                    except Exception as e:
                        logger.warning(f"Could not load analysis config for session {row['session_id']}: {e}")
                        
                # Safe access to sql_query column (may not exist in older databases)
                sql_query = ""
                try:
                    sql_query = row['sql_query'] or ""
                except (KeyError, IndexError):
                    sql_query = ""
                
                session = StatisticalSession(
                    session_id=row['session_id'],
                    title=row['title'],
                    description=row['description'],
                    user_id=row['user_id'],
                    status=AnalysisStatus(row['status']) if row['status'] else AnalysisStatus.CREATED,
                    analysis_config=analysis_config,
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else datetime.now(),
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else datetime.now(),
                    sql_query=sql_query
                )
                sessions.append(session)
                
            return sessions
    
    def get_all_sessions(self, user_id: str = "default") -> List[StatisticalSession]:
        """Get all sessions for a user"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ? 
                ORDER BY created_at DESC
            ''', (user_id,))
            
            sessions = []
            for row in cursor.fetchall():
                # Handle analysis_config loading with proper error handling
                analysis_config = None
                if row['analysis_config_json']:
                    try:
                        config_data = json.loads(row['analysis_config_json'])
                        # Ensure analysis_type is present and valid
                        if 'analysis_type' in config_data and config_data['analysis_type']:
                            analysis_config = AnalysisConfiguration.from_dict(config_data)
                    except Exception as e:
                        logger.warning(f"Could not load analysis config for session {row['session_id']}: {e}")
                
                # Safe access to sql_query column (may not exist in older databases)
                sql_query = ""
                try:
                    sql_query = row['sql_query'] or ""
                except (KeyError, IndexError):
                    sql_query = ""
                
                sessions.append(StatisticalSession(
                    session_id=row['session_id'],
                    user_id=row['user_id'],
                    created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None,
                    updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
                    title=row['title'] or "",
                    description=row['description'] or "",
                    data_source=DataSource(**json.loads(row['data_source_json'])) if row['data_source_json'] else None,
                    analysis_config=analysis_config,
                    status=AnalysisStatus(row['status']),
                    sql_query=sql_query
                ))
            return sessions

    def delete_session(self, session_id: str) -> bool:
        """Delete analysis session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting session {session_id}: {str(e)}")
            return False

    def delete_session_steps(self, session_id: str) -> bool:
        """Delete all analysis steps for a session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_steps WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logger.error(f"Error deleting session steps {session_id}: {str(e)}")
            return False

    def delete_session_results(self, session_id: str) -> bool:
        """Delete all analysis results for a session"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_results WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logger.error(f"Error deleting session results {session_id}: {str(e)}")
            return False

    def delete_step(self, step_id: str) -> bool:
        """Delete a specific analysis step"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_steps WHERE step_id = ?', (step_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting step {step_id}: {str(e)}")
            return False

    def delete_result(self, result_id: str) -> bool:
        """Delete a specific analysis result"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM analysis_results WHERE result_id = ?', (result_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting result {result_id}: {str(e)}")
            return False

Parameters

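Name Type Default Kind
db_url str "smartstat.db" positional or keyword

Parameter Details

db_url: Location of the SQLite database. Either a plain file path or a path carrying a `sqlite:///` or `sqlite://` prefix; the prefix is stripped to obtain the file path.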

Return Value

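Instantiating the class returns a DatabaseManager whose database file's parent directory has been created and whose tables have been initialized.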

Class Interface

Methods

__init__(self, db_url)

Purpose: Resolve the database file path from db_url, create its parent directory if needed, and initialize the tables

Parameters:

  • db_url: Type: str

Returns: None

init_database(self)

Purpose: Initialize database with required tables

Returns: None

create_session(self, session) -> str

Purpose: Create new analysis session

Parameters:

  • session: Type: StatisticalSession

Returns: Returns str

get_session(self, session_id) -> Optional[StatisticalSession]

Purpose: Retrieve session by ID

Parameters:

  • session_id: Type: str

Returns: Returns Optional[StatisticalSession]

update_session_status(self, session_id, status)

Purpose: Update session status

Parameters:

  • session_id: Type: str
  • status: Type: AnalysisStatus

Returns: None

update_session(self, session)

Purpose: Update session data

Parameters:

  • session: Type: StatisticalSession

Returns: None

add_analysis_step(self, step) -> str

Purpose: Add analysis step

Parameters:

  • step: Type: AnalysisStep

Returns: Returns str

update_step(self, step)

Purpose: Update analysis step with execution results and metadata

Parameters:

  • step: Type: AnalysisStep

Returns: None

get_session_steps(self, session_id) -> List[AnalysisStep]

Purpose: Get all steps for a session

Parameters:

  • session_id: Type: str

Returns: Returns List[AnalysisStep]

add_analysis_result(self, result) -> str

Purpose: Add analysis result

Parameters:

  • result: Type: AnalysisResult

Returns: Returns str

get_session_results(self, session_id) -> List[AnalysisResult]

Purpose: Get all results for a session

Parameters:

  • session_id: Type: str

Returns: Returns List[AnalysisResult]

get_recent_sessions(self, user_id, limit) -> List[StatisticalSession]

Purpose: Get recent sessions for a user

Parameters:

  • user_id: Type: str
  • limit: Type: int

Returns: Returns List[StatisticalSession]

get_all_sessions(self, user_id) -> List[StatisticalSession]

Purpose: Get all sessions for a user

Parameters:

  • user_id: Type: str

Returns: Returns List[StatisticalSession]

delete_session(self, session_id) -> bool

Purpose: Delete analysis session

Parameters:

  • session_id: Type: str

Returns: Returns bool

delete_session_steps(self, session_id) -> bool

Purpose: Delete all analysis steps for a session

Parameters:

  • session_id: Type: str

Returns: Returns bool

delete_session_results(self, session_id) -> bool

Purpose: Delete all analysis results for a session

Parameters:

  • session_id: Type: str

Returns: Returns bool

delete_step(self, step_id) -> bool

Purpose: Delete a specific analysis step

Parameters:

  • step_id: Type: str

Returns: Returns bool

delete_result(self, result_id) -> bool

Purpose: Delete a specific analysis result

Parameters:

  • result_id: Type: str

Returns: Returns bool

Required Imports

import uuid
import json
import sqlite3
import logging
from datetime import datetime
from typing import List, Optional

Usage Example

# Example usage:
# db = DatabaseManager("sqlite:///data/smartstat.db")
# recent = db.get_recent_sessions(user_id="default", limit=10)

Related Versions

Other versions of this component:

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DatabaseManager 79.6% similar

    SQLite database manager for SmartStat application that handles persistence of statistical analysis sessions, steps, and results.

    From: /tf/active/vicechatdev/smartstat/models.py
  • class SmartStatService 62.6% similar

    Service for running SmartStat analysis sessions in Vice AI

    From: /tf/active/vicechatdev/vice_ai/smartstat_service.py
  • class SmartStatConfig 53.2% similar

    Configuration class for SmartStat service that manages directory paths and API keys for various LLM providers integrated into Vice AI.

    From: /tf/active/vicechatdev/vice_ai/new_app.py
  • function main_v62 51.7% similar

    Demonstrates the SmartStat SQL Workflow by loading a database schema, initializing a SQL query generator, and generating SQL queries from natural language requests for various laboratory data analysis scenarios.

    From: /tf/active/vicechatdev/smartstat/demo_sql_workflow.py
← Back to Browse