class DocumentIndexer
A class for indexing documents into ChromaDB with support for multiple file formats (PDF, Word, PowerPoint, Excel, text files), smart incremental indexing, and document chunk management.
/tf/active/vicechatdev/docchat/document_indexer.py
790 - 1144
complex
Purpose
DocumentIndexer manages the complete lifecycle of document indexing into a ChromaDB vector database. It handles document processing, chunking, embedding generation, and storage with intelligent features like modification-time tracking for incremental updates, duplicate detection, and batch folder indexing. The class integrates with ChromaDB's HNSW index using cosine similarity for efficient semantic search and supports re-indexing modified documents while skipping unchanged ones.
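For reference, every chunk written by index_document is stored as a ChromaDB document string plus a metadata record of the following shape (values below are purely illustrative; additional keys such as file_path and file_name come from the processor's own chunk metadata):

# Illustrative shape of one stored chunk (values are made up)
chunk_id = 'e7b3...c1_0'                     # f"{doc_id}_{chunk_index}"
document = 'Extracted text of the chunk...'
metadata = {
    'file_path': '/data/docs/report.pdf',    # queried by is_document_indexed()
    'file_name': 'report.pdf',
    'doc_id': 'e7b3...c1',                   # uuid4 shared by all chunks of one file
    'chunk_index': 0,
    'chunk_type': 'text',
    'file_mtime': '1714989600.0',            # str(os.path.getmtime(...)), drives re-indexing
}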
Source Code
class DocumentIndexer:
    """Index documents in ChromaDB"""

    def __init__(self, collection_name: str = None, api_key: str = None):
        """
        Initialize document indexer

        Args:
            collection_name: Name of ChromaDB collection
            api_key: OpenAI API key
        """
        self.collection_name = collection_name or config.CHROMA_COLLECTION_NAME
        self.api_key = api_key or config.OPENAI_API_KEY

        # Initialize ChromaDB client - External connection to oneco_chroma (same pattern as vice_ai)
        logger.info(f"Connecting to ChromaDB at {config.CHROMA_HOST}:{config.CHROMA_PORT}")
        self.chroma_client = chromadb.HttpClient(
            host=config.CHROMA_HOST,
            port=config.CHROMA_PORT
        )

        # Initialize embedding function
        self.embedding_function = DocChatEmbeddingFunction(
            api_key=self.api_key,
            embed_model_name=config.EMBEDDING_MODEL,
            llm_model_name=config.SMALL_LLM_MODEL
        )

        # Get or create collection with HNSW configuration
        # In ChromaDB 1.3+, HNSW with cosine similarity is the default
        self.collection = self.chroma_client.get_or_create_collection(
            name=self.collection_name,
            embedding_function=self.embedding_function,
            metadata={"hnsw:space": "cosine", "description": "DocChat document collection"}
        )

        # Initialize document processor
        self.processor = DocumentProcessor()

        logger.info(f"Initialized DocumentIndexer with collection: {self.collection_name}")

    def is_document_indexed(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Check if document is already indexed and if it needs re-indexing

        Args:
            file_path: Path to document file

        Returns:
            Dictionary with doc_id and indexed_mtime if document is indexed, None otherwise
        """
        try:
            # Query for this file path
            results = self.collection.get(
                where={"file_path": str(file_path)},
                limit=1
            )

            if not results['ids']:
                return None

            # Get metadata from first chunk
            metadata = results['metadatas'][0]
            doc_id = metadata.get('doc_id')
            indexed_mtime = metadata.get('file_mtime')

            # Get current file modification time
            current_mtime = os.path.getmtime(file_path)

            # Check if file has been modified since indexing
            if indexed_mtime and float(indexed_mtime) >= current_mtime:
                return {
                    'doc_id': doc_id,
                    'indexed_mtime': indexed_mtime,
                    'needs_reindex': False
                }
            else:
                return {
                    'doc_id': doc_id,
                    'indexed_mtime': indexed_mtime,
                    'needs_reindex': True
                }

        except Exception as e:
            logger.warning(f"Error checking if document is indexed: {e}")
            return None

    def index_document(self, file_path: Path, force_reindex: bool = False) -> Dict[str, Any]:
        """
        Index a single document with smart incremental indexing

        Args:
            file_path: Path to document file
            force_reindex: If True, re-index even if already indexed

        Returns:
            Dictionary with indexing results
        """
        try:
            # Check if document is already indexed
            existing = self.is_document_indexed(file_path)

            if existing and not force_reindex:
                if not existing['needs_reindex']:
                    logger.info(f"Document already indexed and up-to-date: {file_path.name}")
                    return {
                        'success': True,
                        'skipped': True,
                        'doc_id': existing['doc_id'],
                        'file_name': file_path.name,
                        'reason': 'already_indexed'
                    }
                else:
                    # File has been modified, delete old version first
                    logger.info(f"Document modified, re-indexing: {file_path.name}")
                    self.delete_document(existing['doc_id'])

            logger.info(f"Indexing document: {file_path}")

            # Get file modification time
            file_mtime = os.path.getmtime(file_path)

            # Process document
            result = self.processor.process_document(file_path)

            if not result.get('success'):
                error_msg = result.get('error', 'Unknown error')
                logger.warning(f"Failed to index {file_path.name}: {error_msg}")
                return result

            chunks = result.get('chunks', [])
            if not chunks:
                logger.warning(f"No content extracted from {file_path.name} - likely scanned PDF without text layer")
                return {
                    'success': False,
                    'error': 'No content extracted - possibly scanned/image PDF',
                    'file_name': file_path.name
                }

            # Prepare data for ChromaDB
            documents = []
            metadatas = []
            ids = []

            doc_id = str(uuid4())

            for i, chunk in enumerate(chunks):
                chunk_id = f"{doc_id}_{i}"
                documents.append(chunk['text'])
                metadatas.append({
                    **chunk['metadata'],
                    'chunk_index': i,
                    'doc_id': doc_id,
                    'chunk_type': chunk['type'],
                    'file_mtime': str(file_mtime)  # Store modification time
                })
                ids.append(chunk_id)

            # Add to collection
            try:
                self.collection.add(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids
                )

                logger.info(f"Successfully indexed {len(chunks)} chunks from {file_path.name}")

                return {
                    'success': True,
                    'doc_id': doc_id,
                    'num_chunks': len(chunks),
                    'file_name': file_path.name,
                    'reindexed': existing is not None
                }

            except Exception as e:
                logger.error(f"Failed to add to ChromaDB: {e}")
                try:
                    import traceback
                    logger.error(f"Traceback: {traceback.format_exc()}")
                except:
                    logger.error(f"Could not format traceback")
                return {'success': False, 'error': str(e)}

        except Exception as e:
            # Catch ANY exception to prevent background thread crash
            logger.error(f"CRITICAL: Unhandled exception while indexing {file_path.name}: {e}")
            try:
                import traceback
                logger.error(f"Traceback: {traceback.format_exc()}")
            except:
                logger.error(f"Could not format traceback")
            return {
                'success': False,
                'error': f'Unhandled exception: {str(e)}',
                'file_name': file_path.name
            }

    def index_folder(self, folder_path: Path, recursive: bool = True, force_reindex: bool = False,
                     progress_callback=None) -> Dict[str, Any]:
        """
        Index all documents in a folder with smart incremental indexing

        Args:
            folder_path: Path to folder
            recursive: Whether to recurse into subdirectories
            force_reindex: If True, re-index all documents even if already indexed
            progress_callback: Optional callback function(current, total, filename) for progress updates

        Returns:
            Dictionary with indexing results
        """
        logger.info(f"Indexing folder: {folder_path} (force_reindex={force_reindex})")

        results = {
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'reindexed': 0,
            'total': 0,
            'details': []
        }

        # Get all supported files
        pattern = '**/*' if recursive else '*'

        # First, collect all files to get total count
        files_to_process = []
        for file_path in folder_path.glob(pattern):
            if file_path.is_file():
                ext = file_path.suffix.lower()

                # Check if supported extension
                all_extensions = (
                    self.processor.PDF_EXTENSIONS +
                    self.processor.WORD_EXTENSIONS +
                    self.processor.PPT_EXTENSIONS +
                    self.processor.EXCEL_EXTENSIONS +
                    self.processor.TEXT_EXTENSIONS
                )

                if ext in all_extensions:
                    files_to_process.append(file_path)

        total_files = len(files_to_process)
        results['total'] = total_files

        # Process files with progress updates
        for idx, file_path in enumerate(files_to_process, 1):
            # Call progress callback if provided
            if progress_callback:
                progress_callback(idx, total_files, file_path.name)

            result = self.index_document(file_path, force_reindex=force_reindex)

            if result.get('success'):
                if result.get('skipped'):
                    results['skipped'] += 1
                else:
                    results['success'] += 1
                    if result.get('reindexed'):
                        results['reindexed'] += 1
            else:
                results['failed'] += 1

            results['details'].append({
                'file': str(file_path),
                'result': result
            })

        # Log summary with details about failed files
        logger.info(
            f"Folder indexing complete. "
            f"New: {results['success'] - results['reindexed']}, "
            f"Re-indexed: {results['reindexed']}, "
            f"Skipped: {results['skipped']}, "
            f"Failed: {results['failed']}"
        )

        # Log failed files for review
        if results['failed'] > 0:
            logger.warning(f"=== {results['failed']} files could not be indexed ===")
            for detail in results['details']:
                if not detail['result'].get('success'):
                    error = detail['result'].get('error', 'Unknown error')
                    filename = Path(detail['file']).name
                    logger.warning(f"  - {filename}: {error}")
            logger.warning("Note: Scanned PDFs without text layer require OCR processing")

        return results

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about the collection"""
        count = self.collection.count()

        # Get unique documents
        if count > 0:
            results = self.collection.get()
            doc_ids = set(meta.get('doc_id') for meta in results['metadatas'])
            file_names = set(meta.get('file_name') for meta in results['metadatas'])
        else:
            doc_ids = set()
            file_names = set()

        return {
            'total_chunks': count,
            'total_documents': len(doc_ids),
            'file_names': list(file_names)
        }

    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document and all its chunks

        Args:
            doc_id: Document ID

        Returns:
            True if successful
        """
        try:
            # Get all chunk IDs for this document
            results = self.collection.get(
                where={"doc_id": doc_id}
            )

            if results['ids']:
                self.collection.delete(ids=results['ids'])
                logger.info(f"Deleted document {doc_id} ({len(results['ids'])} chunks)")
                return True
            else:
                logger.warning(f"No chunks found for document {doc_id}")
                return False

        except Exception as e:
            logger.error(f"Failed to delete document {doc_id}: {e}")
            return False

    def clear_collection(self) -> bool:
        """Clear all documents from collection"""
        try:
            # Delete and recreate collection
            self.chroma_client.delete_collection(name=self.collection_name)

            # In ChromaDB 1.3+, configuration is a dictionary
            self.collection = self.chroma_client.create_collection(
                name=self.collection_name,
                embedding_function=self.embedding_function,
                configuration={"hnsw": {"space": "cosine"}},
                metadata={"description": "DocChat document collection"}
            )

            logger.info("Collection cleared")
            return True

        except Exception as e:
            logger.error(f"Failed to clear collection: {e}")
            return False
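index_document consumes the result of DocumentProcessor.process_document; a minimal sketch of the return shape the code above relies on (the exact metadata keys are determined by DocumentProcessor and are not documented here):

# Shape expected from self.processor.process_document(file_path), inferred from the calls above
result = {
    'success': True,                     # False triggers an early return carrying 'error'
    'chunks': [
        {
            'text': 'First chunk of extracted text...',
            'type': 'text',              # stored as 'chunk_type' in ChromaDB metadata
            'metadata': {},              # merged into each chunk's ChromaDB metadata
        },
        # one entry per chunk
    ],
}
# Failure shape: {'success': False, 'error': '<reason>', ...}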
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
collection_name: Name of the ChromaDB collection to use for storing document embeddings. If not provided, defaults to config.CHROMA_COLLECTION_NAME. This collection will be created if it doesn't exist.
api_key: OpenAI API key for generating embeddings. If not provided, defaults to config.OPENAI_API_KEY. Required for the embedding function to work.
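Both arguments are optional; a minimal instantiation that falls back to the values in config (assuming config defines CHROMA_COLLECTION_NAME and OPENAI_API_KEY, as the constructor expects):

# Uses config.CHROMA_COLLECTION_NAME and config.OPENAI_API_KEY
indexer = DocumentIndexer()

# Or override either value explicitly
indexer = DocumentIndexer(collection_name='docchat_archive', api_key='sk-...')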
Return Value
Instantiation returns a DocumentIndexer object with an initialized ChromaDB client, collection, embedding function, and document processor. Methods return dictionaries with operation results:
- index_document: {'success': bool, 'doc_id': str, 'num_chunks': int, 'skipped': bool, 'reindexed': bool}
- index_folder: {'success': int, 'failed': int, 'skipped': int, 'reindexed': int, 'total': int, 'details': list}
- is_document_indexed: {'doc_id': str, 'indexed_mtime': str, 'needs_reindex': bool}, or None if the document is not indexed
- get_collection_stats: {'total_chunks': int, 'total_documents': int, 'file_names': list}
- delete_document and clear_collection: bool
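A short sketch of branching on the index_document result dictionary, assuming an indexer instance as in the Usage Example below ('error' appears only on failure; 'skipped' and 'reindexed' only when applicable):

from pathlib import Path

result = indexer.index_document(Path('report.pdf'))
if result.get('success'):
    if result.get('skipped'):
        print('Already indexed and unchanged')
    elif result.get('reindexed'):
        print(f"Re-indexed modified file into {result['num_chunks']} chunks")
    else:
        print(f"Indexed new file into {result['num_chunks']} chunks")
else:
    print(f"Failed: {result.get('error', 'Unknown error')}")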
Class Interface
Methods
__init__(self, collection_name: str = None, api_key: str = None)
Purpose: Initialize the DocumentIndexer with ChromaDB connection, embedding function, and document processor
Parameters:
collection_name: Name of ChromaDB collection (defaults to config.CHROMA_COLLECTION_NAME)
api_key: OpenAI API key for embeddings (defaults to config.OPENAI_API_KEY)
Returns: None - initializes instance attributes
is_document_indexed(self, file_path: Path) -> Optional[Dict[str, Any]]
Purpose: Check if a document is already indexed and whether it needs re-indexing based on modification time
Parameters:
file_path: Path object pointing to the document file to check
Returns: Dictionary with 'doc_id', 'indexed_mtime', and 'needs_reindex' keys if document exists, None otherwise
index_document(self, file_path: Path, force_reindex: bool = False) -> Dict[str, Any]
Purpose: Index a single document with smart incremental indexing that skips unchanged files and re-indexes modified ones
Parameters:
file_path: Path object pointing to the document file to index
force_reindex: If True, re-index even if document is already indexed and unchanged
Returns: Dictionary with 'success' (bool), 'doc_id' (str), 'num_chunks' (int), 'file_name' (str), 'skipped' (bool), 'reindexed' (bool), and 'error' (str) if failed
index_folder(self, folder_path: Path, recursive: bool = True, force_reindex: bool = False, progress_callback=None) -> Dict[str, Any]
Purpose: Index all supported documents in a folder with optional recursion and progress tracking
Parameters:
folder_path: Path object pointing to the folder containing documents
recursive: If True, recursively process subdirectories
force_reindex: If True, re-index all documents even if unchanged
progress_callback: Optional callback function(current, total, filename) called for each file processed
Returns: Dictionary with 'success' (int), 'failed' (int), 'skipped' (int), 'reindexed' (int), 'total' (int), and 'details' (list) containing per-file results
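Each entry in 'details' pairs a file path with its per-file result dictionary, which makes post-run reporting straightforward; a small sketch, again assuming an existing indexer instance (the folder path is illustrative):

from pathlib import Path

summary = indexer.index_folder(Path('/data/docs'), recursive=True)

# Report files that could not be indexed
for entry in summary['details']:
    if not entry['result'].get('success'):
        print(f"{entry['file']}: {entry['result'].get('error', 'Unknown error')}")

print(f"{summary['success']} new, {summary['reindexed']} re-indexed, "
      f"{summary['skipped']} skipped, {summary['failed']} failed of {summary['total']}")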
get_collection_stats(self) -> Dict[str, Any]
Purpose: Get statistics about the ChromaDB collection including chunk count and unique documents
Returns: Dictionary with 'total_chunks' (int), 'total_documents' (int), and 'file_names' (list of str)
delete_document(self, doc_id: str) -> bool
Purpose: Delete a document and all its chunks from the collection
Parameters:
doc_id: Unique document identifier (UUID string) to delete
Returns: True if deletion successful, False otherwise
clear_collection(self) -> bool
Purpose: Delete and recreate the entire collection, removing all documents
Returns: True if successful, False otherwise
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
collection_name |
str | Name of the ChromaDB collection being used | instance |
api_key |
str | OpenAI API key for generating embeddings | instance |
chroma_client |
chromadb.HttpClient | ChromaDB HTTP client connected to the configured server | instance |
embedding_function |
DocChatEmbeddingFunction | Embedding function instance for generating document embeddings | instance |
collection |
chromadb.Collection | ChromaDB collection object with HNSW index and cosine similarity | instance |
processor |
DocumentProcessor | Document processor instance for extracting text and chunks from various file formats | instance |
Dependencies
chromadb, openai, tiktoken, numpy, llmsherpa, PyPDF2, python-pptx, openpyxl, pandas, python-docx, pdf2image, Pillow, pytesseract, easyocr, pathlib, uuid, logging, os, tempfile, subprocess, threading, warnings, sys, traceback
Required Imports
import os
import logging
from pathlib import Path
from uuid import uuid4
from typing import Dict, Any, Optional
import chromadb
import config
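The listing also relies on a module-level logger and on the DocChatEmbeddingFunction and DocumentProcessor classes; a minimal sketch of those module-level definitions, where the two import paths are assumptions (both classes are part of the docchat codebase, not third-party packages):

# Assumed import paths for the two DocChat helper classes (illustrative only)
from docchat.embeddings import DocChatEmbeddingFunction
from docchat.document_processor import DocumentProcessor

# Module-level logger used throughout the class
logger = logging.getLogger(__name__)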
Conditional/Optional Imports
These imports are only needed under specific conditions:
- from llmsherpa.readers import LayoutPDFReader: Required (conditional). Used by DocumentProcessor for PDF processing.
- import PyPDF2: Required (conditional). Used by DocumentProcessor for PDF text extraction.
- import pptx: Required (conditional). Used by DocumentProcessor for PowerPoint files.
- import openpyxl: Required (conditional). Used by DocumentProcessor for Excel files.
- from docx import Document as DocxDocument: Required (conditional). Used by DocumentProcessor for Word documents.
- from pdf2image import convert_from_path: Optional. Used for OCR processing of scanned PDFs.
- import pytesseract: Optional. Used for OCR text extraction.
- import easyocr: Optional. Alternative OCR engine.
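Because the OCR stack is optional, callers commonly guard those imports; a minimal sketch of the usual pattern (independent of how DocumentProcessor handles missing OCR support internally):

# Optional OCR dependencies: absence only disables OCR for scanned PDFs
try:
    from pdf2image import convert_from_path
    import pytesseract
    OCR_AVAILABLE = True
except ImportError:
    OCR_AVAILABLE = False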
Usage Example
# Initialize the indexer
indexer = DocumentIndexer(
collection_name='my_documents',
api_key='sk-...'
)
# Check collection stats
stats = indexer.get_collection_stats()
print(f"Total documents: {stats['total_documents']}")
# Index a single document
from pathlib import Path
result = indexer.index_document(Path('document.pdf'))
if result['success']:
print(f"Indexed {result['num_chunks']} chunks")
doc_id = result['doc_id']
# Index entire folder with progress tracking
def progress_callback(current, total, filename):
print(f"Processing {current}/{total}: {filename}")
folder_result = indexer.index_folder(
Path('/path/to/documents'),
recursive=True,
force_reindex=False,
progress_callback=progress_callback
)
print(f"Success: {folder_result['success']}, Failed: {folder_result['failed']}")
# Check if document needs re-indexing
file_path = Path('document.pdf')
status = indexer.is_document_indexed(file_path)
if status and status['needs_reindex']:
print("Document has been modified, re-indexing...")
indexer.index_document(file_path, force_reindex=True)
# Delete a document
indexer.delete_document(doc_id)
# Clear entire collection
indexer.clear_collection()
Best Practices
- Always check is_document_indexed() before indexing to avoid duplicate work unless force_reindex=True is needed
- Use progress_callback with index_folder() for long-running batch operations to provide user feedback
- Handle failed indexing gracefully - the class returns detailed error information in result dictionaries
- Scanned PDFs without text layers will fail to index unless OCR is configured - check error messages for 'No content extracted'
- The class maintains file modification times (mtime) to detect changes - ensure file system timestamps are reliable
- ChromaDB connection is established in __init__ - ensure the server is running before instantiation
- Document chunks are stored with metadata including doc_id, chunk_index, file_path, and file_mtime for tracking
- Re-indexing a modified file requires deleting its old chunks first to avoid orphans; index_document() does this automatically, so call delete_document() directly only when managing chunks manually
- Collection uses HNSW index with cosine similarity - appropriate for semantic similarity search
- The class is thread-safe for indexing operations, but ChromaDB client connections should not be shared across processes (see the background-thread sketch after this list)
- Large folder indexing operations can be memory-intensive - consider processing in batches for very large document sets
- Failed files are logged with detailed error messages - review logs to identify problematic documents
- The embedding_function is initialized once and reused - do not modify after initialization
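Because index_document catches every exception (precisely so a background worker does not crash) and index_folder reports progress through a callback, folder indexing is commonly run on a background thread; a minimal sketch, assuming a single DocumentIndexer instance named indexer and an illustrative folder path:

import threading
from pathlib import Path

def run_indexing(folder: str) -> None:
    # Per-file failures never raise; they are collected in the summary dictionary
    summary = indexer.index_folder(
        Path(folder),
        progress_callback=lambda i, n, name: print(f"{i}/{n}: {name}")
    )
    print(f"Done: {summary['success']} indexed, {summary['failed']} failed")

worker = threading.Thread(target=run_indexing, args=('/data/docs',), daemon=True)
worker.start()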
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function get_document_info (66.1% similar)
- function index_documents_example (61.8% similar)
- class MyEmbeddingFunction_v1 (60.9% similar)
- class DocumentProcessor_v7 (60.5% similar)
- class DocChatEmbeddingFunction (59.7% similar)