class DatabaseManager
SQLite database manager for SmartStat application that handles persistence of statistical analysis sessions, steps, and results.
/tf/active/vicechatdev/smartstat/models.py
198 - 576
moderate
Purpose
DatabaseManager provides a complete data persistence layer for the SmartStat statistical analysis application. It manages three main entities: sessions (analysis workflows), analysis_steps (individual computation steps), and analysis_results (outputs from steps). The class handles database initialization, CRUD operations for all entities, and maintains referential integrity through foreign key relationships. It follows the vice_ai patterns for database management and includes comprehensive error handling and logging.
Source Code
class DatabaseManager:
    """SQLite persistence layer for SmartStat, following vice_ai patterns.

    Manages three tables — ``sessions``, ``analysis_steps`` and
    ``analysis_results`` — with one short-lived connection per operation.
    Nested objects (DataSource, AnalysisConfiguration, dict payloads) are
    stored as JSON text columns; timestamps are stored as ISO strings and
    parsed back to ``datetime`` on read.

    NOTE(review): foreign keys are *declared* in the schema but SQLite does
    not enforce them unless ``PRAGMA foreign_keys = ON`` is set on each
    connection, which this class does not do — callers must delete child
    rows (results, then steps) before the parent session themselves.
    """

    def __init__(self, db_url: str = "smartstat.db"):
        """Resolve *db_url* to a filesystem path and initialize the schema.

        Accepts ``sqlite:///path``, ``sqlite://path`` or a plain file path.
        The parent directory is created if missing, because
        ``sqlite3.connect`` will not create intermediate directories.
        """
        if db_url.startswith('sqlite:///'):
            self.db_path = db_url[len('sqlite:///'):]
        elif db_url.startswith('sqlite://'):
            self.db_path = db_url[len('sqlite://'):]
        else:
            self.db_path = db_url
        from pathlib import Path
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        self.init_database()

    def init_database(self):
        """Create the three tables if they do not already exist (idempotent)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    title TEXT,
                    description TEXT,
                    data_source_json TEXT,
                    analysis_config_json TEXT,
                    status TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_steps (
                    step_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_number INTEGER,
                    step_type TEXT,
                    input_data_json TEXT,
                    generated_script TEXT,
                    execution_output TEXT,
                    execution_error TEXT,
                    execution_success BOOLEAN,
                    created_at TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS analysis_results (
                    result_id TEXT PRIMARY KEY,
                    session_id TEXT,
                    step_id TEXT,
                    result_type TEXT,
                    result_data_json TEXT,
                    file_paths_json TEXT,
                    metadata_json TEXT,
                    created_at TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES sessions (session_id),
                    FOREIGN KEY (step_id) REFERENCES analysis_steps (step_id)
                )
            ''')
            conn.commit()

    # ------------------------------------------------------------------
    # Private row-decoding helpers (shared by the get_* methods)
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_dt(value):
        """Parse a stored ISO timestamp string; NULL/empty stays None."""
        return datetime.fromisoformat(value) if value else None

    @staticmethod
    def _parse_status(value):
        """Map a stored status string to AnalysisStatus; NULL falls back to CREATED."""
        return AnalysisStatus(value) if value else AnalysisStatus.CREATED

    @staticmethod
    def _load_analysis_config(row):
        """Defensively deserialize analysis_config_json from a row.

        Returns None when the column is empty, the JSON is malformed, or
        the payload lacks a truthy 'analysis_type' — a bad config must not
        prevent the session list from loading.
        """
        raw = row['analysis_config_json']
        if not raw:
            return None
        try:
            data = json.loads(raw)
            # 'analysis_type' must be present and truthy for a valid config
            if data.get('analysis_type'):
                return AnalysisConfiguration.from_dict(data)
        except Exception as exc:
            logging.getLogger(__name__).warning(
                "Could not load analysis config for session %s: %s",
                row['session_id'], exc)
        return None

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    def create_session(self, session: StatisticalSession) -> str:
        """Insert a new analysis session row and return its session_id.

        data_source / analysis_config are serialized to JSON; ``None``
        values are stored as SQL NULL.

        Raises:
            Exception: re-raised (with original traceback) after logging
                if serialization or the insert fails.
        """
        log = logging.getLogger(__name__)
        try:
            data_source_json = (json.dumps(session.data_source.to_dict())
                                if session.data_source else None)
            analysis_config_json = (json.dumps(session.analysis_config.to_dict())
                                    if session.analysis_config else None)
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT INTO sessions
                    (session_id, user_id, created_at, updated_at, title, description,
                     data_source_json, analysis_config_json, status)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    session.session_id,
                    session.user_id,
                    session.created_at,
                    session.updated_at,
                    session.title,
                    session.description,
                    data_source_json,
                    analysis_config_json,
                    session.status.value,
                ))
                conn.commit()
            log.info("Session %s inserted successfully", session.session_id)
            return session.session_id
        except Exception:
            # logger.exception records the full traceback automatically;
            # bare `raise` preserves the original exception context.
            log.exception("Error in create_session for %s", session.session_id)
            raise

    def get_session(self, session_id: str) -> Optional[StatisticalSession]:
        """Fetch one session by id, rebuilding nested objects from JSON.

        Returns None when no such session exists. A NULL status column
        falls back to AnalysisStatus.CREATED (consistent with
        get_recent_sessions).
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                'SELECT * FROM sessions WHERE session_id = ?', (session_id,)
            ).fetchone()
        if row is None:
            return None
        ds_raw = row['data_source_json']
        cfg_raw = row['analysis_config_json']
        return StatisticalSession(
            session_id=row['session_id'],
            user_id=row['user_id'],
            created_at=self._parse_dt(row['created_at']),
            updated_at=self._parse_dt(row['updated_at']),
            title=row['title'] or "",
            description=row['description'] or "",
            # '{}' placeholders (written by update_session) mean "no object"
            data_source=(DataSource.from_dict(json.loads(ds_raw))
                         if ds_raw and ds_raw != '{}' else None),
            analysis_config=(AnalysisConfiguration.from_dict(json.loads(cfg_raw))
                             if cfg_raw and cfg_raw != '{}' else None),
            status=self._parse_status(row['status']),
        )

    def update_session_status(self, session_id: str, status: AnalysisStatus):
        """Update only the status column, bumping updated_at to now."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                'UPDATE sessions SET status = ?, updated_at = ? WHERE session_id = ?',
                (status.value, datetime.now(), session_id))
            conn.commit()

    def update_session(self, session: StatisticalSession):
        """Persist all mutable fields of an existing session.

        Missing data_source / analysis_config are stored as the '{}'
        placeholder (get_session treats '{}' as None on read).
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE sessions SET
                    updated_at = ?, title = ?, description = ?,
                    data_source_json = ?, analysis_config_json = ?, status = ?
                WHERE session_id = ?
            ''', (
                session.updated_at,
                session.title,
                session.description,
                json.dumps(session.data_source.to_dict()) if session.data_source else "{}",
                json.dumps(session.analysis_config.to_dict()) if session.analysis_config else "{}",
                session.status.value,
                session.session_id,
            ))
            conn.commit()

    # ------------------------------------------------------------------
    # Steps
    # ------------------------------------------------------------------

    def add_analysis_step(self, step: AnalysisStep) -> str:
        """Insert an analysis step row and return its step_id."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_steps
                (step_id, session_id, step_number, step_type, input_data_json,
                 generated_script, execution_output, execution_error, execution_success, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                step.step_id,
                step.session_id,
                step.step_number,
                step.step_type,
                json.dumps(step.input_data),
                step.generated_script,
                step.execution_output,
                step.execution_error,
                step.execution_success,
                step.created_at,
            ))
            conn.commit()
        return step.step_id

    def get_session_steps(self, session_id: str) -> List[AnalysisStep]:
        """Return all steps for a session, ordered by step_number."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                'SELECT * FROM analysis_steps WHERE session_id = ? ORDER BY step_number',
                (session_id,)).fetchall()
        return [
            AnalysisStep(
                step_id=row['step_id'],
                session_id=row['session_id'],
                step_number=row['step_number'],
                step_type=row['step_type'],
                input_data=json.loads(row['input_data_json']) if row['input_data_json'] else {},
                generated_script=row['generated_script'] or "",
                execution_output=row['execution_output'] or "",
                execution_error=row['execution_error'] or "",
                execution_success=bool(row['execution_success']),
                created_at=self._parse_dt(row['created_at']),
            )
            for row in rows
        ]

    # ------------------------------------------------------------------
    # Results
    # ------------------------------------------------------------------

    def add_analysis_result(self, result: AnalysisResult) -> str:
        """Insert an analysis result row and return its result_id."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO analysis_results
                (result_id, session_id, step_id, result_type, result_data_json,
                 file_paths_json, metadata_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                result.result_id,
                result.session_id,
                result.step_id,
                result.result_type,
                json.dumps(result.result_data),
                json.dumps(result.file_paths),
                json.dumps(result.metadata),
                result.created_at,
            ))
            conn.commit()
        return result.result_id

    def get_session_results(self, session_id: str) -> List[AnalysisResult]:
        """Return all results for a session, ordered by created_at."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                'SELECT * FROM analysis_results WHERE session_id = ? ORDER BY created_at',
                (session_id,)).fetchall()
        return [
            AnalysisResult(
                result_id=row['result_id'],
                session_id=row['session_id'],
                step_id=row['step_id'],
                result_type=row['result_type'],
                result_data=json.loads(row['result_data_json']) if row['result_data_json'] else {},
                file_paths=json.loads(row['file_paths_json']) if row['file_paths_json'] else [],
                metadata=json.loads(row['metadata_json']) if row['metadata_json'] else {},
                created_at=self._parse_dt(row['created_at']),
            )
            for row in rows
        ]

    # ------------------------------------------------------------------
    # Session listings
    # ------------------------------------------------------------------

    def get_recent_sessions(self, user_id: str = "default", limit: int = 10) -> List[StatisticalSession]:
        """Return up to *limit* sessions for *user_id*, most recently updated first.

        data_source is intentionally not loaded here (list views do not
        need it); use get_session() for the full record. Missing
        timestamps default to now().
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ?
                ORDER BY updated_at DESC LIMIT ?
            ''', (user_id, limit)).fetchall()
        return [
            StatisticalSession(
                session_id=row['session_id'],
                title=row['title'],
                description=row['description'],
                user_id=row['user_id'],
                status=self._parse_status(row['status']),
                analysis_config=self._load_analysis_config(row),
                created_at=self._parse_dt(row['created_at']) or datetime.now(),
                updated_at=self._parse_dt(row['updated_at']) or datetime.now(),
            )
            for row in rows
        ]

    def get_all_sessions(self, user_id: str = "default") -> List[StatisticalSession]:
        """Return every session for *user_id*, newest created first.

        Uses DataSource.from_dict with a '{}' placeholder guard, matching
        get_session (previously this path used DataSource(**...) with no
        guard, which broke on the '{}' written by update_session).
        """
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute('''
                SELECT * FROM sessions WHERE user_id = ?
                ORDER BY created_at DESC
            ''', (user_id,)).fetchall()
        sessions = []
        for row in rows:
            ds_raw = row['data_source_json']
            sessions.append(StatisticalSession(
                session_id=row['session_id'],
                user_id=row['user_id'],
                created_at=self._parse_dt(row['created_at']),
                updated_at=self._parse_dt(row['updated_at']),
                title=row['title'] or "",
                description=row['description'] or "",
                data_source=(DataSource.from_dict(json.loads(ds_raw))
                             if ds_raw and ds_raw != '{}' else None),
                analysis_config=self._load_analysis_config(row),
                status=self._parse_status(row['status']),
            ))
        return sessions

    # ------------------------------------------------------------------
    # Deletion (no FK cascade — delete results, then steps, then session)
    # ------------------------------------------------------------------

    def delete_session(self, session_id: str) -> bool:
        """Delete a session row. True iff a row was actually removed."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    'DELETE FROM sessions WHERE session_id = ?', (session_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logging.getLogger(__name__).error(
                "Error deleting session %s: %s", session_id, e)
            return False

    def delete_session_steps(self, session_id: str) -> bool:
        """Delete all steps of a session. True on success (even if 0 rows)."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute(
                    'DELETE FROM analysis_steps WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logging.getLogger(__name__).error(
                "Error deleting session steps %s: %s", session_id, e)
            return False

    def delete_session_results(self, session_id: str) -> bool:
        """Delete all results of a session. True on success (even if 0 rows)."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute(
                    'DELETE FROM analysis_results WHERE session_id = ?', (session_id,))
                return True
        except Exception as e:
            logging.getLogger(__name__).error(
                "Error deleting session results %s: %s", session_id, e)
            return False

    def delete_step(self, step_id: str) -> bool:
        """Delete one step by id. True iff a row was actually removed."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    'DELETE FROM analysis_steps WHERE step_id = ?', (step_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logging.getLogger(__name__).error(
                "Error deleting step %s: %s", step_id, e)
            return False

    def delete_result(self, result_id: str) -> bool:
        """Delete one result by id. True iff a row was actually removed."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute(
                    'DELETE FROM analysis_results WHERE result_id = ?', (result_id,))
                return cursor.rowcount > 0
        except Exception as e:
            logging.getLogger(__name__).error(
                "Error deleting result %s: %s", result_id, e)
            return False
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| db_url | str | 'smartstat.db' | positional-or-keyword |
Parameter Details
db_url: Database connection string or file path. Accepts 'sqlite:///' or 'sqlite://' prefixed URLs or plain file paths. Defaults to 'smartstat.db' in the current directory. The parent directory is automatically created if it doesn't exist.
Return Value
Constructor returns a DatabaseManager instance with initialized database connection. Methods return: create_session/add_analysis_step/add_analysis_result return string IDs; get_session returns Optional[StatisticalSession]; get_session_steps/get_session_results/get_recent_sessions/get_all_sessions return Lists; update methods return None; delete methods return bool indicating success.
Class Interface
Methods
__init__(self, db_url: str = 'smartstat.db')
Purpose: Initialize database manager, parse URL, create parent directories, and initialize database schema
Parameters:
db_url: Database connection string or file path, defaults to 'smartstat.db'
Returns: None (constructor)
init_database(self)
Purpose: Create database tables (sessions, analysis_steps, analysis_results) if they don't exist
Returns: None
create_session(self, session: StatisticalSession) -> str
Purpose: Insert a new statistical analysis session into the database with comprehensive error logging
Parameters:
session: StatisticalSession object containing all session data including user_id, title, description, data_source, analysis_config, and status
Returns: String session_id of the created session
get_session(self, session_id: str) -> Optional[StatisticalSession]
Purpose: Retrieve a single session by its ID, reconstructing all nested objects from JSON
Parameters:
session_id: Unique identifier for the session to retrieve
Returns: StatisticalSession object if found, None otherwise
update_session_status(self, session_id: str, status: AnalysisStatus)
Purpose: Update only the status field of a session and set updated_at timestamp
Parameters:
session_id: ID of session to update; status: New AnalysisStatus enum value
Returns: None
update_session(self, session: StatisticalSession)
Purpose: Update all fields of an existing session including title, description, data_source, analysis_config, and status
Parameters:
session: StatisticalSession object with updated data
Returns: None
add_analysis_step(self, step: AnalysisStep) -> str
Purpose: Insert a new analysis step record including generated script, execution output, and success status
Parameters:
step: AnalysisStep object containing step_id, session_id, step_number, step_type, input_data, generated_script, execution details
Returns: String step_id of the created step
get_session_steps(self, session_id: str) -> List[AnalysisStep]
Purpose: Retrieve all analysis steps for a session ordered by step_number
Parameters:
session_id: ID of session whose steps to retrieve
Returns: List of AnalysisStep objects ordered by step_number
add_analysis_result(self, result: AnalysisResult) -> str
Purpose: Insert a new analysis result record with result data, file paths, and metadata
Parameters:
result: AnalysisResult object containing result_id, session_id, step_id, result_type, result_data, file_paths, metadata
Returns: String result_id of the created result
get_session_results(self, session_id: str) -> List[AnalysisResult]
Purpose: Retrieve all analysis results for a session ordered by created_at timestamp
Parameters:
session_id: ID of session whose results to retrieve
Returns: List of AnalysisResult objects ordered by creation time
get_recent_sessions(self, user_id: str = 'default', limit: int = 10) -> List[StatisticalSession]
Purpose: Retrieve most recent sessions for a user with pagination support, includes error handling for malformed analysis_config
Parameters:
user_id: User identifier to filter sessions, defaults to 'default'; limit: Maximum number of sessions to return, defaults to 10
Returns: List of StatisticalSession objects ordered by updated_at descending
get_all_sessions(self, user_id: str = 'default') -> List[StatisticalSession]
Purpose: Retrieve all sessions for a user ordered by creation date, includes error handling for malformed analysis_config
Parameters:
user_id: User identifier to filter sessions, defaults to 'default'
Returns: List of all StatisticalSession objects for the user ordered by created_at descending
delete_session(self, session_id: str) -> bool
Purpose: Delete a session record from the database
Parameters:
session_id: ID of session to delete
Returns: True if session was deleted (rowcount > 0), False otherwise or on error
delete_session_steps(self, session_id: str) -> bool
Purpose: Delete all analysis steps associated with a session
Parameters:
session_id: ID of session whose steps to delete
Returns: True if operation succeeded, False on error
delete_session_results(self, session_id: str) -> bool
Purpose: Delete all analysis results associated with a session
Parameters:
session_id: ID of session whose results to delete
Returns: True if operation succeeded, False on error
delete_step(self, step_id: str) -> bool
Purpose: Delete a specific analysis step by its ID
Parameters:
step_id: ID of step to delete
Returns: True if step was deleted (rowcount > 0), False otherwise or on error
delete_result(self, result_id: str) -> bool
Purpose: Delete a specific analysis result by its ID
Parameters:
result_id: ID of result to delete
Returns: True if result was deleted (rowcount > 0), False otherwise or on error
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| db_path | str | Parsed file system path to the SQLite database file, extracted from db_url parameter | instance |
Dependencies
sqlite3, json, logging, datetime, typing, pathlib, uuid, dataclasses, enum, traceback
Required Imports
import sqlite3
import json
import logging
from datetime import datetime
from typing import List, Dict, Optional, Any
from pathlib import Path
Conditional/Optional Imports
These imports are only needed under specific conditions:
from pathlib import Path
Condition: used in __init__ to ensure parent directory exists
Required (conditional)
import traceback
Condition: used in create_session error handling for detailed logging
Required (conditional)
Usage Example
# Initialize database manager
db = DatabaseManager('sqlite:///data/smartstat.db')
# Create a new session
session = StatisticalSession(
session_id='sess_123',
user_id='user_001',
title='My Analysis',
description='Testing analysis',
status=AnalysisStatus.CREATED
)
session_id = db.create_session(session)
# Retrieve session
retrieved = db.get_session(session_id)
# Update session status
db.update_session_status(session_id, AnalysisStatus.RUNNING)
# Add analysis step
step = AnalysisStep(
step_id='step_001',
session_id=session_id,
step_number=1,
step_type='data_load',
input_data={'file': 'data.csv'},
execution_success=True
)
db.add_analysis_step(step)
# Get all steps for session
steps = db.get_session_steps(session_id)
# Add result
result = AnalysisResult(
result_id='res_001',
session_id=session_id,
step_id='step_001',
result_type='summary',
result_data={'mean': 42.5}
)
db.add_analysis_result(result)
# Get recent sessions
recent = db.get_recent_sessions(user_id='user_001', limit=5)
# Delete session and related data
db.delete_session_results(session_id)
db.delete_session_steps(session_id)
db.delete_session(session_id)
Best Practices
- Always call init_database() during initialization to ensure tables exist (automatically done in __init__)
- Use context managers (with statements) for all database operations to ensure proper connection handling
- Handle Optional return types from get_session() - it returns None if session not found
- Delete related data in correct order: results first, then steps, then session to maintain referential integrity
- The class automatically creates parent directories for the database file
- JSON serialization is handled internally for complex objects (DataSource, AnalysisConfiguration)
- All timestamps are stored as ISO format strings and converted to datetime objects on retrieval
- Error handling includes detailed logging with traceback information for debugging
- Foreign key constraints are declared in the schema, but SQLite does not enforce them unless `PRAGMA foreign_keys = ON` is issued per connection (this class does not) — so delete child rows (results, then steps) before the parent session and ensure parent records exist before creating child records
- Use get_recent_sessions() with limit parameter for pagination instead of loading all sessions
- The class handles both 'sqlite:///' and 'sqlite://' URL prefixes as well as plain file paths
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class DatabaseManager_v1 79.6% similar
-
class DatabaseManager_v1 73.9% similar
-
class SmartStatSession 63.8% similar
-
function inspect_database 55.4% similar
-
function main_v62 55.2% similar