class DatabaseManager_v1
SQLite database manager for persistent storage
/tf/active/vicechatdev/vice_ai/models.py
603 - 1684
moderate
Purpose
SQLite database manager for persistent storage
Source Code
class DatabaseManager:
"""SQLite database manager for persistent storage"""
def __init__(self, db_path: str = '/tf/active/vice_ai/complex_documents.db'):
    """Remember where the database lives and make sure its schema exists.

    Args:
        db_path: Filesystem path of the SQLite database file.
    """
    # The shared connection is opened lazily by _get_connection().
    self.conn = None
    self.db_path = db_path
    self.init_database()
def _get_connection(self):
    """Return the manager's shared connection, opening it on first use."""
    if self.conn is not None:
        return self.conn
    # check_same_thread=False disables sqlite3's same-thread check on this handle.
    self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
    return self.conn
def init_database(self):
    """Create all tables and indexes and run the in-place schema migrations.

    Safe to call repeatedly: every statement uses IF NOT EXISTS and the
    migration helpers only act when a column or table is out of date.

    Column order matters here because the row converters
    (_row_to_text_section, _row_to_document) read rows by position:
      * text_sections declares analysis_session_id right after
        chat_session_id (index 13).
      * documents declares current_version_id / version_number last, so
        created_at stays at index 5.
    Both layouts match what a migrated legacy database looks like.
    """
    conn = self._get_connection()
    cursor = conn.cursor()

    table_statements = (
        # TextSections table
        '''
        CREATE TABLE IF NOT EXISTS text_sections (
            id TEXT PRIMARY KEY,
            owner TEXT NOT NULL,
            title TEXT NOT NULL,
            section_type TEXT NOT NULL,
            level INTEGER DEFAULT 1,
            current_content TEXT DEFAULT '',
            current_version_id TEXT,
            status TEXT DEFAULT 'draft',
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            chat_configuration TEXT,  -- JSON
            chat_messages TEXT,  -- JSON
            chat_session_id TEXT,  -- Link to ChatSession
            analysis_session_id TEXT,
            last_references TEXT,  -- JSON
            tags TEXT,  -- JSON
            metadata TEXT  -- JSON
        )
        ''',
        # TextSection versions table
        '''
        CREATE TABLE IF NOT EXISTS text_section_versions (
            version_id TEXT PRIMARY KEY,
            text_section_id TEXT NOT NULL,
            content TEXT NOT NULL,
            timestamp TIMESTAMP,
            author TEXT NOT NULL,
            change_summary TEXT,
            generated_by_ai BOOLEAN DEFAULT 0,
            FOREIGN KEY (text_section_id) REFERENCES text_sections (id)
        )
        ''',
        # DataSections table
        '''
        CREATE TABLE IF NOT EXISTS data_sections (
            id TEXT PRIMARY KEY,
            owner TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT DEFAULT '',
            current_content TEXT DEFAULT '',
            status TEXT DEFAULT 'draft',
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            analysis_session_id TEXT,
            dataset_info TEXT,  -- JSON
            generated_plots TEXT,  -- JSON array of plot paths
            analysis_conclusions TEXT,
            tags TEXT,  -- JSON
            metadata TEXT  -- JSON
        )
        ''',
        # Documents table (versioning columns last, see docstring)
        '''
        CREATE TABLE IF NOT EXISTS documents (
            id TEXT PRIMARY KEY,
            owner TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT DEFAULT '',
            sections TEXT,  -- JSON
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            tags TEXT,  -- JSON
            metadata TEXT,  -- JSON
            shared_with TEXT,  -- JSON
            current_version_id TEXT,
            version_number INTEGER DEFAULT 1
        )
        ''',
        # Document versions table
        '''
        CREATE TABLE IF NOT EXISTS document_versions (
            version_id TEXT PRIMARY KEY,
            document_id TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT DEFAULT '',
            sections TEXT,  -- JSON snapshot of sections at this version
            timestamp TIMESTAMP,
            author TEXT NOT NULL,
            change_summary TEXT,
            version_number INTEGER DEFAULT 1,
            FOREIGN KEY (document_id) REFERENCES documents (id)
        )
        ''',
        # DocumentSections table (linking table for document-section relationships)
        '''
        CREATE TABLE IF NOT EXISTS document_sections (
            id TEXT PRIMARY KEY,
            document_id TEXT NOT NULL,
            section_id TEXT NOT NULL,
            section_type TEXT NOT NULL DEFAULT 'TEXT',
            position INTEGER NOT NULL,
            parent_id TEXT,  -- For hierarchical structure
            is_copy BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (document_id) REFERENCES documents (id),
            FOREIGN KEY (parent_id) REFERENCES document_sections (id)
        )
        ''',
        # ChatSessions table
        '''
        CREATE TABLE IF NOT EXISTS chat_sessions (
            id TEXT PRIMARY KEY,
            section_id TEXT NOT NULL,
            document_id TEXT NOT NULL,
            messages TEXT,  -- JSON
            context_documents TEXT,  -- JSON
            chat_references TEXT,  -- JSON (renamed from references)
            config TEXT,  -- JSON
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            FOREIGN KEY (section_id) REFERENCES text_sections (id),
            FOREIGN KEY (document_id) REFERENCES documents (id)
        )
        ''',
    )
    for statement in table_statements:
        cursor.execute(statement)

    # Migrations run BEFORE index creation: a legacy document_sections table
    # has no section_id column, so indexing that column first would raise
    # and the migration would never get a chance to run. The migration also
    # drops and recreates the table, which would discard earlier indexes.
    self._migrate_document_sections_schema(cursor)
    self._migrate_text_sections_schema(cursor)
    self._migrate_documents_schema(cursor)

    index_statements = (
        'CREATE INDEX IF NOT EXISTS idx_text_sections_owner ON text_sections (owner)',
        'CREATE INDEX IF NOT EXISTS idx_documents_owner ON documents (owner)',
        'CREATE INDEX IF NOT EXISTS idx_versions_section ON text_section_versions (text_section_id)',
        'CREATE INDEX IF NOT EXISTS idx_document_versions_doc ON document_versions (document_id)',
        'CREATE INDEX IF NOT EXISTS idx_document_versions_timestamp ON document_versions (document_id, timestamp)',
        'CREATE INDEX IF NOT EXISTS idx_document_sections_doc ON document_sections (document_id)',
        'CREATE INDEX IF NOT EXISTS idx_document_sections_section ON document_sections (section_id)',
        'CREATE INDEX IF NOT EXISTS idx_document_sections_position ON document_sections (document_id, position)',
        'CREATE INDEX IF NOT EXISTS idx_chat_sessions_section ON chat_sessions (section_id)',
        'CREATE INDEX IF NOT EXISTS idx_chat_sessions_document ON chat_sessions (document_id)',
    )
    for statement in index_statements:
        cursor.execute(statement)

    conn.commit()
