🔍 Code Extractor

class DocumentProcessor_v8

Maturity: 25

Process different document types for indexing

File:
/tf/active/vicechatdev/docchat/document_indexer.py
Lines:
230 - 787
Complexity:
moderate

Purpose

Process different document types for indexing

Source Code

class DocumentProcessor:
    """Process different document types for indexing"""
    
    # Supported file extensions
    WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
    PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
    EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
    PDF_EXTENSIONS = ['.pdf']
    TEXT_EXTENSIONS = ['.txt', '.md']
    
    def __init__(self, temp_dir: Optional[Path] = None):
        """
        Initialize document processor
        
        Args:
            temp_dir: Directory for temporary files
        """
        self.temp_dir = temp_dir or Path(tempfile.mkdtemp())
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        # Max tokens per chunk (conservative for embeddings)
        self.max_chunk_tokens = 1000
        self.chunk_overlap_tokens = 100
        
    def _chunk_large_text(self, text: str, metadata: dict = None) -> List[Dict[str, Any]]:
        """
        Split large text into smaller chunks with overlap
        
        Args:
            text: Text to chunk
            metadata: Base metadata to include in all chunks
            
        Returns:
            List of chunk dictionaries
        """
        if metadata is None:
            metadata = {}
            
        tokens = self.tokenizer.encode(text)
        
        # If text is small enough, return as single chunk
        if len(tokens) <= self.max_chunk_tokens:
            return [{
                'text': text,
                'type': 'text',
                'metadata': metadata.copy()
            }]
        
        # Split into overlapping chunks
        chunks = []
        start = 0
        chunk_num = 0
        
        while start < len(tokens):
            end = min(start + self.max_chunk_tokens, len(tokens))
            chunk_tokens = tokens[start:end]
            chunk_text = self.tokenizer.decode(chunk_tokens)
            
            chunk_metadata = metadata.copy()
            chunk_metadata['chunk_part'] = chunk_num
            chunk_metadata['total_chunks'] = -1  # Will be updated after
            
            chunks.append({
                'text': chunk_text,
                'type': 'text',
                'metadata': chunk_metadata
            })
            
            chunk_num += 1
            # Move forward with overlap
            start = end - self.chunk_overlap_tokens if end < len(tokens) else end
        
        # Update total chunks count
        for chunk in chunks:
            chunk['metadata']['total_chunks'] = len(chunks)
        
        logger.debug(f"Split large text ({len(tokens)} tokens) into {len(chunks)} chunks")
        return chunks
    
    def _should_use_ocr(self, extracted_text: str) -> bool:
        """
        Determine if OCR should be used based on the quality of extracted text.
        Based on document_processor.py best practices.
        """
        if not extracted_text or len(extracted_text.strip()) < 100:
            return True
            
        # Check for signs of poor text extraction (lots of garbled characters)
        garbled_chars = sum(1 for c in extracted_text if ord(c) > 127 and c not in 'àáâãäåæçèéêëìíîïðñòóôõöøùúûüýþÿ')
        garbled_ratio = garbled_chars / len(extracted_text) if extracted_text else 0
        
        if garbled_ratio > 0.1:  # More than 10% garbled characters
            logger.debug(f"High garbled character ratio ({garbled_ratio:.2%}), considering OCR")
            return True
            
        # Check for very short lines (sign of poor extraction)
        lines = extracted_text.split('\n')
        short_lines = sum(1 for line in lines if len(line.strip()) < 10 and line.strip())
        if len(lines) > 10 and short_lines / len(lines) > 0.5:
            logger.debug("Many short lines detected, considering OCR")
            return True
            
        return False
    
    def _extract_text_with_ocr(self, file_path: Path, max_pages: int = None) -> Optional[str]:
        """
        Extract text from PDF using OCR when standard methods fail.
        Uses pytesseract (if available) or EasyOCR as fallback.
        Based on document_processor.py dual-engine approach.
        
        Args:
            file_path: Path to PDF file
            max_pages: Maximum number of pages to OCR (uses config.OCR_MAX_PAGES if not specified)
        
        Returns:
            Extracted text or None if failed
        """
        if not config.OCR_ENABLED:
            logger.debug("OCR is disabled in configuration")
            return None
        
        if not OCR_AVAILABLE:
            logger.warning("OCR libraries not available for fallback text extraction")
            return None
        
        if not TESSERACT_AVAILABLE and not EASYOCR_AVAILABLE:
            logger.warning("Neither Tesseract nor EasyOCR available - OCR extraction unavailable")
            return None
        
        if max_pages is None:
            max_pages = config.OCR_MAX_PAGES
        
        # Log which OCR engine will be used
        if TESSERACT_AVAILABLE:
            logger.debug(f"Attempting OCR text extraction using Tesseract for: {file_path.name}")
        else:
            logger.debug(f"Attempting OCR text extraction using EasyOCR for: {file_path.name}")
            
        logger.debug(f"Attempting OCR text extraction for: {file_path.name}")
        
        try:
            # Convert PDF pages to images
            images = convert_from_path(str(file_path), dpi=300, first_page=1, last_page=max_pages)
            logger.debug(f"Converted PDF to {len(images)} images for OCR (max {max_pages} pages)")
            
            all_text = []
            
            for i, image in enumerate(images):
                logger.debug(f"OCR processing page {i+1}/{len(images)}")
                
                # Method 1: Try pytesseract first (usually faster) - only if available
                if TESSERACT_AVAILABLE:
                    try:
                        text_pytesseract = pytesseract.image_to_string(image, lang='eng', config='--psm 6')
                        if text_pytesseract and len(text_pytesseract.strip()) > 50:  # Reasonable amount of text
                            all_text.append(f"=== Page {i+1} ===\n{text_pytesseract}")
                            continue
                        else:
                            logger.debug(f"Tesseract extracted insufficient text from page {i+1}")
                    except Exception as e:
                        logger.warning(f"Tesseract failed on page {i+1}: {e}")
                
                # Method 2: Try EasyOCR (works without tesseract installation)
                if EASYOCR_AVAILABLE:
                    try:
                        # Convert PIL image to numpy array for easyocr
                        import numpy as np
                        image_array = np.array(image)
                        
                        # Initialize easyocr reader (cache it for efficiency)
                        if not hasattr(self, '_easyocr_reader'):
                            logger.debug("Initializing EasyOCR reader (CPU mode, first use may take time)")
                            self._easyocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False)
                        
                        results = self._easyocr_reader.readtext(image_array)
                        text_easyocr = '\n'.join([result[1] for result in results if result[2] > 0.5])  # confidence > 0.5
                        
                        if text_easyocr and len(text_easyocr.strip()) > 50:
                            all_text.append(f"=== Page {i+1} ===\n{text_easyocr}")
                        else:
                            logger.debug(f"EasyOCR extracted insufficient text from page {i+1}")
                            
                    except Exception as e:
                        logger.warning(f"EasyOCR failed on page {i+1}: {e}")
            
            if all_text:
                full_text = '\n\n'.join(all_text)
                ocr_method = "Tesseract" if TESSERACT_AVAILABLE else "EasyOCR"
                logger.debug(f"OCR ({ocr_method}) extracted {len(full_text)} characters from {len(all_text)} pages")
                return full_text
            else:
                logger.warning("OCR failed to extract meaningful text from any pages")
                return None
                
        except Exception as e:
            logger.error(f"OCR processing failed for {file_path.name}: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return None
        
    def _get_file_extension(self, file_path: Path) -> str:
        """Get lowercase file extension"""
        return file_path.suffix.lower()
    
    def _get_file_type(self, file_path: Path) -> str:
        """Determine file type from extension"""
        ext = self._get_file_extension(file_path)
        
        if ext in self.WORD_EXTENSIONS:
            return "word"
        elif ext in self.PPT_EXTENSIONS:
            return "powerpoint"
        elif ext in self.EXCEL_EXTENSIONS:
            return "excel"
        elif ext in self.PDF_EXTENSIONS:
            return "pdf"
        elif ext in self.TEXT_EXTENSIONS:
            return "text"
        else:
            return "unknown"
    
    def _convert_to_pdf(self, input_file: Path) -> Optional[Path]:
        """Convert document to PDF using LibreOffice"""
        output_pdf = self.temp_dir / f"{input_file.stem}.pdf"
        
        try:
            # Use LibreOffice to convert
            cmd = [
                'libreoffice',
                '--headless',
                '--convert-to', 'pdf',
                '--outdir', str(self.temp_dir),
                str(input_file)
            ]
            
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300  # Increased from 120 to 300 seconds (5 minutes)
            )
            
            if result.returncode == 0 and output_pdf.exists():
                logger.debug(f"Successfully converted {input_file.name} to PDF")
                return output_pdf
            else:
                logger.error(f"LibreOffice conversion failed: {result.stderr}")
                return None
                
        except subprocess.TimeoutExpired:
            logger.error(f"Conversion timeout for {input_file.name}")
            return None
        except Exception as e:
            logger.error(f"Error converting {input_file.name}: {e}")
            return None
    
    def _process_pdf_document(self, file_path: Path) -> Dict[str, Any]:
        """Process PDF using llmsherpa with timeout protection and fallback"""
        logger.debug(f"Processing PDF: {file_path}")
        
        # Get file size for logging
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        logger.debug(f"PDF size: {file_size_mb:.2f} MB")
        
        try:
            # Try llmsherpa with timeout protection using threading
            def llmsherpa_parse():
                logger.debug(f"Attempting LLMSherpa parsing (timeout: {config.PDF_PROCESSING_TIMEOUT}s)...")
                pdf_reader = LayoutPDFReader(config.LLMSHERPA_API_URL)
                doc = pdf_reader.read_pdf(str(file_path))
                
                # Extract chunks
                chunks = []
                for chunk in doc.chunks():
                    chunk_text = chunk.to_context_text()
                    if chunk_text and len(chunk_text.strip()) > 50:
                        chunks.append({
                            'text': chunk_text,
                            'type': 'text',
                            'metadata': {
                                'page': getattr(chunk, 'page_idx', 0),
                                'section': getattr(chunk, 'tag', 'content')
                            }
                        })
                return chunks
            
            chunks = run_with_timeout(llmsherpa_parse, timeout_duration=config.PDF_PROCESSING_TIMEOUT)
            
            # Check if we got meaningful content
            if len(chunks) > 0:
                logger.debug(f"LLMSherpa successfully extracted {len(chunks)} chunks")
                return {'chunks': chunks, 'success': True}
            else:
                logger.warning(f"LLMSherpa extracted 0 chunks (likely scanned/image PDF), falling back to PyPDF2")
        
        except TimeoutException as te:
            logger.warning(f"LLMSherpa timed out after {config.PDF_PROCESSING_TIMEOUT}s, falling back to PyPDF2: {te}")
            
        except Exception as e:
            logger.warning(f"LLMSherpa failed, falling back to PyPDF2: {e}")
            
        # Fallback to PyPDF2 with chunking for large PDFs
        try:
            logger.debug("Using PyPDF2 fallback for PDF extraction...")
            
            # Suppress PyPDF2 warnings for cleaner logs
            pypdf_logger = logging.getLogger('PyPDF2')
            original_level = pypdf_logger.level
            pypdf_logger.setLevel(logging.ERROR)
            
            chunks = []
            
            try:
                with open(file_path, 'rb') as f:
                    pdf_reader = PyPDF2.PdfReader(f)
                    total_pages = len(pdf_reader.pages)
                    logger.debug(f"PDF has {total_pages} pages")
                    
                    # Process pages in batches to avoid memory issues
                    batch_size = 50  # Process 50 pages at a time
                    for batch_start in range(0, total_pages, batch_size):
                        batch_end = min(batch_start + batch_size, total_pages)
                        logger.debug(f"Processing pages {batch_start + 1} to {batch_end}...")
                        
                        for page_num in range(batch_start, batch_end):
                            try:
                                page = pdf_reader.pages[page_num]
                                text = page.extract_text()
                                
                                if text and len(text.strip()) > 50:
                                    # Chunk large pages to avoid token limit issues
                                    page_metadata = {
                                        'page': page_num + 1,
                                        'section': 'content'
                                    }
                                    page_chunks = self._chunk_large_text(text, page_metadata)
                                    chunks.extend(page_chunks)
                            except Exception as page_error:
                                logger.warning(f"Failed to extract page {page_num + 1}: {page_error}")
                                continue
            finally:
                # Restore original logging level
                pypdf_logger.setLevel(original_level)
            
            # Check if we got meaningful content
            if chunks and len(chunks) > 0:
                logger.debug(f"PyPDF2 successfully extracted {len(chunks)} chunks from {total_pages} pages")
                
                # Check quality and consider OCR if needed
                all_text = ' '.join([chunk['text'] for chunk in chunks])
                if self._should_use_ocr(all_text) and OCR_AVAILABLE:
                    logger.debug("PyPDF2 text quality poor, attempting OCR enhancement")
                    ocr_text = self._extract_text_with_ocr(file_path)
                    
                    if ocr_text and len(ocr_text) > len(all_text):
                        logger.debug("OCR produced better results, using OCR text")
                        # Convert OCR text to chunks
                        ocr_chunks = self._chunk_large_text(
                            ocr_text,
                            metadata={'extraction_method': 'OCR', 'section': 'content'}
                        )
                        return {'chunks': ocr_chunks, 'success': True}
                
                return {'chunks': chunks, 'success': True}
            else:
                # No text extracted by PyPDF2 - try OCR as last resort
                logger.warning("PyPDF2 extracted no content")
                
                if OCR_AVAILABLE:
                    logger.debug("Attempting OCR as final fallback for scanned PDF")
                    ocr_text = self._extract_text_with_ocr(file_path)
                    
                    if ocr_text:
                        logger.info(f"OCR successfully extracted text from scanned PDF")
                        ocr_chunks = self._chunk_large_text(
                            ocr_text,
                            metadata={'extraction_method': 'OCR', 'section': 'content'}
                        )
                        return {'chunks': ocr_chunks, 'success': True}
                    else:
                        logger.error("OCR also failed to extract content")
                else:
                    logger.error("No OCR available - scanned PDF cannot be processed")
                
                return {'chunks': [], 'success': False, 'error': 'No content extracted - likely scanned PDF without OCR'}
            
        except Exception as e2:
            error_msg = str(e2)
            logger.error(f"PDF processing failed completely: {error_msg}")
            
            # Check for specific error types
            if 'not been decrypted' in error_msg or 'encrypted' in error_msg.lower():
                logger.warning(f"Skipping encrypted/password-protected PDF: {file_path.name}")
                return {'chunks': [], 'success': False, 'error': 'PDF is encrypted or password-protected'}
            
            try:
                import traceback
                logger.error(f"Traceback: {traceback.format_exc()}")
            except:
                logger.error(f"Could not format traceback")
                
            return {'chunks': [], 'success': False, 'error': error_msg}
    
    def _process_word_document(self, file_path: Path) -> Dict[str, Any]:
        """Process Word document"""
        logger.info(f"Processing Word document: {file_path}")
        
        # Convert to PDF first
        pdf_path = self._convert_to_pdf(file_path)
        if pdf_path:
            return self._process_pdf_document(pdf_path)
        
        # Fallback to direct text extraction
        try:
            doc = DocxDocument(str(file_path))
            text = "\n\n".join([para.text for para in doc.paragraphs if para.text.strip()])
            
            return {
                'chunks': [{
                    'text': text,
                    'type': 'text',
                    'metadata': {'section': 'content'}
                }],
                'success': True
            }
        except Exception as e:
            logger.error(f"Word processing failed: {e}")
            return {'chunks': [], 'success': False, 'error': str(e)}
    
    def _process_powerpoint(self, file_path: Path) -> Dict[str, Any]:
        """Process PowerPoint presentation"""
        logger.debug(f"Processing PowerPoint: {file_path}")
        
        try:
            prs = pptx.Presentation(str(file_path))
            chunks = []
            
            for slide_num, slide in enumerate(prs.slides):
                slide_text = []
                
                # Extract text from shapes
                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text.strip():
                        slide_text.append(shape.text.strip())
                
                if slide_text:
                    chunks.append({
                        'text': "\n".join(slide_text),
                        'type': 'text',
                        'metadata': {
                            'slide': slide_num + 1,
                            'section': 'slide'
                        }
                    })
            
            return {'chunks': chunks, 'success': True}
            
        except Exception as e:
            logger.error(f"PowerPoint processing failed: {e}")
            
            # Try conversion to PDF as fallback
            pdf_path = self._convert_to_pdf(file_path)
            if pdf_path:
                return self._process_pdf_document(pdf_path)
                
            return {'chunks': [], 'success': False, 'error': str(e)}
    
    def _process_excel(self, file_path: Path) -> Dict[str, Any]:
        """Process Excel spreadsheet"""
        logger.debug(f"Processing Excel: {file_path}")
        
        try:
            # Try pandas first
            excel_file = pd.ExcelFile(str(file_path))
            chunks = []
            
            for sheet_name in excel_file.sheet_names:
                df = pd.read_excel(excel_file, sheet_name=sheet_name)
                
                # Convert to markdown table
                if not df.empty:
                    markdown_table = df.to_markdown(index=False)
                    chunks.append({
                        'text': f"## Sheet: {sheet_name}\n\n{markdown_table}",
                        'type': 'table',
                        'metadata': {
                            'sheet': sheet_name,
                            'rows': len(df),
                            'columns': len(df.columns)
                        }
                    })
            
            return {'chunks': chunks, 'success': True}
            
        except Exception as e:
            logger.error(f"Excel processing failed: {e}")
            return {'chunks': [], 'success': False, 'error': str(e)}
    
    def _process_text_file(self, file_path: Path) -> Dict[str, Any]:
        """Process plain text or markdown file"""
        logger.info(f"Processing text file: {file_path}")
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
            
            return {
                'chunks': [{
                    'text': text,
                    'type': 'text',
                    'metadata': {'section': 'content'}
                }],
                'success': True
            }
        except Exception as e:
            logger.error(f"Text file processing failed: {e}")
            return {'chunks': [], 'success': False, 'error': str(e)}
    
    def process_document(self, file_path: Path) -> Dict[str, Any]:
        """
        Process a document and extract chunks
        
        Args:
            file_path: Path to document
            
        Returns:
            Dictionary with chunks and metadata
        """
        # Check file size and log warning for large files
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > 100:
            logger.warning(f"Processing large file ({file_size_mb:.2f} MB): {file_path.name}")
        
        file_type = self._get_file_type(file_path)
        
        if file_type == "pdf":
            result = self._process_pdf_document(file_path)
        elif file_type == "word":
            result = self._process_word_document(file_path)
        elif file_type == "powerpoint":
            result = self._process_powerpoint(file_path)
        elif file_type == "excel":
            result = self._process_excel(file_path)
        elif file_type == "text":
            result = self._process_text_file(file_path)
        else:
            logger.error(f"Unsupported file type: {file_path}")
            return {'chunks': [], 'success': False, 'error': 'Unsupported file type'}
        
        # Add file metadata to all chunks
        if result.get('success'):
            for chunk in result.get('chunks', []):
                chunk['metadata']['file_name'] = file_path.name
                chunk['metadata']['file_path'] = str(file_path)
                chunk['metadata']['source'] = str(file_path)  # Add 'source' for compatibility
                chunk['metadata']['file_type'] = file_type
        
        return result

