class DatabaseManager_v1

Maturity: 25

SQLite database manager for persistent storage

File: /tf/active/vicechatdev/vice_ai/models.py
Lines: 603 - 1684
Complexity: moderate

Purpose

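Manages a SQLite database that persists text sections, data sections, documents, their version histories, and chat sessions. The schema is created when the manager is constructed, and older databases are migrated in place by adding missing columns.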
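The migration methods in the source listing all follow the same pattern: read the existing column names with PRAGMA table_info, then issue ALTER TABLE ... ADD COLUMN for any that are missing. The following is a minimal standalone sketch of that pattern, not the class's own code. The helper name add_missing_columns and the in-memory database are assumptions made for illustration.

```python
import sqlite3


def add_missing_columns(conn, table, wanted):
    """Add each column in `wanted` (name -> SQL type) that `table` lacks.

    Hypothetical helper for illustration; returns the names that were added.
    """
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    added = []
    for name, sql_type in wanted.items():
        if name not in existing:
            # Identifiers cannot be bound as parameters, so pass trusted names only.
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}")
            added.append(name)
    conn.commit()
    return added


# An in-memory database stands in for the real file on disk.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE text_sections (id TEXT PRIMARY KEY, owner TEXT NOT NULL)")

wanted = {"chat_session_id": "TEXT", "tags": "TEXT"}
first_run = add_missing_columns(conn, "text_sections", wanted)
second_run = add_missing_columns(conn, "text_sections", wanted)  # nothing left to add
columns = [row[1] for row in conn.execute("PRAGMA table_info(text_sections)")]
print(first_run, second_run, columns)
```

Because the check runs before every ALTER TABLE, the migration can safely run on each startup, which is how init_database uses it.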

Source Code

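# Standard-library imports this excerpt relies on (the model classes it uses,
# such as TextSection and Document, are not part of this excerpt).
import json
import sqlite3
from datetime import datetime
from typing import List, Optional

class DatabaseManager: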
    """SQLite database manager for persistent storage"""
    
    def __init__(self, db_path: str = '/tf/active/vice_ai/complex_documents.db'):
        self.db_path = db_path
        self.conn = None
        self.init_database()
    
    def _get_connection(self):
        """Get database connection"""
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        return self.conn
    
    def init_database(self):
        """Initialize database schema"""
        conn = self._get_connection()
        cursor = conn.cursor()
        
        # TextSections table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS text_sections (
                id TEXT PRIMARY KEY,
                owner TEXT NOT NULL,
                title TEXT NOT NULL,
                section_type TEXT NOT NULL,
                level INTEGER DEFAULT 1,
                current_content TEXT DEFAULT '',
                current_version_id TEXT,
                status TEXT DEFAULT 'draft',
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                chat_configuration TEXT,  -- JSON
                chat_messages TEXT,       -- JSON
                chat_session_id TEXT,     -- Link to ChatSession
                last_references TEXT,     -- JSON
                tags TEXT,               -- JSON
                metadata TEXT            -- JSON
            )
        ''')
        
        # TextSection versions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS text_section_versions (
                version_id TEXT PRIMARY KEY,
                text_section_id TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TIMESTAMP,
                author TEXT NOT NULL,
                change_summary TEXT,
                generated_by_ai BOOLEAN DEFAULT 0,
                FOREIGN KEY (text_section_id) REFERENCES text_sections (id)
            )
        ''')
        
        # DataSections table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_sections (
                id TEXT PRIMARY KEY,
                owner TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                current_content TEXT DEFAULT '',
                status TEXT DEFAULT 'draft',
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                analysis_session_id TEXT,
                dataset_info TEXT,          -- JSON
                generated_plots TEXT,       -- JSON array of plot paths
                analysis_conclusions TEXT,
                tags TEXT,                  -- JSON
                metadata TEXT               -- JSON
            )
        ''')
        
        # Documents table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                owner TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                sections TEXT,           -- JSON
                current_version_id TEXT,
                version_number INTEGER DEFAULT 1,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                tags TEXT,              -- JSON
                metadata TEXT,          -- JSON
                shared_with TEXT        -- JSON
            )
        ''')
        
        # Document versions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS document_versions (
                version_id TEXT PRIMARY KEY,
                document_id TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                sections TEXT,          -- JSON snapshot of sections at this version
                timestamp TIMESTAMP,
                author TEXT NOT NULL,
                change_summary TEXT,
                version_number INTEGER DEFAULT 1,
                FOREIGN KEY (document_id) REFERENCES documents (id)
            )
        ''')
        
        # DocumentSections table (linking table for document-section relationships)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS document_sections (
                id TEXT PRIMARY KEY,
                document_id TEXT NOT NULL,
                section_id TEXT NOT NULL,
                section_type TEXT NOT NULL DEFAULT 'TEXT',
                position INTEGER NOT NULL,
                parent_id TEXT,         -- For hierarchical structure
                is_copy BOOLEAN DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (document_id) REFERENCES documents (id),
                FOREIGN KEY (parent_id) REFERENCES document_sections (id)
            )
        ''')
        
        # ChatSessions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS chat_sessions (
                id TEXT PRIMARY KEY,
                section_id TEXT NOT NULL,
                document_id TEXT NOT NULL,
                messages TEXT,           -- JSON
                context_documents TEXT,  -- JSON
                chat_references TEXT,    -- JSON (renamed from references)
                config TEXT,             -- JSON
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                FOREIGN KEY (section_id) REFERENCES text_sections (id),
                FOREIGN KEY (document_id) REFERENCES documents (id)
            )
        ''')
        
        # Create indexes
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_text_sections_owner ON text_sections (owner)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_documents_owner ON documents (owner)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_versions_section ON text_section_versions (text_section_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_versions_doc ON document_versions (document_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_versions_timestamp ON document_versions (document_id, timestamp)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_doc ON document_sections (document_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_section ON document_sections (section_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_document_sections_position ON document_sections (document_id, position)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_chat_sessions_section ON chat_sessions (section_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_chat_sessions_document ON chat_sessions (document_id)')
        
        # Migrate existing document_sections table to new schema
        self._migrate_document_sections_schema(cursor)
        # Migrate existing schema if needed
        self._migrate_text_sections_schema(cursor)
        self._migrate_documents_schema(cursor)
        
        conn.commit()
    
    def _migrate_text_sections_schema(self, cursor):
        """Migrate text_sections table to include new columns if they don't exist"""
        try:
            # Get current column information
            cursor.execute("PRAGMA table_info(text_sections)")
            columns = [row[1] for row in cursor.fetchall()]
            
            # Add missing columns
            if 'chat_session_id' not in columns:
                cursor.execute('ALTER TABLE text_sections ADD COLUMN chat_session_id TEXT')
                print("Added chat_session_id column to text_sections")
            
            if 'analysis_session_id' not in columns:
                cursor.execute('ALTER TABLE text_sections ADD COLUMN analysis_session_id TEXT')
                print("Added analysis_session_id column to text_sections")
            
            if 'last_references' not in columns:
                cursor.execute('ALTER TABLE text_sections ADD COLUMN last_references TEXT')
                print("Added last_references column to text_sections")
            
            if 'tags' not in columns:
                cursor.execute('ALTER TABLE text_sections ADD COLUMN tags TEXT')
                print("Added tags column to text_sections")
            
            if 'metadata' not in columns:
                cursor.execute('ALTER TABLE text_sections ADD COLUMN metadata TEXT')
                print("Added metadata column to text_sections")
                
        except Exception as e:
            print(f"Warning: Schema migration failed: {e}")
    
    def _migrate_documents_schema(self, cursor):
        """Migrate documents table to include versioning columns if they don't exist"""
        try:
            # Get current column information
            cursor.execute("PRAGMA table_info(documents)")
            columns = [row[1] for row in cursor.fetchall()]
            
            # Add missing columns
            if 'current_version_id' not in columns:
                cursor.execute('ALTER TABLE documents ADD COLUMN current_version_id TEXT')
                print("Added current_version_id column to documents")
            
            if 'version_number' not in columns:
                cursor.execute('ALTER TABLE documents ADD COLUMN version_number INTEGER DEFAULT 1')
                print("Added version_number column to documents")
                
        except Exception as e:
            print(f"Warning: Documents schema migration failed: {e}")
    
    def _migrate_document_sections_schema(self, cursor):
        """Migrate document_sections table to new schema with section_id and section_type"""
        try:
            # Get current column information
            cursor.execute("PRAGMA table_info(document_sections)")
            columns = [row[1] for row in cursor.fetchall()]
            
            # Check if we need to migrate from old schema
            if 'text_section_id' in columns and 'section_id' not in columns:
                print("Migrating document_sections table to new schema...")
                
                # Create new table with updated schema
                cursor.execute('''
                    CREATE TABLE document_sections_new (
                        id TEXT PRIMARY KEY,
                        document_id TEXT NOT NULL,
                        section_id TEXT NOT NULL,
                        section_type TEXT NOT NULL DEFAULT 'TEXT',
                        position INTEGER NOT NULL,
                        parent_id TEXT,
                        is_copy BOOLEAN DEFAULT 0,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (document_id) REFERENCES documents (id)
                    )
                ''')
                
                # Migrate existing data
                cursor.execute('''
                    INSERT INTO document_sections_new 
                    (id, document_id, section_id, section_type, position, parent_id, is_copy, created_at)
                    SELECT id, document_id, text_section_id, 'TEXT', position, parent_id, is_copy, created_at
                    FROM document_sections
                ''')
                
                # Drop old table and rename new one
                cursor.execute('DROP TABLE document_sections')
                cursor.execute('ALTER TABLE document_sections_new RENAME TO document_sections')
                
                print("Document sections table migration completed")
                
        except Exception as e:
            print(f"Warning: Document sections schema migration failed: {e}")
    
    def save_text_section(self, text_section: TextSection) -> bool:
        """Save or update a text section"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO text_sections (
                    id, owner, title, section_type, level, current_content, 
                    current_version_id, status, created_at, updated_at,
                    chat_configuration, chat_messages, chat_session_id, analysis_session_id, 
                    last_references, tags, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                text_section.id,
                text_section.owner,
                text_section.title,
                text_section.section_type.value,
                text_section.level,
                text_section.current_content,
                text_section.current_version_id,
                text_section.status.value,
                text_section.created_at.isoformat(),
                text_section.updated_at.isoformat(),
                json.dumps(text_section.chat_configuration.to_dict()),
                json.dumps([msg.to_dict() for msg in text_section.chat_messages]),
                text_section.chat_session_id,
                text_section.analysis_session_id,
                json.dumps(text_section.last_references),
                json.dumps(text_section.tags),
                json.dumps(text_section.metadata)
            ))
            
            conn.commit()
            return True
        except Exception as e:
            print(f"Error saving text section: {e}")
            return False
    
    def get_text_section(self, section_id: str) -> Optional[TextSection]:
        """Retrieve a text section by ID"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM text_sections WHERE id = ?', (section_id,))
                row = cursor.fetchone()
                
                if row:
                    return self._row_to_text_section(row)
                return None
        except Exception as e:
            print(f"Error retrieving text section: {e}")
            return None
    
    def get_user_text_sections(self, owner: str) -> List[TextSection]:
        """Get all text sections for a user"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM text_sections WHERE owner = ? ORDER BY updated_at DESC', (owner,))
                rows = cursor.fetchall()
                
                text_sections = []
                for row in rows:
                    try:
                        section = self._row_to_text_section(row)
                        text_sections.append(section)
                    except Exception as e:
                        print(f"ERROR: Failed to process text section {row[0]} ({row[2]}): {e}")
                        continue
                
                return text_sections
        except Exception as e:
            print(f"Error retrieving user text sections: {e}")
            return []
    
    def save_text_section_version(self, version: TextSectionVersion, text_section_id: str) -> bool:
        """Save a version of text section content"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                cursor.execute('''
                    INSERT INTO text_section_versions (
                        version_id, text_section_id, content, timestamp, author, 
                        change_summary, generated_by_ai
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    version.version_id,
                    text_section_id,
                    version.content,
                    version.timestamp.isoformat(),
                    version.author,
                    version.change_summary,
                    version.generated_by_ai
                ))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error saving text section version: {e}")
            return False
    
    def get_text_section_versions(self, text_section_id: str) -> List[TextSectionVersion]:
        """Get all versions for a text section"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM text_section_versions 
                    WHERE text_section_id = ? 
                    ORDER BY timestamp DESC
                ''', (text_section_id,))
                rows = cursor.fetchall()
                
                return [self._row_to_version(row) for row in rows]
        except Exception as e:
            print(f"Error retrieving text section versions: {e}")
            return []
    
    def get_documents_containing_section(self, text_section_id: str) -> List[Document]:
        """Get all documents that contain a specific text section"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
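                # document_sections links by section_id; text_section_id is the
                # pre-migration column name and no longer exists in the current schema.
                cursor.execute('''
                    SELECT DISTINCT d.* FROM documents d
                    JOIN document_sections ds ON d.id = ds.document_id
                    WHERE ds.section_id = ?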
                    ORDER BY d.updated_at DESC
                ''', (text_section_id,))
                rows = cursor.fetchall()
                
                documents = []
                for row in rows:
                    try:
                        document = self._row_to_document(row)
                        documents.append(document)
                    except Exception as e:
                        print(f"ERROR: Failed to process document {row[0]}: {e}")
                        continue
                
                return documents
        except Exception as e:
            print(f"Error retrieving documents containing section: {e}")
            return []
    
    # ============================================================================
    # DATA SECTION METHODS
    # ============================================================================
    
    def save_data_section(self, data_section: DataSection) -> bool:
        """Save or update a data section"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO data_sections (
                    id, owner, title, description, current_content, status,
                    created_at, updated_at, analysis_session_id, dataset_info,
                    generated_plots, analysis_conclusions, tags, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                data_section.id,
                data_section.owner,
                data_section.title,
                data_section.description,
                data_section.current_content,
                data_section.status.value,
                data_section.created_at.isoformat(),
                data_section.updated_at.isoformat(),
                data_section.analysis_session_id,
                json.dumps(data_section.dataset_info),
                json.dumps(data_section.generated_plots),
                data_section.analysis_conclusions,
                json.dumps(data_section.tags),
                json.dumps(data_section.metadata)
            ))
            
            conn.commit()
            return True
        except Exception as e:
            print(f"Error saving data section: {e}")
            return False
    
    def get_data_section(self, section_id: str) -> Optional[DataSection]:
        """Retrieve a data section by ID"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM data_sections WHERE id = ?', (section_id,))
                row = cursor.fetchone()
                
                if row:
                    return self._row_to_data_section(row)
                return None
        except Exception as e:
            print(f"Error retrieving data section: {e}")
            return None
    
    def get_user_data_sections(self, owner: str) -> List[DataSection]:
        """Get all data sections for a user"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM data_sections 
                    WHERE owner = ? 
                    ORDER BY updated_at DESC
                ''', (owner,))
                rows = cursor.fetchall()
                return [self._row_to_data_section(row) for row in rows]
        except Exception as e:
            print(f"Error getting user data sections: {e}")
            return []
    
    def _row_to_data_section(self, row) -> DataSection:
        """Convert database row to DataSection object"""
        try:
            # Handle potentially NULL JSON fields
            dataset_info = {}
            if row[9]:  # dataset_info column
                try:
                    dataset_info = json.loads(row[9])
                except json.JSONDecodeError:
                    dataset_info = {}
            
            generated_plots = []
            if row[10]:  # generated_plots column
                try:
                    generated_plots = json.loads(row[10])
                except json.JSONDecodeError:
                    generated_plots = []
            
            tags = []
            if row[12]:  # tags column
                try:
                    tags = json.loads(row[12])
                except json.JSONDecodeError:
                    tags = []
            
            metadata = {}
            if row[13]:  # metadata column
                try:
                    metadata = json.loads(row[13])
                except json.JSONDecodeError:
                    metadata = {}
        
        except Exception:
            # Set all to safe defaults if there's any unexpected error
            dataset_info = {}
            generated_plots = []
            tags = []
            metadata = {}

        return DataSection(
            id=row[0],
            owner=row[1],
            title=row[2],
            description=row[3] or "",
            current_content=row[4] or "",
            status=ContentStatus(row[5]),
            created_at=datetime.fromisoformat(row[6]),
            updated_at=datetime.fromisoformat(row[7]),
            analysis_session_id=row[8],
            dataset_info=dataset_info,
            generated_plots=generated_plots,
            analysis_conclusions=row[11] or "",
            tags=tags,
            metadata=metadata
        )

    def save_document(self, document: Document) -> bool:
        """Save or update a document"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
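                # NOTE: INSERT OR REPLACE rewrites the whole row, so columns not listed
                # below (current_version_id, version_number) revert to their defaults.
                cursor.execute('''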
                    INSERT OR REPLACE INTO documents (
                        id, owner, title, description, sections, created_at, updated_at,
                        tags, metadata, shared_with
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    document.id,
                    document.owner,
                    document.title,
                    document.description,
                    json.dumps([section.to_dict() for section in document.sections]),
                    document.created_at.isoformat(),
                    document.updated_at.isoformat(),
                    json.dumps(document.tags),
                    json.dumps(document.metadata),
                    json.dumps(document.shared_with)
                ))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error saving document: {e}")
            return False
    
    def get_document(self, document_id: str) -> Optional[Document]:
        """Retrieve a document by ID"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM documents WHERE id = ?', (document_id,))
                row = cursor.fetchone()
                
                if row:
                    return self._row_to_document(row)
                return None
        except Exception as e:
            print(f"Error retrieving document: {e}")
            return None
    
    def get_user_documents(self, owner: str) -> List[Document]:
        """Get all documents for a user"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM documents WHERE owner = ? ORDER BY updated_at DESC', (owner,))
                rows = cursor.fetchall()
                
                return [self._row_to_document(row) for row in rows]
        except Exception as e:
            print(f"Error retrieving user documents: {e}")
            return []
    
    def delete_text_section(self, section_id: str) -> bool:
        """Delete a text section and its versions, including all document references"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # First, find which documents contain this section and update their sections JSON
                cursor.execute('SELECT document_id FROM document_sections WHERE section_id = ? AND section_type = ?', 
                             (section_id, 'TEXT'))
                affected_docs = [row[0] for row in cursor.fetchall()]
                print(f"📄 Deleting text section {section_id} - found in {len(affected_docs)} documents")
                
                # Delete versions first
                cursor.execute('DELETE FROM text_section_versions WHERE text_section_id = ?', (section_id,))
                print(f"✅ Deleted versions for section {section_id}")
                
                # Delete chat sessions
                cursor.execute('DELETE FROM chat_sessions WHERE section_id = ?', (section_id,))
                print(f"✅ Deleted chat sessions for section {section_id}")
                
                # Delete references from document_sections table
                cursor.execute('DELETE FROM document_sections WHERE section_id = ? AND section_type = ?', 
                             (section_id, 'TEXT'))
                print(f"✅ Deleted document_sections references for section {section_id}")
                
                # Update the sections JSON in each affected document
                for doc_id in affected_docs:
                    cursor.execute('SELECT sections FROM documents WHERE id = ?', (doc_id,))
                    row = cursor.fetchone()
                    if row and row[0]:  # skip documents whose sections column is NULL or empty
                        sections_data = json.loads(row[0])
                        # Remove the deleted section from the sections list
                        updated_sections = [s for s in sections_data if s.get('section_id') != section_id]
                        # Update the document with cleaned sections list
                        cursor.execute('UPDATE documents SET sections = ?, updated_at = ? WHERE id = ?',
                                     (json.dumps(updated_sections), datetime.now().isoformat(), doc_id))
                print(f"✅ Updated sections JSON in {len(affected_docs)} documents")
                
                # Delete text section
                cursor.execute('DELETE FROM text_sections WHERE id = ?', (section_id,))
                print(f"✅ Deleted text section {section_id}")
                
                conn.commit()
                print(f"✅ Successfully deleted text section {section_id}")
                return True
        except Exception as e:
            import traceback
            print(f"❌ Error deleting text section {section_id}: {e}")
            print(f"❌ Traceback: {traceback.format_exc()}")
            return False
    
    def delete_data_section(self, section_id: str) -> bool:
        """Delete a data section and its references from documents"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # First, find which documents contain this section and update their sections JSON
                cursor.execute('SELECT document_id FROM document_sections WHERE section_id = ? AND section_type = ?',
                             (section_id, SectionType.DATA.value))
                affected_docs = [row[0] for row in cursor.fetchall()]
                
                # Delete references from document_sections table
                cursor.execute('DELETE FROM document_sections WHERE section_id = ? AND section_type = ?', 
                             (section_id, SectionType.DATA.value))
                
                # Update the sections JSON in each affected document
                for doc_id in affected_docs:
                    cursor.execute('SELECT sections FROM documents WHERE id = ?', (doc_id,))
                    row = cursor.fetchone()
                    if row and row[0]:  # skip documents whose sections column is NULL or empty
                        sections_data = json.loads(row[0])
                        # Remove the deleted section from the sections list
                        updated_sections = [s for s in sections_data if s.get('section_id') != section_id]
                        # Update the document with cleaned sections list
                        cursor.execute('UPDATE documents SET sections = ?, updated_at = ? WHERE id = ?',
                                     (json.dumps(updated_sections), datetime.now().isoformat(), doc_id))
                
                # Delete the data section
                cursor.execute('DELETE FROM data_sections WHERE id = ?', (section_id,))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error deleting data section: {e}")
            return False
    
    def save_chat_session(self, chat_session: ChatSession) -> bool:
        """Save or update a chat session"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO chat_sessions (
                    id, section_id, document_id, messages, context_documents,
                    chat_references, config, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                chat_session.id,
                chat_session.section_id,
                chat_session.document_id,
                json.dumps(chat_session.messages),
                json.dumps(chat_session.context_documents),
                json.dumps(chat_session.references),
                json.dumps(chat_session.config.to_dict() if chat_session.config else {}),
                chat_session.created_at.isoformat(),
                chat_session.updated_at.isoformat()
            ))
            
            conn.commit()
            return True
        except Exception as e:
            print(f"Error saving chat session: {e}")
            return False
    
    def get_chat_session(self, session_id: str) -> Optional[ChatSession]:
        """Retrieve a chat session by ID"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM chat_sessions WHERE id = ?', (session_id,))
                row = cursor.fetchone()
                
                if row:
                    return self._row_to_chat_session(row)
                return None
        except Exception as e:
            print(f"Error retrieving chat session: {e}")
            return None
    
    def get_chat_session_by_section(self, section_id: str) -> Optional[ChatSession]:
        """Retrieve a chat session by section ID"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT * FROM chat_sessions WHERE section_id = ?', (section_id,))
                row = cursor.fetchone()
                
                if row:
                    return self._row_to_chat_session(row)
                return None
        except Exception as e:
            print(f"Error retrieving chat session by section: {e}")
            return None
    
    def delete_document(self, document_id: str) -> bool:
        """Delete a document and its associated document sections"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # First delete document sections (linking table)
                cursor.execute('DELETE FROM document_sections WHERE document_id = ?', (document_id,))
                
                # Then delete the document itself
                cursor.execute('DELETE FROM documents WHERE id = ?', (document_id,))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error deleting document: {e}")
            return False
    
    def save_document_version(self, version: DocumentVersion) -> bool:
        """Save a document version"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                cursor.execute('''
                    INSERT INTO document_versions (
                        version_id, document_id, title, description, sections, 
                        timestamp, author, change_summary, version_number
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    version.version_id,
                    version.document_id,
                    version.title,
                    version.description,
                    json.dumps(version.sections),
                    version.timestamp.isoformat(),
                    version.author,
                    version.change_summary,
                    version.version_number
                ))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error saving document version: {e}")
            return False
    
    def save_document_section(self, doc_section: DocumentSection, document_id: str) -> bool:
        """Save a document section reference to the document_sections table"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                cursor.execute('''
                    INSERT OR REPLACE INTO document_sections (
                        id, document_id, section_id, section_type, position, parent_id, is_copy, created_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ''', (
                    doc_section.id,
                    document_id,
                    doc_section.section_id,
                    doc_section.section_type.value,
                    doc_section.position,
                    doc_section.parent_id,
                    doc_section.is_copy
                ))
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error saving document section: {e}")
            return False

    def sync_document_sections_to_table(self) -> bool:
        """Sync existing JSON document sections to the document_sections table"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # Clear existing data
                cursor.execute('DELETE FROM document_sections')
                
                # Get all documents with sections
                cursor.execute("SELECT id, sections FROM documents WHERE sections IS NOT NULL AND sections != ''")
                documents = cursor.fetchall()
                
                for doc_id, sections_json in documents:
                    if sections_json:
                        try:
                            sections = json.loads(sections_json)
                            for section_data in sections:
                                # Handle both old and new format
                                section_id = section_data.get('section_id') or section_data.get('text_section_id')
                                section_type = section_data.get('section_type', 'TEXT')
                                
                                if section_id:  # Only sync if we have a valid section_id
                                    cursor.execute('''
                                        INSERT INTO document_sections (
                                            id, document_id, section_id, section_type, position, parent_id, is_copy, created_at
                                        ) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                                    ''', (
                                        section_data.get('id'),
                                        doc_id,
                                        section_id,
                                        section_type,
                                        section_data.get('position', 0),
                                        section_data.get('parent_id'),
                                        section_data.get('is_copy', False)
                                    ))
                        except json.JSONDecodeError as e:
                            print(f"Error parsing sections JSON for document {doc_id}: {e}")
                            continue
                
                conn.commit()
                return True
        except Exception as e:
            print(f"Error syncing document sections: {e}")
            return False
    
    def get_document_versions(self, document_id: str) -> List[Dict]:
        """Get all versions for a document"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM document_versions 
                    WHERE document_id = ? 
                    ORDER BY timestamp DESC
                ''', (document_id,))
                rows = cursor.fetchall()
                
                versions = []
                for row in rows:
                    try:
                        version_dict = {
                            'version_id': row[0],
                            'document_id': row[1],
                            'title': row[2],
                            'description': row[3],
                            'sections': json.loads(row[4]) if row[4] else [],
                            'timestamp': row[5],
                            'author': row[6],
                            'change_summary': row[7],
                            'version_number': row[8]
                        }
                        versions.append(version_dict)
                    except Exception as e:
                        print(f"Error processing document version {row[0]}: {e}")
                        continue
                
                return versions
        except Exception as e:
            print(f"Error retrieving document versions: {e}")
            return []
    
    def _row_to_text_section(self, row) -> TextSection:
        """Convert database row to TextSection object"""
        try:
            # Handle potentially NULL JSON fields with graceful fallbacks
            chat_config_data = {}
            if row[10]:
                try:
                    chat_config_data = json.loads(row[10])
                except json.JSONDecodeError:
                    chat_config_data = {}  # Fallback to empty dict
            
            chat_messages_data = []
            if row[11]:
                try:
                    chat_messages_data = json.loads(row[11])
                except json.JSONDecodeError:
                    chat_messages_data = []  # Fallback to empty list
            
            last_refs_data = []
            if row[14]:  # Updated: last_references is now at index 14
                try:
                    last_refs_data = json.loads(row[14])
                except json.JSONDecodeError:
                    last_refs_data = []  # Fallback to empty list

            tags_data = []
            if row[15]:  # Updated: tags is now at index 15
                try:
                    tags_data = json.loads(row[15])
                except json.JSONDecodeError:
                    tags_data = []  # Fallback to empty list

            metadata_data = {}
            if row[16]:  # Updated: metadata is now at index 16
                try:
                    metadata_data = json.loads(row[16])
                except json.JSONDecodeError:
                    # Corrupted metadata (e.g. a bare UUID string left over from a
                    # legacy chat_session_id) falls back to an empty dict
                    metadata_data = {}
        except Exception as e:
            # Set all to safe defaults if there's any unexpected error
            chat_config_data = {}
            chat_messages_data = []
            last_refs_data = []
            tags_data = []
            metadata_data = {}
        
        # Handle chat_session_id - ensure it's a string or None
        chat_session_id = row[12]  # chat_session_id is still at index 12
        
        # Handle analysis_session_id - ensure it's a string or None
        analysis_session_id = row[13] if len(row) > 13 else None  # analysis_session_id is at index 13
        
        # Clean up corrupted data
        if chat_session_id is not None:
            if isinstance(chat_session_id, str):
                # Check for corrupted string values
                corrupted_values = ['{}', '[]', 'null', '']
                if chat_session_id in corrupted_values:
                    chat_session_id = None
            else:
                # Invalid type (e.g., serialized object), set to None
                chat_session_id = None

        # Clean up analysis_session_id
        if analysis_session_id is not None:
            if isinstance(analysis_session_id, str):
                # Check for corrupted string values
                corrupted_values = ['{}', '[]', 'null', '']
                if analysis_session_id in corrupted_values:
                    analysis_session_id = None
            else:
                # Invalid type, set to None
                analysis_session_id = None

        return TextSection(
            id=row[0],
            owner=row[1],
            title=row[2],
            section_type=SectionType(row[3]),
            level=row[4],
            current_content=row[5] or "",
            current_version_id=row[6] or "",
            status=ContentStatus(row[7]),
            created_at=datetime.fromisoformat(row[8]),
            updated_at=datetime.fromisoformat(row[9]),
            chat_configuration=ChatConfiguration.from_dict(chat_config_data) if chat_config_data else ChatConfiguration(),
            chat_messages=[ChatMessage.from_dict(msg) for msg in chat_messages_data],
            chat_session_id=chat_session_id,
            analysis_session_id=analysis_session_id,
            last_references=last_refs_data,
            tags=tags_data,
            metadata=metadata_data
        )
    
    def _row_to_version(self, row) -> TextSectionVersion:
        """Convert database row to TextSectionVersion object"""
        return TextSectionVersion(
            version_id=row[0],
            content=row[2],
            timestamp=datetime.fromisoformat(row[3]),
            author=row[4],
            change_summary=row[5],
            generated_by_ai=bool(row[6])
        )
    
    def _row_to_document(self, row) -> Document:
        """Convert database row to Document object"""
        # Load sections and sort them by position to maintain document order
        sections_data = json.loads(row[4])
        sections = [DocumentSection.from_dict(s) for s in sections_data]
        sections.sort(key=lambda s: s.position)  # Sort by position to maintain document order
        
        return Document(
            id=row[0],
            owner=row[1],
            title=row[2],
            description=row[3],
            sections=sections,
            created_at=datetime.fromisoformat(row[5]),
            updated_at=datetime.fromisoformat(row[6]),
            tags=json.loads(row[7]),
            metadata=json.loads(row[8]),
            shared_with=json.loads(row[9])
        )
    
    def _row_to_chat_session(self, row) -> ChatSession:
        """Convert database row to ChatSession object"""
        try:
            config_data = json.loads(row[6]) if row[6] else {}
            config = ChatConfiguration.from_dict(config_data) if config_data else ChatConfiguration()
            
            return ChatSession(
                id=row[0],
                section_id=row[1],
                document_id=row[2],
                messages=json.loads(row[3]) if row[3] else [],
                context_documents=json.loads(row[4]) if row[4] else [],
                references=json.loads(row[5]) if row[5] else [],
                config=config,
                created_at=datetime.fromisoformat(row[7]),
                updated_at=datetime.fromisoformat(row[8])
            )
        except Exception as e:
            print(f"Error converting row to chat session: {e}")
            return None
    
    def verify_tables(self) -> bool:
        """Verify that all required tables exist"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            
            # Check if all required tables exist
            cursor.execute("""
                SELECT name FROM sqlite_master 
                WHERE type='table' AND name IN ('text_sections', 'text_section_versions', 'documents')
            """)
            
            tables = [row[0] for row in cursor.fetchall()]
            required_tables = ['text_sections', 'text_section_versions', 'documents']
            
            return all(table in tables for table in required_tables)
            
        except Exception as e:
            print(f"Error verifying tables: {e}")
            return False
    
    def get_all_text_sections(self) -> List[TextSection]:
        """Get all text sections from database"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM text_sections")
            rows = cursor.fetchall()
            return [self._row_to_text_section(row) for row in rows]
        except Exception as e:
            print(f"Error getting all text sections: {e}")
            return []
    
    def get_all_documents(self) -> List[Document]:
        """Get all documents from database"""
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM documents")
            rows = cursor.fetchall()
            return [self._row_to_document(row) for row in rows]
        except Exception as e:
            print(f"Error getting all documents: {e}")
            return []

Parameters

Name Type Default Kind
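db_path str '/tf/active/vice_ai/complex_documents.db' positional or keyword

Parameter Details

db_path: Path to the SQLite database file. The connection is opened during construction, when init_database() runs.

Return Value

Instantiating the class returns a DatabaseManager whose schema has been initialized by init_database().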

Class Interface

Methods

__init__(self, db_path)

Purpose: Store the database path and initialize the schema by calling init_database()

Parameters:

  • db_path: Type: str

Returns: None

_get_connection(self)

Purpose: Get database connection

Returns: The cached sqlite3.Connection, created on the first call
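
The lazy-connection pattern used by this method can be sketched as follows. This is a minimal illustration against an in-memory database; the class name LazyConnection is hypothetical and does not appear in the source.

```python
import sqlite3


class LazyConnection:
    """Hypothetical stand-in for the connection handling in DatabaseManager."""

    def __init__(self, db_path: str = ":memory:"):
        self.db_path = db_path
        self.conn = None  # opened on first use

    def get(self) -> sqlite3.Connection:
        # Open once, then hand back the same connection on later calls.
        if self.conn is None:
            # check_same_thread=False allows use from other threads;
            # the caller is then responsible for serializing access.
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        return self.conn


holder = LazyConnection()
first = holder.get()
second = holder.get()
```

Both calls return the same object, so every method that goes through the cached connection shares one transaction state.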

init_database(self)

Purpose: Initialize database schema

Returns: None

_migrate_text_sections_schema(self, cursor)

Purpose: Migrate text_sections table to include new columns if they don't exist

Parameters:

  • cursor: Parameter

Returns: None

_migrate_documents_schema(self, cursor)

Purpose: Migrate documents table to include versioning columns if they don't exist

Parameters:

  • cursor: Parameter

Returns: None

_migrate_document_sections_schema(self, cursor)

Purpose: Migrate document_sections table to new schema with section_id and section_type

Parameters:

  • cursor: Parameter

Returns: None

save_text_section(self, text_section) -> bool

Purpose: Save or update a text section

Parameters:

  • text_section: Type: TextSection

Returns: Returns bool

get_text_section(self, section_id) -> Optional[TextSection]

Purpose: Retrieve a text section by ID

Parameters:

  • section_id: Type: str

Returns: Returns Optional[TextSection]

get_user_text_sections(self, owner) -> List[TextSection]

Purpose: Get all text sections for a user

Parameters:

  • owner: Type: str

Returns: Returns List[TextSection]

save_text_section_version(self, version, text_section_id) -> bool

Purpose: Save a version of text section content

Parameters:

  • version: Type: TextSectionVersion
  • text_section_id: Type: str

Returns: Returns bool

get_text_section_versions(self, text_section_id) -> List[TextSectionVersion]

Purpose: Get all versions for a text section

Parameters:

  • text_section_id: Type: str

Returns: Returns List[TextSectionVersion]

get_documents_containing_section(self, text_section_id) -> List[Document]

Purpose: Get all documents that contain a specific text section

Parameters:

  • text_section_id: Type: str

Returns: Returns List[Document]

save_data_section(self, data_section) -> bool

Purpose: Save or update a data section

Parameters:

  • data_section: Type: DataSection

Returns: Returns bool

get_data_section(self, section_id) -> Optional[DataSection]

Purpose: Retrieve a data section by ID

Parameters:

  • section_id: Type: str

Returns: Returns Optional[DataSection]

get_user_data_sections(self, owner) -> List[DataSection]

Purpose: Get all data sections for a user

Parameters:

  • owner: Type: str

Returns: Returns List[DataSection]

_row_to_data_section(self, row) -> DataSection

Purpose: Convert database row to DataSection object

Parameters:

  • row: Parameter

Returns: Returns DataSection

save_document(self, document) -> bool

Purpose: Save or update a document

Parameters:

  • document: Type: Document

Returns: Returns bool

get_document(self, document_id) -> Optional[Document]

Purpose: Retrieve a document by ID

Parameters:

  • document_id: Type: str

Returns: Returns Optional[Document]

get_user_documents(self, owner) -> List[Document]

Purpose: Get all documents for a user

Parameters:

  • owner: Type: str

Returns: Returns List[Document]

delete_text_section(self, section_id) -> bool

Purpose: Delete a text section and its versions, including all document references

Parameters:

  • section_id: Type: str

Returns: Returns bool

delete_data_section(self, section_id) -> bool

Purpose: Delete a data section and its references from documents

Parameters:

  • section_id: Type: str

Returns: Returns bool

save_chat_session(self, chat_session) -> bool

Purpose: Save or update a chat session

Parameters:

  • chat_session: Type: ChatSession

Returns: True on success; False if an exception was caught (the error is printed)
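
The "save or update" behavior comes from SQLite's INSERT OR REPLACE. A minimal sketch, assuming a cut-down chat_sessions table with only two columns (the real table has more), shows that saving the same id twice leaves a single row holding the latest values:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified schema for illustration only.
conn.execute("CREATE TABLE chat_sessions (id TEXT PRIMARY KEY, messages TEXT)")


def save(session_id: str, messages: list) -> None:
    # On a primary-key conflict the old row is deleted and the new one inserted.
    conn.execute(
        "INSERT OR REPLACE INTO chat_sessions (id, messages) VALUES (?, ?)",
        (session_id, json.dumps(messages)),
    )
    conn.commit()


save("s1", [{"role": "user", "content": "hi"}])
save("s1", [{"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"}])

rows = conn.execute("SELECT id, messages FROM chat_sessions").fetchall()
stored_messages = json.loads(rows[0][1])
```

Because REPLACE deletes the conflicting row before inserting, any column left out of the INSERT list reverts to its default rather than keeping its previous value.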

get_chat_session(self, session_id) -> Optional[ChatSession]

Purpose: Retrieve a chat session by ID

Parameters:

  • session_id: Type: str

Returns: Optional[ChatSession]: the session, or None if no row matches or an error occurs
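
This method opens a fresh connection with `with sqlite3.connect(...)`. In Python's sqlite3 module that context manager commits or rolls back the transaction but does not close the connection. The sketch below demonstrates the difference and shows one possible way to close deterministically with contextlib.closing; the helper fetch_one is hypothetical.

```python
import sqlite3
from contextlib import closing

# `with conn:` only manages the transaction (commit or rollback);
# the connection itself stays open afterwards.
plain = sqlite3.connect(":memory:")
with plain:
    plain.execute("SELECT 1")
still_usable = plain.execute("SELECT 1").fetchone() == (1,)
plain.close()


def fetch_one(db_path: str, query: str, params: tuple = ()):
    """Hypothetical helper: run one query and always close the connection."""
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(query, params).fetchone()


row = fetch_one(":memory:", "SELECT ?", (42,))
```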

get_chat_session_by_section(self, section_id) -> Optional[ChatSession]

Purpose: Retrieve a chat session by section ID

Parameters:

  • section_id: Type: str

Returns: Returns Optional[ChatSession]

delete_document(self, document_id) -> bool

Purpose: Delete a document and its associated document sections

Parameters:

  • document_id: Type: str

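Returns: True if both DELETE statements ran and were committed, even when no matching document existed; False if an exception was caught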
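
The two-step delete (link rows first, then the document) can be sketched inside a single transaction. This is an illustration with simplified table definitions, not the schema from the source.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified tables for illustration only.
conn.executescript("""
    CREATE TABLE documents (id TEXT PRIMARY KEY, title TEXT);
    CREATE TABLE document_sections (
        id TEXT PRIMARY KEY, document_id TEXT, section_id TEXT
    );
    INSERT INTO documents VALUES ('d1', 'Report'), ('d2', 'Notes');
    INSERT INTO document_sections VALUES
        ('ds1', 'd1', 's1'), ('ds2', 'd1', 's2'), ('ds3', 'd2', 's1');
""")


def delete_document(document_id: str) -> bool:
    try:
        with conn:  # commits on success, rolls back if either DELETE fails
            conn.execute(
                "DELETE FROM document_sections WHERE document_id = ?",
                (document_id,),
            )
            conn.execute("DELETE FROM documents WHERE id = ?", (document_id,))
        return True
    except sqlite3.Error:
        return False


deleted = delete_document("d1")
remaining_docs = [r[0] for r in conn.execute("SELECT id FROM documents ORDER BY id")]
remaining_links = [
    r[0] for r in conn.execute("SELECT id FROM document_sections ORDER BY id")
]
```

The section rows themselves are not touched; only the linking rows and the document row are removed, which matches the method's description.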

save_document_version(self, version) -> bool

Purpose: Save a document version

Parameters:

  • version: Type: DocumentVersion

Returns: Returns bool

save_document_section(self, doc_section, document_id) -> bool

Purpose: Save a document section reference to the document_sections table

Parameters:

  • doc_section: Type: DocumentSection
  • document_id: Type: str

Returns: Returns bool

sync_document_sections_to_table(self) -> bool

Purpose: Sync existing JSON document sections to the document_sections table

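Returns: True on success; False if an exception was caught. The document_sections table is cleared first, and documents whose sections JSON cannot be parsed are skipped.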
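
The per-document step of the sync can be sketched as a pure function: parse the JSON, accept either the new `section_id` key or the legacy `text_section_id` key, and skip entries with neither. The function name extract_section_rows is hypothetical; the defaults mirror those in the listing above.

```python
import json
from typing import List, Tuple


def extract_section_rows(doc_id: str, sections_json: str) -> List[Tuple]:
    """Build one parameter tuple per section that has a usable section id."""
    try:
        sections = json.loads(sections_json)
    except json.JSONDecodeError:
        return []  # unparsable documents are skipped

    rows = []
    for data in sections:
        # New format uses 'section_id'; old format used 'text_section_id'.
        section_id = data.get("section_id") or data.get("text_section_id")
        if not section_id:
            continue
        rows.append((
            data.get("id"),
            doc_id,
            section_id,
            data.get("section_type", "TEXT"),
            data.get("position", 0),
            data.get("parent_id"),
            data.get("is_copy", False),
        ))
    return rows


sample = json.dumps([
    {"id": "a", "section_id": "s1", "section_type": "DATA", "position": 2},
    {"id": "b", "text_section_id": "s2"},   # legacy key
    {"id": "c"},                            # no section id: skipped
])
rows = extract_section_rows("d1", sample)
bad = extract_section_rows("d1", "{not json")
```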

get_document_versions(self, document_id) -> List[Dict]

Purpose: Get all versions for a document

Parameters:

  • document_id: Type: str

Returns: List[Dict] of versions ordered by timestamp, newest first; an empty list on error
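
The method builds each dictionary from positional indices (row[0], row[1], ...), which depends on column order. One alternative, shown here only as a sketch with a reduced table definition, is sqlite3.Row, which allows access by column name:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
# Reduced schema for illustration only.
conn.execute("""
    CREATE TABLE document_versions (
        version_id TEXT PRIMARY KEY, document_id TEXT,
        sections TEXT, timestamp TEXT
    )
""")
conn.executemany(
    "INSERT INTO document_versions VALUES (?, ?, ?, ?)",
    [
        ("v1", "d1", json.dumps(["s1"]), "2024-01-01T10:00:00"),
        ("v2", "d1", None, "2024-02-01T10:00:00"),
    ],
)


def get_versions(document_id: str) -> list:
    rows = conn.execute(
        "SELECT * FROM document_versions WHERE document_id = ? "
        "ORDER BY timestamp DESC",
        (document_id,),
    ).fetchall()
    versions = []
    for row in rows:
        version = dict(row)
        # Decode the JSON column; NULL or empty becomes an empty list.
        version["sections"] = json.loads(row["sections"]) if row["sections"] else []
        versions.append(version)
    return versions


versions = get_versions("d1")
```

ISO 8601 timestamps stored as text sort correctly with a plain ORDER BY, which is why the newest version comes first.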

_row_to_text_section(self, row) -> TextSection

Purpose: Convert database row to TextSection object

Parameters:

  • row: Parameter

Returns: Returns TextSection
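
Most of this method is defensive decoding: each JSON column falls back to an empty value when it is NULL or malformed, and session ids that are not plain strings are dropped. The repeated blocks could be expressed with two small helpers. The names load_json_or and clean_session_id are hypothetical; the list of corrupted values is taken from the listing above.

```python
import json
from typing import Any, Optional


def load_json_or(raw: Optional[str], default: Any) -> Any:
    """Decode a JSON column, returning `default` for NULL, empty, or bad data."""
    if not raw:
        return default
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return default


def clean_session_id(value: Any) -> Optional[str]:
    """Keep only real string ids; known corrupted placeholders become None."""
    if not isinstance(value, str) or value in ("{}", "[]", "null", ""):
        return None
    return value


config = load_json_or('{"model": "x"}', {})
broken = load_json_or("not json", [])
empty = load_json_or(None, {})
```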

_row_to_version(self, row) -> TextSectionVersion

Purpose: Convert database row to TextSectionVersion object

Parameters:

  • row: Parameter

Returns: Returns TextSectionVersion

_row_to_document(self, row) -> Document

Purpose: Convert database row to Document object

Parameters:

  • row: Parameter

Returns: Returns Document

_row_to_chat_session(self, row) -> ChatSession

Purpose: Convert database row to ChatSession object

Parameters:

  • row: Parameter

Returns: ChatSession, or None if the row cannot be converted (the error is printed)

verify_tables(self) -> bool

Purpose: Verify that all required tables exist

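Returns: True only if the text_sections, text_section_versions, and documents tables all exist; False otherwise or on error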
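
The check queries sqlite_master for the required table names. A minimal sketch of the same idea, written to report which tables are missing rather than a single boolean (the function name missing_tables is hypothetical):

```python
import sqlite3
from typing import List, Sequence

REQUIRED = ("text_sections", "text_section_versions", "documents")


def missing_tables(conn: sqlite3.Connection,
                   required: Sequence[str] = REQUIRED) -> List[str]:
    # One placeholder per name keeps the query parameterized.
    placeholders = ", ".join("?" for _ in required)
    rows = conn.execute(
        "SELECT name FROM sqlite_master "
        f"WHERE type = 'table' AND name IN ({placeholders})",
        tuple(required),
    ).fetchall()
    found = {row[0] for row in rows}
    return [name for name in required if name not in found]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id TEXT PRIMARY KEY)")
missing = missing_tables(conn)
```

An empty result corresponds to verify_tables() returning True. Note that the method checks only these three tables, not chat_sessions, document_sections, or document_versions.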

get_all_text_sections(self) -> List[TextSection]

Purpose: Get all text sections from database

Returns: Returns List[TextSection]

get_all_documents(self) -> List[Document]

Purpose: Get all documents from database

Returns: Returns List[Document]

Required Imports

import uuid
import json
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional

Usage Example

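# Example usage (the path is illustrative; omit db_path to use the default):
# db = DatabaseManager(db_path='/path/to/complex_documents.db')
# if db.verify_tables():
#     documents = db.get_all_documents()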

Related Versions

Other versions of this component:

  • DatabaseManager_v1

    From: /tf/active/vicechatdev/vice_ai/smartstat_models.py | Maturity: N/A

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DatabaseManager 73.9% similar

    SQLite database manager for SmartStat application that handles persistence of statistical analysis sessions, steps, and results.

    From: /tf/active/vicechatdev/smartstat/models.py
  • class ManualRelationshipManager 52.9% similar

    A class that manages manually defined database relationships with persistent JSON storage, allowing users to add, retrieve, update, and remove relationship definitions between database tables.

    From: /tf/active/vicechatdev/full_smartstat/manual_relationships.py
  • class Neo4jManager 46.7% similar

    A manager class that provides a high-level interface for interacting with Neo4j graph databases, handling connections, queries, node creation, and relationship management.

    From: /tf/active/vicechatdev/QA_updater/knowledge_store/neo4j_manager.py
  • class SessionManager 45.8% similar

    A session management class for Panel applications that provides user authentication and arbitrary key-value storage using Panel's built-in state management system.

    From: /tf/active/vicechatdev/CDocs/auth/session_manager.py