def _migrate_text_sections_schema(self, cursor):
    """Add any TEXT columns that older text_sections tables are missing.

    Args:
        cursor: Open cursor on the database being migrated.

    Failures are reported on stdout and swallowed; nothing is raised.
    """
    wanted = (
        'chat_session_id',
        'analysis_session_id',
        'last_references',
        'tags',
        'metadata',
    )
    try:
        cursor.execute("PRAGMA table_info(text_sections)")
        # Field 1 of each PRAGMA table_info row is the column name.
        present = {info[1] for info in cursor.fetchall()}
        for name in wanted:
            if name in present:
                continue
            cursor.execute(f'ALTER TABLE text_sections ADD COLUMN {name} TEXT')
            print(f"Added {name} column to text_sections")
    except Exception as e:
        print(f"Warning: Schema migration failed: {e}")
def _migrate_documents_schema(self, cursor):
    """Add the versioning columns to documents when they are absent.

    Args:
        cursor: Open cursor on the database being migrated.

    Failures are reported on stdout and swallowed; nothing is raised.
    """
    additions = (
        ('current_version_id', 'TEXT'),
        ('version_number', 'INTEGER DEFAULT 1'),
    )
    try:
        cursor.execute("PRAGMA table_info(documents)")
        # Field 1 of each PRAGMA table_info row is the column name.
        present = {info[1] for info in cursor.fetchall()}
        for name, definition in additions:
            if name in present:
                continue
            cursor.execute(f'ALTER TABLE documents ADD COLUMN {name} {definition}')
            print(f"Added {name} column to documents")
    except Exception as e:
        print(f"Warning: Documents schema migration failed: {e}")