Parameters

Name Type Default Kind
bases - -

Parameter Details

bases: Parameter of type

Return Value

Returns unspecified type

Class Interface

Methods

__init__(self, temp_dir)

Purpose: Initialize document processor Args: temp_dir: Directory for temporary files

Parameters:

  • temp_dir: Type: Optional[Path]

Returns: None

_chunk_large_text(self, text, metadata) -> List[Dict[str, Any]]

Purpose: Split large text into smaller chunks with overlap Args: text: Text to chunk metadata: Base metadata to include in all chunks Returns: List of chunk dictionaries

Parameters:

  • text: Type: str
  • metadata: Type: dict

Returns: Returns List[Dict[str, Any]]

_should_use_ocr(self, extracted_text) -> bool

Purpose: Determine if OCR should be used based on the quality of extracted text. Based on document_processor.py best practices.

Parameters:

  • extracted_text: Type: str

Returns: Returns bool

_extract_text_with_ocr(self, file_path, max_pages) -> Optional[str]

Purpose: Extract text from PDF using OCR when standard methods fail. Uses pytesseract (if available) or EasyOCR as fallback. Based on document_processor.py dual-engine approach. Args: file_path: Path to PDF file max_pages: Maximum number of pages to OCR (uses config.OCR_MAX_PAGES if not specified) Returns: Extracted text or None if failed

