class DocumentProcessor_v8
Process different document types for indexing
File: /tf/active/vicechatdev/docchat/document_indexer.py
Lines: 230 - 787
Complexity: moderate
Purpose
Process different document types for indexing
Source Code
class DocumentProcessor:
"""Process different document types for indexing"""
# Supported file extensions
WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
PDF_EXTENSIONS = ['.pdf']
TEXT_EXTENSIONS = ['.txt', '.md']
def __init__(self, temp_dir: Optional[Path] = None):
"""
Initialize document processor
Args:
temp_dir: Directory for temporary files
"""
self.temp_dir = temp_dir or Path(tempfile.mkdtemp())
self.temp_dir.mkdir(parents=True, exist_ok=True)
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# Max tokens per chunk (conservative for embeddings)
self.max_chunk_tokens = 1000
self.chunk_overlap_tokens = 100
def _chunk_large_text(self, text: str, metadata: dict = None) -> List[Dict[str, Any]]:
"""
Split large text into smaller chunks with overlap
Args:
text: Text to chunk
metadata: Base metadata to include in all chunks
Returns:
List of chunk dictionaries
"""
if metadata is None:
metadata = {}
tokens = self.tokenizer.encode(text)
# If text is small enough, return as single chunk
if len(tokens) <= self.max_chunk_tokens:
return [{
'text': text,
'type': 'text',
'metadata': metadata.copy()
}]
# Split into overlapping chunks
chunks = []
start = 0
chunk_num = 0
while start < len(tokens):
end = min(start + self.max_chunk_tokens, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = self.tokenizer.decode(chunk_tokens)
chunk_metadata = metadata.copy()
chunk_metadata['chunk_part'] = chunk_num
chunk_metadata['total_chunks'] = -1 # Will be updated after
chunks.append({
'text': chunk_text,
'type': 'text',
'metadata': chunk_metadata
})
chunk_num += 1
# Move forward with overlap
start = end - self.chunk_overlap_tokens if end < len(tokens) else end
# Update total chunks count
for chunk in chunks:
chunk['metadata']['total_chunks'] = len(chunks)
logger.debug(f"Split large text ({len(tokens)} tokens) into {len(chunks)} chunks")
return chunks
def _should_use_ocr(self, extracted_text: str) -> bool:
"""
Determine if OCR should be used based on the quality of extracted text.
Based on document_processor.py best practices.
"""
if not extracted_text or len(extracted_text.strip()) < 100:
return True
# Check for signs of poor text extraction (lots of garbled characters)
garbled_chars = sum(1 for c in extracted_text if ord(c) > 127 and c not in 'àáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ')
garbled_ratio = garbled_chars / len(extracted_text) if extracted_text else 0
if garbled_ratio > 0.1: # More than 10% garbled characters
logger.debug(f"High garbled character ratio ({garbled_ratio:.2%}), considering OCR")
return True
# Check for very short lines (sign of poor extraction)
lines = extracted_text.split('\n')
short_lines = sum(1 for line in lines if len(line.strip()) < 10 and line.strip())
if len(lines) > 10 and short_lines / len(lines) > 0.5:
logger.debug("Many short lines detected, considering OCR")
return True
return False
def _extract_text_with_ocr(self, file_path: Path, max_pages: int = None) -> Optional[str]:
"""
Extract text from PDF using OCR when standard methods fail.
Uses pytesseract (if available) or EasyOCR as fallback.
Based on document_processor.py dual-engine approach.
Args:
file_path: Path to PDF file
max_pages: Maximum number of pages to OCR (uses config.OCR_MAX_PAGES if not specified)
Returns:
Extracted text or None if failed
"""
if not config.OCR_ENABLED:
logger.debug("OCR is disabled in configuration")
return None
if not OCR_AVAILABLE:
logger.warning("OCR libraries not available for fallback text extraction")
return None
if not TESSERACT_AVAILABLE and not EASYOCR_AVAILABLE:
logger.warning("Neither Tesseract nor EasyOCR available - OCR extraction unavailable")
return None
if max_pages is None:
max_pages = config.OCR_MAX_PAGES
# Log which OCR engine will be used
if TESSERACT_AVAILABLE:
logger.debug(f"Attempting OCR text extraction using Tesseract for: {file_path.name}")
else:
logger.debug(f"Attempting OCR text extraction using EasyOCR for: {file_path.name}")
logger.debug(f"Attempting OCR text extraction for: {file_path.name}")
try:
# Convert PDF pages to images
images = convert_from_path(str(file_path), dpi=300, first_page=1, last_page=max_pages)
logger.debug(f"Converted PDF to {len(images)} images for OCR (max {max_pages} pages)")
all_text = []
for i, image in enumerate(images):
logger.debug(f"OCR processing page {i+1}/{len(images)}")
# Method 1: Try pytesseract first (usually faster) - only if available
if TESSERACT_AVAILABLE:
try:
text_pytesseract = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
if text_pytesseract and len(text_pytesseract.strip()) > 50: # Reasonable amount of text
all_text.append(f"=== Page {i+1} ===\n{text_pytesseract}")
continue
else:
logger.debug(f"Tesseract extracted insufficient text from page {i+1}")
except Exception as e:
logger.warning(f"Tesseract failed on page {i+1}: {e}")
# Method 2: Try EasyOCR (works without tesseract installation)
if EASYOCR_AVAILABLE:
try:
# Convert PIL image to numpy array for easyocr
import numpy as np
image_array = np.array(image)
# Initialize easyocr reader (cache it for efficiency)
if not hasattr(self, '_easyocr_reader'):
logger.debug("Initializing EasyOCR reader (CPU mode, first use may take time)")
self._easyocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False)
results = self._easyocr_reader.readtext(image_array)
text_easyocr = '\n'.join([result[1] for result in results if result[2] > 0.5]) # confidence > 0.5
if text_easyocr and len(text_easyocr.strip()) > 50:
all_text.append(f"=== Page {i+1} ===\n{text_easyocr}")
else:
logger.debug(f"EasyOCR extracted insufficient text from page {i+1}")
except Exception as e:
logger.warning(f"EasyOCR failed on page {i+1}: {e}")
if all_text:
full_text = '\n\n'.join(all_text)
ocr_method = "Tesseract" if TESSERACT_AVAILABLE else "EasyOCR"
logger.debug(f"OCR ({ocr_method}) extracted {len(full_text)} characters from {len(all_text)} pages")
return full_text
else:
logger.warning("OCR failed to extract meaningful text from any pages")
return None
except Exception as e:
logger.error(f"OCR processing failed for {file_path.name}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return None
def _get_file_extension(self, file_path: Path) -> str:
"""Get lowercase file extension"""
return file_path.suffix.lower()
def _get_file_type(self, file_path: Path) -> str:
"""Determine file type from extension"""
ext = self._get_file_extension(file_path)
if ext in self.WORD_EXTENSIONS:
return "word"
elif ext in self.PPT_EXTENSIONS:
return "powerpoint"
elif ext in self.EXCEL_EXTENSIONS:
return "excel"
elif ext in self.PDF_EXTENSIONS:
return "pdf"
elif ext in self.TEXT_EXTENSIONS:
return "text"
else:
return "unknown"
def _convert_to_pdf(self, input_file: Path) -> Optional[Path]:
"""Convert document to PDF using LibreOffice"""
output_pdf = self.temp_dir / f"{input_file.stem}.pdf"
try:
# Use LibreOffice to convert
cmd = [
'libreoffice',
'--headless',
'--convert-to', 'pdf',
'--outdir', str(self.temp_dir),
str(input_file)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300 # Increased from 120 to 300 seconds (5 minutes)
)
if result.returncode == 0 and output_pdf.exists():
logger.debug(f"Successfully converted {input_file.name} to PDF")
return output_pdf
else:
logger.error(f"LibreOffice conversion failed: {result.stderr}")
return None
except subprocess.TimeoutExpired:
logger.error(f"Conversion timeout for {input_file.name}")
return None
except Exception as e:
logger.error(f"Error converting {input_file.name}: {e}")
return None
def _process_pdf_document(self, file_path: Path) -> Dict[str, Any]:
"""Process PDF using llmsherpa with timeout protection and fallback"""
logger.debug(f"Processing PDF: {file_path}")
# Get file size for logging
file_size_mb = file_path.stat().st_size / (1024 * 1024)
logger.debug(f"PDF size: {file_size_mb:.2f} MB")
try:
# Try llmsherpa with timeout protection using threading
def llmsherpa_parse():
logger.debug(f"Attempting LLMSherpa parsing (timeout: {config.PDF_PROCESSING_TIMEOUT}s)...")
pdf_reader = LayoutPDFReader(config.LLMSHERPA_API_URL)
doc = pdf_reader.read_pdf(str(file_path))
# Extract chunks
chunks = []
for chunk in doc.chunks():
chunk_text = chunk.to_context_text()
if chunk_text and len(chunk_text.strip()) > 50:
chunks.append({
'text': chunk_text,
'type': 'text',
'metadata': {
'page': getattr(chunk, 'page_idx', 0),
'section': getattr(chunk, 'tag', 'content')
}
})
return chunks
chunks = run_with_timeout(llmsherpa_parse, timeout_duration=config.PDF_PROCESSING_TIMEOUT)
# Check if we got meaningful content
if len(chunks) > 0:
logger.debug(f"LLMSherpa successfully extracted {len(chunks)} chunks")
return {'chunks': chunks, 'success': True}
else:
logger.warning(f"LLMSherpa extracted 0 chunks (likely scanned/image PDF), falling back to PyPDF2")
except TimeoutException as te:
logger.warning(f"LLMSherpa timed out after {config.PDF_PROCESSING_TIMEOUT}s, falling back to PyPDF2: {te}")
except Exception as e:
logger.warning(f"LLMSherpa failed, falling back to PyPDF2: {e}")
# Fallback to PyPDF2 with chunking for large PDFs
try:
logger.debug("Using PyPDF2 fallback for PDF extraction...")
# Suppress PyPDF2 warnings for cleaner logs
pypdf_logger = logging.getLogger('PyPDF2')
original_level = pypdf_logger.level
pypdf_logger.setLevel(logging.ERROR)
chunks = []
try:
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
total_pages = len(pdf_reader.pages)
logger.debug(f"PDF has {total_pages} pages")
# Process pages in batches to avoid memory issues
batch_size = 50 # Process 50 pages at a time
for batch_start in range(0, total_pages, batch_size):
batch_end = min(batch_start + batch_size, total_pages)
logger.debug(f"Processing pages {batch_start + 1} to {batch_end}...")
for page_num in range(batch_start, batch_end):
try:
page = pdf_reader.pages[page_num]
text = page.extract_text()
if text and len(text.strip()) > 50:
# Chunk large pages to avoid token limit issues
page_metadata = {
'page': page_num + 1,
'section': 'content'
}
page_chunks = self._chunk_large_text(text, page_metadata)
chunks.extend(page_chunks)
except Exception as page_error:
logger.warning(f"Failed to extract page {page_num + 1}: {page_error}")
continue
finally:
# Restore original logging level
pypdf_logger.setLevel(original_level)
# Check if we got meaningful content
if chunks and len(chunks) > 0:
logger.debug(f"PyPDF2 successfully extracted {len(chunks)} chunks from {total_pages} pages")
# Check quality and consider OCR if needed
all_text = ' '.join([chunk['text'] for chunk in chunks])
if self._should_use_ocr(all_text) and OCR_AVAILABLE:
logger.debug("PyPDF2 text quality poor, attempting OCR enhancement")
ocr_text = self._extract_text_with_ocr(file_path)
if ocr_text and len(ocr_text) > len(all_text):
logger.debug("OCR produced better results, using OCR text")
# Convert OCR text to chunks
ocr_chunks = self._chunk_large_text(
ocr_text,
metadata={'extraction_method': 'OCR', 'section': 'content'}
)
return {'chunks': ocr_chunks, 'success': True}
return {'chunks': chunks, 'success': True}
else:
# No text extracted by PyPDF2 - try OCR as last resort
logger.warning("PyPDF2 extracted no content")
if OCR_AVAILABLE:
logger.debug("Attempting OCR as final fallback for scanned PDF")
ocr_text = self._extract_text_with_ocr(file_path)
if ocr_text:
logger.info(f"OCR successfully extracted text from scanned PDF")
ocr_chunks = self._chunk_large_text(
ocr_text,
metadata={'extraction_method': 'OCR', 'section': 'content'}
)
return {'chunks': ocr_chunks, 'success': True}
else:
logger.error("OCR also failed to extract content")
else:
logger.error("No OCR available - scanned PDF cannot be processed")
return {'chunks': [], 'success': False, 'error': 'No content extracted - likely scanned PDF without OCR'}
except Exception as e2:
error_msg = str(e2)
logger.error(f"PDF processing failed completely: {error_msg}")
# Check for specific error types
if 'not been decrypted' in error_msg or 'encrypted' in error_msg.lower():
logger.warning(f"Skipping encrypted/password-protected PDF: {file_path.name}")
return {'chunks': [], 'success': False, 'error': 'PDF is encrypted or password-protected'}
try:
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
except:
logger.error(f"Could not format traceback")
return {'chunks': [], 'success': False, 'error': error_msg}
def _process_word_document(self, file_path: Path) -> Dict[str, Any]:
"""Process Word document"""
logger.info(f"Processing Word document: {file_path}")
# Convert to PDF first
pdf_path = self._convert_to_pdf(file_path)
if pdf_path:
return self._process_pdf_document(pdf_path)
# Fallback to direct text extraction
try:
doc = DocxDocument(str(file_path))
text = "\n\n".join([para.text for para in doc.paragraphs if para.text.strip()])
return {
'chunks': [{
'text': text,
'type': 'text',
'metadata': {'section': 'content'}
}],
'success': True
}
except Exception as e:
logger.error(f"Word processing failed: {e}")
return {'chunks': [], 'success': False, 'error': str(e)}
def _process_powerpoint(self, file_path: Path) -> Dict[str, Any]:
"""Process PowerPoint presentation"""
logger.debug(f"Processing PowerPoint: {file_path}")
try:
prs = pptx.Presentation(str(file_path))
chunks = []
for slide_num, slide in enumerate(prs.slides):
slide_text = []
# Extract text from shapes
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
slide_text.append(shape.text.strip())
if slide_text:
chunks.append({
'text': "\n".join(slide_text),
'type': 'text',
'metadata': {
'slide': slide_num + 1,
'section': 'slide'
}
})
return {'chunks': chunks, 'success': True}
except Exception as e:
logger.error(f"PowerPoint processing failed: {e}")
# Try conversion to PDF as fallback
pdf_path = self._convert_to_pdf(file_path)
if pdf_path:
return self._process_pdf_document(pdf_path)
return {'chunks': [], 'success': False, 'error': str(e)}
def _process_excel(self, file_path: Path) -> Dict[str, Any]:
"""Process Excel spreadsheet"""
logger.debug(f"Processing Excel: {file_path}")
try:
# Try pandas first
excel_file = pd.ExcelFile(str(file_path))
chunks = []
for sheet_name in excel_file.sheet_names:
df = pd.read_excel(excel_file, sheet_name=sheet_name)
# Convert to markdown table
if not df.empty:
markdown_table = df.to_markdown(index=False)
chunks.append({
'text': f"## Sheet: {sheet_name}\n\n{markdown_table}",
'type': 'table',
'metadata': {
'sheet': sheet_name,
'rows': len(df),
'columns': len(df.columns)
}
})
return {'chunks': chunks, 'success': True}
except Exception as e:
logger.error(f"Excel processing failed: {e}")
return {'chunks': [], 'success': False, 'error': str(e)}
def _process_text_file(self, file_path: Path) -> Dict[str, Any]:
"""Process plain text or markdown file"""
logger.info(f"Processing text file: {file_path}")
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
return {
'chunks': [{
'text': text,
'type': 'text',
'metadata': {'section': 'content'}
}],
'success': True
}
except Exception as e:
logger.error(f"Text file processing failed: {e}")
return {'chunks': [], 'success': False, 'error': str(e)}
def process_document(self, file_path: Path) -> Dict[str, Any]:
"""
Process a document and extract chunks
Args:
file_path: Path to document
Returns:
Dictionary with chunks and metadata
"""
# Check file size and log warning for large files
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > 100:
logger.warning(f"Processing large file ({file_size_mb:.2f} MB): {file_path.name}")
file_type = self._get_file_type(file_path)
if file_type == "pdf":
result = self._process_pdf_document(file_path)
elif file_type == "word":
result = self._process_word_document(file_path)
elif file_type == "powerpoint":
result = self._process_powerpoint(file_path)
elif file_type == "excel":
result = self._process_excel(file_path)
elif file_type == "text":
result = self._process_text_file(file_path)
else:
logger.error(f"Unsupported file type: {file_path}")
return {'chunks': [], 'success': False, 'error': 'Unsupported file type'}
# Add file metadata to all chunks
if result.get('success'):
for chunk in result.get('chunks', []):
chunk['metadata']['file_name'] = file_path.name
chunk['metadata']['file_path'] = str(file_path)
chunk['metadata']['source'] = str(file_path) # Add 'source' for compatibility
chunk['metadata']['file_type'] = file_type
return result
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| temp_dir | Optional[Path] | None | positional or keyword |
Parameter Details
temp_dir: Directory for temporary files. If not provided, a fresh directory is created with tempfile.mkdtemp().
Return Value
The constructor returns a DocumentProcessor instance; the main entry point, process_document(), returns a dictionary with 'chunks', 'success', and (on failure) 'error' keys.
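For reference, each chunk produced by the processor is a dictionary of the following shape (the values shown are illustrative):
chunk = {
    'text': 'Extracted passage...',
    'type': 'text',                   # 'table' for Excel sheets
    'metadata': {
        'page': 3,                    # present for PDF-derived chunks
        'section': 'content',
        'file_name': 'report.pdf',
        'file_path': '/data/report.pdf',
        'source': '/data/report.pdf', # added for compatibility
        'file_type': 'pdf',
    },
}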
Class Interface
Methods
__init__(self, temp_dir)
Purpose: Initialize the document processor; temp_dir is the directory for temporary files.
Parameters:
temp_dir: Type: Optional[Path]
Returns: None
_chunk_large_text(self, text, metadata) -> List[Dict[str, Any]]
Purpose: Split large text into smaller, overlapping chunks; text is the text to chunk and metadata is the base metadata included in every chunk. Returns a list of chunk dictionaries.
Parameters:
text: Type: str
metadata: Type: dict
Returns: Returns List[Dict[str, Any]]
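The chunking step can be exercised on its own; a minimal sketch of the same token-based sliding window, assuming tiktoken is installed (the 1000/100 limits mirror the class defaults):
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")
max_tokens, overlap = 1000, 100  # class defaults

def chunk_text(text: str) -> list[str]:
    tokens = enc.encode(text)
    chunks, start = [], 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunks.append(enc.decode(tokens[start:end]))
        # step back by the overlap unless the tail has been consumed
        start = end - overlap if end < len(tokens) else end
    return chunks

print(len(chunk_text("some long document text " * 500)), "chunks")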
_should_use_ocr(self, extracted_text) -> bool
Purpose: Determine if OCR should be used based on the quality of extracted text. Based on document_processor.py best practices.
Parameters:
extracted_text: Type: str
Returns: Returns bool
_extract_text_with_ocr(self, file_path, max_pages) -> Optional[str]
Purpose: Extract text from a PDF using OCR when standard extraction fails, using pytesseract if available or EasyOCR as a fallback (the document_processor.py dual-engine approach). max_pages defaults to config.OCR_MAX_PAGES; returns the extracted text, or None on failure.
Parameters:
file_path: Type: Path
max_pages: Type: int
Returns: Returns Optional[str]
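The OCR fallback rests on the pdf2image plus pytesseract pattern; a hedged sketch of that path (the page cap and language below are illustrative, not the module's config values):
from pathlib import Path
from pdf2image import convert_from_path
import pytesseract

def ocr_pdf(pdf_path: Path, max_pages: int = 5) -> str:
    # Render the first few pages to images, then OCR each one
    images = convert_from_path(str(pdf_path), dpi=300, first_page=1, last_page=max_pages)
    pages = []
    for i, image in enumerate(images, start=1):
        text = pytesseract.image_to_string(image, lang="eng", config="--psm 6")
        if text.strip():
            pages.append(f"=== Page {i} ===\n{text}")
    return "\n\n".join(pages)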
_get_file_extension(self, file_path) -> str
Purpose: Get lowercase file extension
Parameters:
file_path: Type: Path
Returns: Returns str
_get_file_type(self, file_path) -> str
Purpose: Determine file type from extension
Parameters:
file_path: Type: Path
Returns: Returns str
_convert_to_pdf(self, input_file) -> Optional[Path]
Purpose: Convert document to PDF using LibreOffice
Parameters:
input_file: Type: Path
Returns: Returns Optional[Path]
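The conversion itself is LibreOffice running headless; a minimal sketch of the same subprocess call (output-directory handling is simplified here, timeout as in the class):
import subprocess
from pathlib import Path
from typing import Optional

def convert_to_pdf(input_file: Path, out_dir: Path) -> Optional[Path]:
    cmd = [
        "libreoffice", "--headless",
        "--convert-to", "pdf",
        "--outdir", str(out_dir),
        str(input_file),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    output_pdf = out_dir / f"{input_file.stem}.pdf"
    return output_pdf if result.returncode == 0 and output_pdf.exists() else None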
_process_pdf_document(self, file_path) -> Dict[str, Any]
Purpose: Process PDF using llmsherpa with timeout protection and fallback
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
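The primary PDF path goes through llmsherpa's LayoutPDFReader; a minimal sketch of that call, assuming a reachable parsing endpoint (the URL below is illustrative; the class takes it from config.LLMSHERPA_API_URL):
from llmsherpa.readers import LayoutPDFReader

reader = LayoutPDFReader("http://localhost:5001/api/parseDocument?renderFormat=all")  # illustrative endpoint
doc = reader.read_pdf("report.pdf")
for chunk in doc.chunks():
    text = chunk.to_context_text()
    if text and len(text.strip()) > 50:
        print(getattr(chunk, "page_idx", 0), text[:80])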
_process_word_document(self, file_path) -> Dict[str, Any]
Purpose: Process Word document
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
_process_powerpoint(self, file_path) -> Dict[str, Any]
Purpose: Process PowerPoint presentation
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
_process_excel(self, file_path) -> Dict[str, Any]
Purpose: Process Excel spreadsheet
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
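Per-sheet extraction follows the standard pandas pattern; a short sketch (df.to_markdown requires the optional tabulate package, and the workbook path is illustrative):
import pandas as pd

excel_file = pd.ExcelFile("workbook.xlsx")
for sheet_name in excel_file.sheet_names:
    df = pd.read_excel(excel_file, sheet_name=sheet_name)
    if not df.empty:
        print(f"## Sheet: {sheet_name}\n\n{df.to_markdown(index=False)}\n")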
_process_text_file(self, file_path) -> Dict[str, Any]
Purpose: Process plain text or markdown file
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
process_document(self, file_path) -> Dict[str, Any]
Purpose: Process a document at file_path and extract chunks; returns a dictionary with chunks and metadata.
Parameters:
file_path: Type: Path
Returns: Returns Dict[str, Any]
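Typical call flow against the public entry point; the file path here is illustrative:
from pathlib import Path

processor = DocumentProcessor()
result = processor.process_document(Path("reports/annual_report.pdf"))

if result["success"]:
    for chunk in result["chunks"]:
        meta = chunk["metadata"]
        print(meta.get("page"), meta["file_name"], chunk["text"][:80])
else:
    print("Extraction failed:", result.get("error"))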
Required Imports
import os
import logging
import tempfile
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Any
import tiktoken
import PyPDF2
import pandas as pd
import pptx
from docx import Document as DocxDocument
from llmsherpa.readers import LayoutPDFReader
# Optional OCR dependencies, used only when available at runtime:
from pdf2image import convert_from_path
import pytesseract
import easyocr
import numpy as np
# The class also relies on module-level names defined in document_indexer.py:
# config, logger, run_with_timeout, TimeoutException,
# OCR_AVAILABLE, TESSERACT_AVAILABLE, EASYOCR_AVAILABLE
Usage Example
# Example usage:
# processor = DocumentProcessor()
# result = processor.process_document(Path("document.pdf"))
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DocumentProcessor_v6 (81.2% similar)
- class DocumentProcessor_v5 (80.4% similar)
- class DocumentProcessor (67.5% similar)
- class DocumentProcessor_v7 (67.3% similar)
- class DocumentProcessor_v4 (64.1% similar)