def _migrate_document_sections_schema(self, cursor):
    """Rebuild a legacy document_sections table with section_id/section_type.

    A legacy table is one that has a text_section_id column and no
    section_id column. Its rows are copied into a new table (with
    section_type set to 'TEXT'), the old table is dropped and the new one
    takes its name.

    Args:
        cursor: Open cursor on the database being migrated.

    Failures are reported on stdout and swallowed; nothing is raised.
    """
    try:
        cursor.execute("PRAGMA table_info(document_sections)")
        # Field 1 of each PRAGMA table_info row is the column name.
        columns = {row[1] for row in cursor.fetchall()}
        if 'text_section_id' not in columns or 'section_id' in columns:
            return  # Already on the new schema (or table does not exist yet).

        print("Migrating document_sections table to new schema...")

        # A previous attempt that failed part-way may have left the scratch
        # table behind; without this, CREATE TABLE would fail on every retry.
        cursor.execute('DROP TABLE IF EXISTS document_sections_new')
        cursor.execute('''
            CREATE TABLE document_sections_new (
                id TEXT PRIMARY KEY,
                document_id TEXT NOT NULL,
                section_id TEXT NOT NULL,
                section_type TEXT NOT NULL DEFAULT 'TEXT',
                position INTEGER NOT NULL,
                parent_id TEXT,
                is_copy BOOLEAN DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (document_id) REFERENCES documents (id)
            )
        ''')
        # NOTE(review): assumes the legacy table already has parent_id,
        # is_copy and created_at columns - confirm against old deployments.
        cursor.execute('''
            INSERT INTO document_sections_new
                (id, document_id, section_id, section_type, position, parent_id, is_copy, created_at)
            SELECT id, document_id, text_section_id, 'TEXT', position, parent_id, is_copy, created_at
            FROM document_sections
        ''')
        cursor.execute('DROP TABLE document_sections')
        cursor.execute('ALTER TABLE document_sections_new RENAME TO document_sections')

        # Dropping the old table also dropped its indexes; put them back so
        # the migrated table is indexed like a freshly created one.
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_doc ON document_sections (document_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_section ON document_sections (section_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_position ON document_sections (document_id, position)')
        print("Document sections table migration completed")
    except Exception as e:
        print(f"Warning: Document sections schema migration failed: {e}")
def save_text_section(self, text_section: TextSection) -> bool:
    """Insert a text section, replacing any stored row with the same id.

    Enum fields are stored by value, datetimes as ISO-8601 strings, and the
    chat configuration, chat messages, references, tags and metadata as JSON.

    Returns:
        True when the row was written and committed, False if anything failed
        (the error is printed).
    """
    statement = '''
        INSERT OR REPLACE INTO text_sections (
            id, owner, title, section_type, level, current_content,
            current_version_id, status, created_at, updated_at,
            chat_configuration, chat_messages, chat_session_id, analysis_session_id,
            last_references, tags, metadata
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''
    try:
        db = self._get_connection()
        cur = db.cursor()
        section = text_section
        values = (
            section.id,
            section.owner,
            section.title,
            section.section_type.value,
            section.level,
            section.current_content,
            section.current_version_id,
            section.status.value,
            section.created_at.isoformat(),
            section.updated_at.isoformat(),
            json.dumps(section.chat_configuration.to_dict()),
            json.dumps([message.to_dict() for message in section.chat_messages]),
            section.chat_session_id,
            section.analysis_session_id,
            json.dumps(section.last_references),
            json.dumps(section.tags),
            json.dumps(section.metadata),
        )
        cur.execute(statement, values)
        db.commit()
    except Exception as e:
        print(f"Error saving text section: {e}")
        return False
    return True
def get_text_section(self, section_id: str) -> Optional[TextSection]:
    """Retrieve a text section by ID.

    Args:
        section_id: Primary key of the text section.

    Returns:
        The converted TextSection, or None when no row matches or an error
        occurs (the error is printed).
    """
    # Columns are listed explicitly, in the positional order that
    # _row_to_text_section reads them. SELECT * would return the physical
    # column order, which differs between freshly created and migrated
    # databases (migrated columns are appended at the end).
    query = '''
        SELECT id, owner, title, section_type, level, current_content,
               current_version_id, status, created_at, updated_at,
               chat_configuration, chat_messages, chat_session_id,
               analysis_session_id, last_references, tags, metadata
        FROM text_sections
        WHERE id = ?
    '''
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(query, (section_id,)).fetchone()
        finally:
            # sqlite3's context manager only commits/rolls back; the
            # connection has to be closed explicitly.
            conn.close()
        if row:
            return self._row_to_text_section(row)
        return None
    except Exception as e:
        print(f"Error retrieving text section: {e}")
        return None
def get_user_text_sections(self, owner: str) -> List[TextSection]:
    """Get all text sections for a user, most recently updated first.

    Rows that cannot be converted are reported and skipped rather than
    failing the whole listing.

    Args:
        owner: Value of the owner column to filter on.

    Returns:
        List of converted sections; an empty list if the query itself fails.
    """
    # Explicit column list in the positional order _row_to_text_section
    # expects; SELECT * order differs between fresh and migrated databases.
    query = '''
        SELECT id, owner, title, section_type, level, current_content,
               current_version_id, status, created_at, updated_at,
               chat_configuration, chat_messages, chat_session_id,
               analysis_session_id, last_references, tags, metadata
        FROM text_sections
        WHERE owner = ?
        ORDER BY updated_at DESC
    '''
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(query, (owner,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        text_sections = []
        for row in rows:
            try:
                text_sections.append(self._row_to_text_section(row))
            except Exception as e:
                print(f"ERROR: Failed to process text section {row[0]} ({row[2]}): {e}")
                continue
        return text_sections
    except Exception as e:
        print(f"Error retrieving user text sections: {e}")
        return []
def save_text_section_version(self, version: TextSectionVersion, text_section_id: str) -> bool:
    """Save a version of text section content.

    Args:
        version: Version record to persist; its timestamp is stored as an
            ISO-8601 string.
        text_section_id: ID of the text section the version belongs to.

    Returns:
        True on success, False if the insert failed (the error is printed),
        for example when version_id already exists.
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; it does
            # not close the connection, hence the surrounding try/finally.
            with conn:
                conn.execute('''
                    INSERT INTO text_section_versions (
                        version_id, text_section_id, content, timestamp, author,
                        change_summary, generated_by_ai
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    version.version_id,
                    text_section_id,
                    version.content,
                    version.timestamp.isoformat(),
                    version.author,
                    version.change_summary,
                    version.generated_by_ai,
                ))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error saving text section version: {e}")
        return False
def get_text_section_versions(self, text_section_id: str) -> List[TextSectionVersion]:
    """Get all versions for a text section, newest timestamp first.

    Args:
        text_section_id: ID of the text section whose history is wanted.

    Returns:
        Converted version records; an empty list on any error (printed).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute('''
                SELECT * FROM text_section_versions
                WHERE text_section_id = ?
                ORDER BY timestamp DESC
            ''', (text_section_id,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        return [self._row_to_version(row) for row in rows]
    except Exception as e:
        print(f"Error retrieving text section versions: {e}")
        return []
def get_documents_containing_section(self, text_section_id: str) -> List[Document]:
    """Get all documents that contain a specific text section.

    Documents are matched through the document_sections link table and
    returned most recently updated first. Rows that cannot be converted are
    reported and skipped.

    Args:
        text_section_id: ID of the section to look for.

    Returns:
        Matching documents; an empty list on any query error (printed).
    """
    # The link table's column is section_id in the current schema; the old
    # name text_section_id no longer exists and made this query always fail.
    # Document columns are listed in the positional order _row_to_document
    # reads them instead of relying on d.* (physical order varies).
    query = '''
        SELECT DISTINCT d.id, d.owner, d.title, d.description, d.sections,
               d.created_at, d.updated_at, d.tags, d.metadata, d.shared_with
        FROM documents d
        JOIN document_sections ds ON d.id = ds.document_id
        WHERE ds.section_id = ?
        ORDER BY d.updated_at DESC
    '''
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(query, (text_section_id,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        documents = []
        for row in rows:
            try:
                documents.append(self._row_to_document(row))
            except Exception as e:
                print(f"ERROR: Failed to process document {row[0]}: {e}")
                continue
        return documents
    except Exception as e:
        print(f"Error retrieving documents containing section: {e}")
        return []
# ============================================================================
# DATA SECTION METHODS
# ============================================================================
def save_data_section(self, data_section: DataSection) -> bool:
    """Insert a data section, replacing any stored row with the same id.

    The status enum is stored by value, datetimes as ISO-8601 strings, and
    dataset_info, generated_plots, tags and metadata as JSON.

    Returns:
        True when the row was written and committed, False if anything failed
        (the error is printed).
    """
    statement = '''
        INSERT OR REPLACE INTO data_sections (
            id, owner, title, description, current_content, status,
            created_at, updated_at, analysis_session_id, dataset_info,
            generated_plots, analysis_conclusions, tags, metadata
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''
    try:
        db = self._get_connection()
        cur = db.cursor()
        section = data_section
        values = (
            section.id,
            section.owner,
            section.title,
            section.description,
            section.current_content,
            section.status.value,
            section.created_at.isoformat(),
            section.updated_at.isoformat(),
            section.analysis_session_id,
            json.dumps(section.dataset_info),
            json.dumps(section.generated_plots),
            section.analysis_conclusions,
            json.dumps(section.tags),
            json.dumps(section.metadata),
        )
        cur.execute(statement, values)
        db.commit()
    except Exception as e:
        print(f"Error saving data section: {e}")
        return False
    return True
def get_data_section(self, section_id: str) -> Optional[DataSection]:
    """Retrieve a data section by ID.

    Args:
        section_id: Primary key of the data section.

    Returns:
        The converted DataSection, or None when no row matches or an error
        occurs (the error is printed).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(
                'SELECT * FROM data_sections WHERE id = ?', (section_id,)
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        if row:
            return self._row_to_data_section(row)
        return None
    except Exception as e:
        print(f"Error retrieving data section: {e}")
        return None
def get_user_data_sections(self, owner: str) -> List[DataSection]:
    """Get all data sections for a user, most recently updated first.

    Args:
        owner: Value of the owner column to filter on.

    Returns:
        Converted sections; an empty list on any error (printed), including
        a failure to convert a single row.
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute('''
                SELECT * FROM data_sections
                WHERE owner = ?
                ORDER BY updated_at DESC
            ''', (owner,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        return [self._row_to_data_section(row) for row in rows]
    except Exception as e:
        print(f"Error getting user data sections: {e}")
        return []
def _row_to_data_section(self, row) -> DataSection:
    """Build a DataSection from a positional data_sections row.

    JSON columns (indexes 9, 10, 12, 13) fall back to an empty dict/list when
    they are NULL, empty or not valid JSON. Any other error while reading
    them resets all four to their empty defaults.
    """
    def _decode(raw, fallback):
        # Empty/NULL cells and malformed JSON both yield the fallback.
        if not raw:
            return fallback
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return fallback

    try:
        dataset_info = _decode(row[9], {})       # dataset_info column
        generated_plots = _decode(row[10], [])   # generated_plots column
        tags = _decode(row[12], [])              # tags column
        metadata = _decode(row[13], {})          # metadata column
    except Exception as e:
        # Anything unexpected: start over with safe defaults for all four.
        dataset_info, generated_plots, tags, metadata = {}, [], [], {}

    description = row[3] or ""
    content = row[4] or ""
    conclusions = row[11] or ""
    return DataSection(
        id=row[0],
        owner=row[1],
        title=row[2],
        description=description,
        current_content=content,
        status=ContentStatus(row[5]),
        created_at=datetime.fromisoformat(row[6]),
        updated_at=datetime.fromisoformat(row[7]),
        analysis_session_id=row[8],
        dataset_info=dataset_info,
        generated_plots=generated_plots,
        analysis_conclusions=conclusions,
        tags=tags,
        metadata=metadata,
    )
def save_document(self, document: Document) -> bool:
    """Save or update a document.

    An existing row is updated in place so that columns this method does not
    write (current_version_id, version_number) keep their stored values;
    INSERT OR REPLACE would delete the row and reset them. A new row is
    inserted when no row with the document's id exists.

    Args:
        document: Document to persist. Sections are stored as a JSON list of
            section.to_dict() results; tags, metadata and shared_with as JSON.

    Returns:
        True on success, False if anything failed (the error is printed).
    """
    try:
        fields = (
            document.owner,
            document.title,
            document.description,
            json.dumps([section.to_dict() for section in document.sections]),
            document.created_at.isoformat(),
            document.updated_at.isoformat(),
            json.dumps(document.tags),
            json.dumps(document.metadata),
            json.dumps(document.shared_with),
        )
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                cursor = conn.execute('''
                    UPDATE documents SET
                        owner = ?, title = ?, description = ?, sections = ?,
                        created_at = ?, updated_at = ?,
                        tags = ?, metadata = ?, shared_with = ?
                    WHERE id = ?
                ''', fields + (document.id,))
                if cursor.rowcount == 0:
                    conn.execute('''
                        INSERT INTO documents (
                            id, owner, title, description, sections, created_at, updated_at,
                            tags, metadata, shared_with
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (document.id,) + fields)
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error saving document: {e}")
        return False
def get_document(self, document_id: str) -> Optional[Document]:
    """Retrieve a document by ID.

    Args:
        document_id: Primary key of the document.

    Returns:
        The converted Document, or None when no row matches or an error
        occurs (the error is printed).
    """
    # Explicit column list in the positional order _row_to_document reads
    # (created_at at index 5). SELECT * follows the physical column order,
    # which puts the versioning columns in different places on fresh and
    # migrated databases.
    query = '''
        SELECT id, owner, title, description, sections,
               created_at, updated_at, tags, metadata, shared_with
        FROM documents
        WHERE id = ?
    '''
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(query, (document_id,)).fetchone()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        if row:
            return self._row_to_document(row)
        return None
    except Exception as e:
        print(f"Error retrieving document: {e}")
        return None
def get_user_documents(self, owner: str) -> List[Document]:
    """Get all documents for a user, most recently updated first.

    Args:
        owner: Value of the owner column to filter on.

    Returns:
        Converted documents; an empty list on any error (printed), including
        a failure to convert a single row.
    """
    # Explicit column list in the positional order _row_to_document reads;
    # SELECT * order differs between fresh and migrated databases.
    query = '''
        SELECT id, owner, title, description, sections,
               created_at, updated_at, tags, metadata, shared_with
        FROM documents
        WHERE owner = ?
        ORDER BY updated_at DESC
    '''
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(query, (owner,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        return [self._row_to_document(row) for row in rows]
    except Exception as e:
        print(f"Error retrieving user documents: {e}")
        return []
def delete_text_section(self, section_id: str) -> bool:
    """Delete a text section and its versions, including all document references.

    Removes, in one transaction: the section's versions, its chat sessions,
    its rows in document_sections (section_type 'TEXT'), its entries in the
    sections JSON of every affected document, and the section row itself.

    Args:
        section_id: ID of the text section to delete.

    Returns:
        True on success; False if anything failed, in which case nothing is
        committed and the error plus traceback are printed.
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                cursor = conn.cursor()

                # Find affected documents before the link rows are deleted.
                # NOTE(review): the literal 'TEXT' must match what
                # save_document_section stores (section_type.value) - confirm.
                cursor.execute('SELECT document_id FROM document_sections WHERE section_id = ? AND section_type = ?',
                               (section_id, 'TEXT'))
                affected_docs = [row[0] for row in cursor.fetchall()]
                print(f"📄 Deleting text section {section_id} - found in {len(affected_docs)} documents")

                cursor.execute('DELETE FROM text_section_versions WHERE text_section_id = ?', (section_id,))
                print(f"✅ Deleted versions for section {section_id}")

                cursor.execute('DELETE FROM chat_sessions WHERE section_id = ?', (section_id,))
                print(f"✅ Deleted chat sessions for section {section_id}")

                cursor.execute('DELETE FROM document_sections WHERE section_id = ? AND section_type = ?',
                               (section_id, 'TEXT'))
                print(f"✅ Deleted document_sections references for section {section_id}")

                for doc_id in affected_docs:
                    cursor.execute('SELECT sections FROM documents WHERE id = ?', (doc_id,))
                    row = cursor.fetchone()
                    # A NULL/empty sections value has nothing to clean; without
                    # this guard json.loads(None) would abort the whole delete.
                    if not row or not row[0]:
                        continue
                    sections_data = json.loads(row[0])
                    # Entries may use the current 'section_id' key or the
                    # legacy 'text_section_id' key.
                    updated_sections = [
                        s for s in sections_data
                        if (s.get('section_id') or s.get('text_section_id')) != section_id
                    ]
                    cursor.execute('UPDATE documents SET sections = ?, updated_at = ? WHERE id = ?',
                                   (json.dumps(updated_sections), datetime.now().isoformat(), doc_id))
                print(f"✅ Updated sections JSON in {len(affected_docs)} documents")

                cursor.execute('DELETE FROM text_sections WHERE id = ?', (section_id,))
                print(f"✅ Deleted text section {section_id}")
        finally:
            conn.close()
        print(f"✅ Successfully deleted text section {section_id}")
        return True
    except Exception as e:
        import traceback
        print(f"❌ Error deleting text section {section_id}: {e}")
        print(f"❌ Traceback: {traceback.format_exc()}")
        return False
def delete_data_section(self, section_id: str) -> bool:
    """Delete a data section and its references from documents.

    Removes, in one transaction: the section's rows in document_sections
    (section_type SectionType.DATA.value), its entries in the sections JSON
    of every affected document, and the data section row itself.

    Args:
        section_id: ID of the data section to delete.

    Returns:
        True on success; False if anything failed, in which case nothing is
        committed and the error is printed.
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                cursor = conn.cursor()
                link_filter = (section_id, SectionType.DATA.value)

                # Find affected documents before the link rows are deleted.
                cursor.execute('SELECT document_id FROM document_sections WHERE section_id = ? AND section_type = ?',
                               link_filter)
                affected_docs = [row[0] for row in cursor.fetchall()]

                cursor.execute('DELETE FROM document_sections WHERE section_id = ? AND section_type = ?',
                               link_filter)

                for doc_id in affected_docs:
                    cursor.execute('SELECT sections FROM documents WHERE id = ?', (doc_id,))
                    row = cursor.fetchone()
                    # A NULL/empty sections value has nothing to clean; without
                    # this guard json.loads(None) would abort the whole delete.
                    if not row or not row[0]:
                        continue
                    sections_data = json.loads(row[0])
                    updated_sections = [s for s in sections_data if s.get('section_id') != section_id]
                    cursor.execute('UPDATE documents SET sections = ?, updated_at = ? WHERE id = ?',
                                   (json.dumps(updated_sections), datetime.now().isoformat(), doc_id))

                cursor.execute('DELETE FROM data_sections WHERE id = ?', (section_id,))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error deleting data section: {e}")
        return False
def save_chat_session(self, chat_session: ChatSession) -> bool:
    """Insert a chat session, replacing any stored row with the same id.

    messages, context_documents and references are stored as JSON (the
    latter in the chat_references column); config is stored as the JSON of
    config.to_dict(), or {} when the session has no config.

    Returns:
        True when the row was written and committed, False if anything failed
        (the error is printed).
    """
    statement = '''
        INSERT OR REPLACE INTO chat_sessions (
            id, section_id, document_id, messages, context_documents,
            chat_references, config, created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''
    try:
        db = self._get_connection()
        cur = db.cursor()
        session = chat_session
        if session.config:
            config_payload = session.config.to_dict()
        else:
            config_payload = {}
        values = (
            session.id,
            session.section_id,
            session.document_id,
            json.dumps(session.messages),
            json.dumps(session.context_documents),
            json.dumps(session.references),
            json.dumps(config_payload),
            session.created_at.isoformat(),
            session.updated_at.isoformat(),
        )
        cur.execute(statement, values)
        db.commit()
    except Exception as e:
        print(f"Error saving chat session: {e}")
        return False
    return True
def get_chat_session(self, session_id: str) -> Optional[ChatSession]:
    """Retrieve a chat session by ID.

    Args:
        session_id: Primary key of the chat session.

    Returns:
        The converted ChatSession, or None when no row matches or an error
        occurs (the error is printed).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(
                'SELECT * FROM chat_sessions WHERE id = ?', (session_id,)
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        if row:
            return self._row_to_chat_session(row)
        return None
    except Exception as e:
        print(f"Error retrieving chat session: {e}")
        return None
def get_chat_session_by_section(self, section_id: str) -> Optional[ChatSession]:
    """Retrieve a chat session by section ID.

    Args:
        section_id: ID of the section the chat session is attached to.

    Returns:
        The converted ChatSession for the first matching row, or None when
        no row matches or an error occurs (the error is printed).
    """
    # NOTE(review): no ORDER BY - if a section can have several sessions,
    # which one is returned is up to SQLite; confirm whether that can happen.
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(
                'SELECT * FROM chat_sessions WHERE section_id = ?', (section_id,)
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        if row:
            return self._row_to_chat_session(row)
        return None
    except Exception as e:
        print(f"Error retrieving chat session by section: {e}")
        return None
def delete_document(self, document_id: str) -> bool:
    """Delete a document and its associated document sections.

    Only the documents row and its document_sections link rows are removed;
    the referenced text/data sections and any document_versions rows are
    left untouched.

    Args:
        document_id: ID of the document to delete.

    Returns:
        True on success (also when no such document exists), False if the
        deletion failed (the error is printed and nothing is committed).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                # Link rows first, then the document they point to.
                conn.execute('DELETE FROM document_sections WHERE document_id = ?', (document_id,))
                conn.execute('DELETE FROM documents WHERE id = ?', (document_id,))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error deleting document: {e}")
        return False
def save_document_version(self, version: DocumentVersion) -> bool:
    """Save a document version.

    Args:
        version: Version snapshot to persist; sections are stored as JSON
            and the timestamp as an ISO-8601 string.

    Returns:
        True on success, False if the insert failed (the error is printed),
        for example when version_id already exists.
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                conn.execute('''
                    INSERT INTO document_versions (
                        version_id, document_id, title, description, sections,
                        timestamp, author, change_summary, version_number
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    version.version_id,
                    version.document_id,
                    version.title,
                    version.description,
                    json.dumps(version.sections),
                    version.timestamp.isoformat(),
                    version.author,
                    version.change_summary,
                    version.version_number,
                ))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error saving document version: {e}")
        return False
def save_document_section(self, doc_section: DocumentSection, document_id: str) -> bool:
    """Save a document section reference to the document_sections table.

    An existing row with the same id is replaced; created_at is always set
    to the database's CURRENT_TIMESTAMP.

    Args:
        doc_section: Link record to persist; section_type is stored by value.
        document_id: ID of the document the section belongs to.

    Returns:
        True on success, False if the write failed (the error is printed).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                conn.execute('''
                    INSERT OR REPLACE INTO document_sections (
                        id, document_id, section_id, section_type, position, parent_id, is_copy, created_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ''', (
                    doc_section.id,
                    document_id,
                    doc_section.section_id,
                    doc_section.section_type.value,
                    doc_section.position,
                    doc_section.parent_id,
                    doc_section.is_copy,
                ))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error saving document section: {e}")
        return False
def sync_document_sections_to_table(self) -> bool:
    """Sync existing JSON document sections to the document_sections table.

    The link table is emptied and rebuilt from the sections JSON stored on
    each document. Both the current 'section_id' key and the legacy
    'text_section_id' key are understood. Documents whose JSON cannot be
    parsed or is not a list, and entries that are not objects or carry no
    section id, are skipped.

    Returns:
        True on success; False if anything failed, in which case the link
        table is left as it was (the transaction is rolled back).
    """
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error; the
            # connection itself is closed in the finally clause.
            with conn:
                cursor = conn.cursor()
                cursor.execute('DELETE FROM document_sections')

                # '' (single quotes) is the SQL string literal; "" only works
                # through SQLite's double-quoted-string compatibility quirk.
                cursor.execute("SELECT id, sections FROM documents WHERE sections IS NOT NULL AND sections != ''")
                for doc_id, sections_json in cursor.fetchall():
                    if not sections_json:
                        continue
                    try:
                        sections = json.loads(sections_json)
                    except json.JSONDecodeError as e:
                        print(f"Error parsing sections JSON for document {doc_id}: {e}")
                        continue
                    if not isinstance(sections, list):
                        continue
                    for section_data in sections:
                        if not isinstance(section_data, dict):
                            continue
                        # Handle both old and new format
                        section_id = section_data.get('section_id') or section_data.get('text_section_id')
                        if not section_id:
                            continue
                        cursor.execute('''
                            INSERT INTO document_sections (
                                id, document_id, section_id, section_type, position, parent_id, is_copy, created_at
                            ) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                        ''', (
                            section_data.get('id'),
                            doc_id,
                            section_id,
                            section_data.get('section_type', 'TEXT'),
                            section_data.get('position', 0),
                            section_data.get('parent_id'),
                            section_data.get('is_copy', False),
                        ))
        finally:
            conn.close()
        return True
    except Exception as e:
        print(f"Error syncing document sections: {e}")
        return False
def get_document_versions(self, document_id: str) -> List[Dict]:
    """Get all versions for a document, newest timestamp first.

    Args:
        document_id: ID of the document whose history is wanted.

    Returns:
        One dict per version with the keys version_id, document_id, title,
        description, sections (decoded JSON, [] when empty), timestamp,
        author, change_summary and version_number. Rows that cannot be
        processed are reported and skipped; a query failure yields [].
    """
    keys = ('version_id', 'document_id', 'title', 'description', 'sections',
            'timestamp', 'author', 'change_summary', 'version_number')
    try:
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute('''
                SELECT * FROM document_versions
                WHERE document_id = ?
                ORDER BY timestamp DESC
            ''', (document_id,)).fetchall()
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        versions = []
        for row in rows:
            try:
                version_dict = dict(zip(keys, row[:9]))
                # Column 4 holds the JSON snapshot of the sections.
                version_dict['sections'] = json.loads(row[4]) if row[4] else []
                versions.append(version_dict)
            except Exception as e:
                print(f"Error processing document version {row[0]}: {e}")
                continue
        return versions
    except Exception as e:
        print(f"Error retrieving document versions: {e}")
        return []
def _row_to_text_section(self, row) -> TextSection:
    """Convert a positional text_sections row to a TextSection object.

    Expected positions: 0 id, 1 owner, 2 title, 3 section_type, 4 level,
    5 current_content, 6 current_version_id, 7 status, 8 created_at,
    9 updated_at, 10 chat_configuration, 11 chat_messages,
    12 chat_session_id, 13 analysis_session_id, 14 last_references,
    15 tags, 16 metadata.

    Each JSON column falls back to its own empty default when the cell is
    missing, NULL, empty, not valid JSON, or decodes to the wrong container
    type (e.g. JSON null). One bad column does not discard the others.

    Raises:
        Whatever SectionType(), ContentStatus(), datetime.fromisoformat() or
        the from_dict() constructors raise for invalid scalar values.
    """
    def _json_column(index, default):
        # Rows from older layouts may be shorter than the index asked for.
        raw = row[index] if len(row) > index else None
        if not raw:
            return default
        try:
            value = json.loads(raw)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError, e.g. a bare UUID that
            # was written into a JSON column by older code.
            return default
        return value if isinstance(value, type(default)) else default

    def _session_id(value):
        # Only real, non-placeholder strings count as a session id; legacy
        # rows may hold serialized empties such as '{}' or 'null'.
        if isinstance(value, str) and value not in ('{}', '[]', 'null', ''):
            return value
        return None

    chat_config_data = _json_column(10, {})
    chat_messages_data = _json_column(11, [])
    last_refs_data = _json_column(14, [])
    tags_data = _json_column(15, [])
    metadata_data = _json_column(16, {})

    chat_session_id = _session_id(row[12])
    analysis_session_id = _session_id(row[13] if len(row) > 13 else None)

    return TextSection(
        id=row[0],
        owner=row[1],
        title=row[2],
        section_type=SectionType(row[3]),
        level=row[4],
        current_content=row[5] or "",
        current_version_id=row[6] or "",
        status=ContentStatus(row[7]),
        created_at=datetime.fromisoformat(row[8]),
        updated_at=datetime.fromisoformat(row[9]),
        chat_configuration=ChatConfiguration.from_dict(chat_config_data) if chat_config_data else ChatConfiguration(),
        chat_messages=[ChatMessage.from_dict(msg) for msg in chat_messages_data],
        chat_session_id=chat_session_id,
        analysis_session_id=analysis_session_id,
        last_references=last_refs_data,
        tags=tags_data,
        metadata=metadata_data,
    )
def _row_to_version(self, row) -> TextSectionVersion:
    """Build a TextSectionVersion from a positional text_section_versions row.

    Index 1 (text_section_id) is not passed to the constructor.
    """
    version_id, content = row[0], row[2]
    stamp = datetime.fromisoformat(row[3])
    ai_flag = bool(row[6])  # stored as 0/1 in SQLite
    return TextSectionVersion(
        version_id=version_id,
        content=content,
        timestamp=stamp,
        author=row[4],
        change_summary=row[5],
        generated_by_ai=ai_flag,
    )
def _row_to_document(self, row) -> Document:
    """Convert a positional documents row to a Document object.

    Expected positions: 0 id, 1 owner, 2 title, 3 description, 4 sections,
    5 created_at, 6 updated_at, 7 tags, 8 metadata, 9 shared_with.

    NULL or empty JSON columns are treated as empty containers, like the
    other row converters do, instead of raising from json.loads(None).
    Sections are returned sorted by their position.

    Raises:
        json.JSONDecodeError for malformed JSON, and whatever
        datetime.fromisoformat() or DocumentSection.from_dict() raise.
    """
    sections_data = json.loads(row[4]) if row[4] else []
    sections = [DocumentSection.from_dict(s) for s in sections_data]
    sections.sort(key=lambda s: s.position)  # keep document order
    return Document(
        id=row[0],
        owner=row[1],
        title=row[2],
        description=row[3],
        sections=sections,
        created_at=datetime.fromisoformat(row[5]),
        updated_at=datetime.fromisoformat(row[6]),
        tags=json.loads(row[7]) if row[7] else [],
        metadata=json.loads(row[8]) if row[8] else {},
        # NOTE(review): empty default assumed to be a list - confirm against
        # the Document.shared_with declaration.
        shared_with=json.loads(row[9]) if row[9] else [],
    )
def _row_to_chat_session(self, row) -> ChatSession:
    """Build a ChatSession from a positional chat_sessions row.

    NULL/empty JSON columns become empty lists, and a missing or empty
    config becomes a default ChatConfiguration. If anything goes wrong the
    error is printed and None is returned instead of a session.
    """
    def _decode_list(raw):
        # NULL/empty cell -> empty list, otherwise decode the JSON text.
        return json.loads(raw) if raw else []

    try:
        config_data = json.loads(row[6]) if row[6] else {}
        if config_data:
            config = ChatConfiguration.from_dict(config_data)
        else:
            config = ChatConfiguration()
        return ChatSession(
            id=row[0],
            section_id=row[1],
            document_id=row[2],
            messages=_decode_list(row[3]),
            context_documents=_decode_list(row[4]),
            references=_decode_list(row[5]),
            config=config,
            created_at=datetime.fromisoformat(row[7]),
            updated_at=datetime.fromisoformat(row[8]),
        )
    except Exception as e:
        print(f"Error converting row to chat session: {e}")
        return None
def verify_tables(self) -> bool:
    """Verify that all required tables exist.

    The required set is every table that init_database creates, not just
    the three original ones.

    Returns:
        True when every required table is present, False when one is
        missing or the check itself fails (the error is printed).
    """
    required_tables = (
        'text_sections',
        'text_section_versions',
        'data_sections',
        'documents',
        'document_versions',
        'document_sections',
        'chat_sessions',
    )
    try:
        cursor = self._get_connection().cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        existing = {row[0] for row in cursor.fetchall()}
        return all(table in existing for table in required_tables)
    except Exception as e:
        print(f"Error verifying tables: {e}")
        return False
def get_all_text_sections(self) -> List[TextSection]:
    """Get all text sections from database.

    Returns:
        Every stored text section, converted; an empty list on any error
        (printed), including a failure to convert a single row.
    """
    # Explicit column list in the positional order _row_to_text_section
    # expects; SELECT * order differs between fresh and migrated databases.
    query = '''
        SELECT id, owner, title, section_type, level, current_content,
               current_version_id, status, created_at, updated_at,
               chat_configuration, chat_messages, chat_session_id,
               analysis_session_id, last_references, tags, metadata
        FROM text_sections
    '''
    try:
        cursor = self._get_connection().cursor()
        cursor.execute(query)
        return [self._row_to_text_section(row) for row in cursor.fetchall()]
    except Exception as e:
        print(f"Error getting all text sections: {e}")
        return []
def get_all_documents(self) -> List[Document]:
    """Load every row of the documents table as Document objects.

    Returns:
        One Document per row, in the order the query yields them. If
        the query or any row conversion raises, the error is printed
        and an empty list is returned instead.
    """
    try:
        query_cursor = self._get_connection().cursor()
        query_cursor.execute("SELECT * FROM documents")
        # Fetch everything first, then map each row through the converter.
        fetched = query_cursor.fetchall()
        return list(map(self._row_to_document, fetched))
    except Exception as e:
        print(f"Error getting all documents: {e}")
        return []
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Parameter of unspecified type
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, db_path)
Purpose: Initialize the manager: store the database path and create the schema via init_database()
Parameters:
db_path: Type: str
Returns: None
_get_connection(self)
Purpose: Get database connection
Returns: The sqlite3 connection (opened on the first call and cached on the instance)
init_database(self)
Purpose: Initialize database schema
Returns: None
_migrate_text_sections_schema(self, cursor)
Purpose: Migrate text_sections table to include new columns if they don't exist
Parameters:
cursor: Parameter
Returns: None
_migrate_documents_schema(self, cursor)
Purpose: Migrate documents table to include versioning columns if they don't exist
Parameters:
cursor: Parameter
Returns: None
_migrate_document_sections_schema(self, cursor)
Purpose: Migrate document_sections table to new schema with section_id and section_type
Parameters:
cursor: Parameter
Returns: None
save_text_section(self, text_section) -> bool
Purpose: Save or update a text section
Parameters:
text_section: Type: TextSection
Returns: Returns bool
get_text_section(self, section_id) -> Optional[TextSection]
Purpose: Retrieve a text section by ID
Parameters:
section_id: Type: str
Returns: Returns Optional[TextSection]
get_user_text_sections(self, owner) -> List[TextSection]
Purpose: Get all text sections for a user
Parameters:
owner: Type: str
Returns: Returns List[TextSection]
save_text_section_version(self, version, text_section_id) -> bool
Purpose: Save a version of text section content
Parameters:
version: Type: TextSectionVersion
text_section_id: Type: str
Returns: Returns bool
get_text_section_versions(self, text_section_id) -> List[TextSectionVersion]
Purpose: Get all versions for a text section
Parameters:
text_section_id: Type: str
Returns: Returns List[TextSectionVersion]
get_documents_containing_section(self, text_section_id) -> List[Document]
Purpose: Get all documents that contain a specific text section
Parameters:
text_section_id: Type: str
Returns: Returns List[Document]
save_data_section(self, data_section) -> bool
Purpose: Save or update a data section
Parameters:
data_section: Type: DataSection
Returns: Returns bool
get_data_section(self, section_id) -> Optional[DataSection]
Purpose: Retrieve a data section by ID
Parameters:
section_id: Type: str
Returns: Returns Optional[DataSection]
get_user_data_sections(self, owner) -> List[DataSection]
Purpose: Get all data sections for a user
Parameters:
owner: Type: str
Returns: Returns List[DataSection]
_row_to_data_section(self, row) -> DataSection
Purpose: Convert database row to DataSection object
Parameters:
row: Parameter
Returns: Returns DataSection
save_document(self, document) -> bool
Purpose: Save or update a document
Parameters:
document: Type: Document
Returns: Returns bool
get_document(self, document_id) -> Optional[Document]
Purpose: Retrieve a document by ID
Parameters:
document_id: Type: str
Returns: Returns Optional[Document]
get_user_documents(self, owner) -> List[Document]
Purpose: Get all documents for a user
Parameters:
owner: Type: str
Returns: Returns List[Document]
delete_text_section(self, section_id) -> bool
Purpose: Delete a text section and its versions, including all document references
Parameters:
section_id: Type: str
Returns: Returns bool
delete_data_section(self, section_id) -> bool
Purpose: Delete a data section and its references from documents
Parameters:
section_id: Type: str
Returns: Returns bool
save_chat_session(self, chat_session) -> bool
Purpose: Save or update a chat session
Parameters:
chat_session: Type: ChatSession
Returns: Returns bool
get_chat_session(self, session_id) -> Optional[ChatSession]
Purpose: Retrieve a chat session by ID
Parameters:
session_id: Type: str
Returns: Returns Optional[ChatSession]
get_chat_session_by_section(self, section_id) -> Optional[ChatSession]
Purpose: Retrieve a chat session by section ID
Parameters:
section_id: Type: str
Returns: Returns Optional[ChatSession]
delete_document(self, document_id) -> bool
Purpose: Delete a document and its associated document sections
Parameters:
document_id: Type: str
Returns: Returns bool
save_document_version(self, version) -> bool
Purpose: Save a document version
Parameters:
version: Type: DocumentVersion
Returns: Returns bool
save_document_section(self, doc_section, document_id) -> bool
Purpose: Save a document section reference to the document_sections table
Parameters:
doc_section: Type: DocumentSection
document_id: Type: str
Returns: Returns bool
sync_document_sections_to_table(self) -> bool
Purpose: Sync existing JSON document sections to the document_sections table
Returns: Returns bool
get_document_versions(self, document_id) -> List[Dict]
Purpose: Get all versions for a document
Parameters:
document_id: Type: str
Returns: Returns List[Dict]
_row_to_text_section(self, row) -> TextSection
Purpose: Convert database row to TextSection object
Parameters:
row: Parameter
Returns: Returns TextSection
_row_to_version(self, row) -> TextSectionVersion
Purpose: Convert database row to TextSectionVersion object
Parameters:
row: Parameter
Returns: Returns TextSectionVersion
_row_to_document(self, row) -> Document
Purpose: Convert database row to Document object
Parameters:
row: Parameter
Returns: Returns Document
_row_to_chat_session(self, row) -> ChatSession
Purpose: Convert database row to ChatSession object
Parameters:
row: Parameter
Returns: Returns ChatSession, or None if the row cannot be converted
verify_tables(self) -> bool
Purpose: Verify that all required tables exist
Returns: Returns bool
get_all_text_sections(self) -> List[TextSection]
Purpose: Get all text sections from database
Returns: Returns List[TextSection]
get_all_documents(self) -> List[Document]
Purpose: Get all documents from database
Returns: Returns List[Document]
Required Imports
import uuid
import json
import sqlite3
from datetime import datetime
from typing import List
from typing import Dict
from typing import Optional
Usage Example
# Example usage:
# db = DatabaseManager(db_path='/tf/active/vice_ai/complex_documents.db')
# documents = db.get_all_documents()
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class DatabaseManager 73.9% similar
-
class ManualRelationshipManager 52.9% similar
-
class Neo4jManager 46.7% similar
-
class SessionManager 45.8% similar