Parameters:

  • file_path: Type: Path
  • max_pages: Type: int

Returns: Returns Optional[str]

_get_file_extension(self, file_path) -> str

Purpose: Get lowercase file extension

Parameters:

  • file_path: Type: Path

Returns: Returns str

_get_file_type(self, file_path) -> str

Purpose: Determine file type from extension

Parameters:

  • file_path: Type: Path

Returns: Returns str

_convert_to_pdf(self, input_file) -> Optional[Path]

Purpose: Convert document to PDF using LibreOffice

Parameters:

  • input_file: Type: Path

Returns: Returns Optional[Path]

_process_pdf_document(self, file_path) -> Dict[str, Any]

Purpose: Process PDF using llmsherpa with timeout protection and fallback

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

_process_word_document(self, file_path) -> Dict[str, Any]

Purpose: Process Word document

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

_process_powerpoint(self, file_path) -> Dict[str, Any]

Purpose: Process PowerPoint presentation

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

_process_excel(self, file_path) -> Dict[str, Any]

Purpose: Process Excel spreadsheet

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

_process_text_file(self, file_path) -> Dict[str, Any]

Purpose: Process plain text or markdown file

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

process_document(self, file_path) -> Dict[str, Any]

Purpose: Process a document and extract chunks Args: file_path: Path to document Returns: Dictionary with chunks and metadata

Parameters:

  • file_path: Type: Path

Returns: Returns Dict[str, Any]

Required Imports

import os
import logging
import tempfile
import subprocess
from pathlib import Path

Usage Example

# Example usage:
# result = DocumentProcessor(bases)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DocumentProcessor_v6 81.2% similar

    Process different document types for RAG context extraction

    From: /tf/active/vicechatdev/offline_docstore_multi.py
  • class DocumentProcessor_v5 80.4% similar

    Process different document types for RAG context extraction

    From: /tf/active/vicechatdev/offline_docstore_multi_vice.py
  • class DocumentProcessor 67.5% similar

    A comprehensive document processing class that converts documents to PDF, adds audit trails, applies security features (watermarks, signatures, hashing), and optionally converts to PDF/A format with document protection.

    From: /tf/active/vicechatdev/document_auditor/src/document_processor.py
  • class DocumentProcessor_v7 67.3% similar

    Lightweight document processor for chat upload functionality

    From: /tf/active/vicechatdev/vice_ai/document_processor.py
  • class DocumentProcessor_v4 64.1% similar

    Handles document processing and text extraction using llmsherpa (same approach as offline_docstore_multi_vice.py).

    From: /tf/active/vicechatdev/docchat/document_processor.py
← Back to Browse