🔍 Code Extractor

class DocumentProcessor_v6

Maturity: 26

Lightweight document processor for chat upload functionality

File: /tf/active/vicechatdev/vice_ai/document_processor.py
Lines: 97 - 1028
Complexity: moderate

Purpose

Lightweight document processor for chat upload functionality

Source Code

class DocumentProcessor:
    """Lightweight document processor for chat upload functionality"""
    
    # Supported file extensions by type
    WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
    PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
    EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
    PDF_EXTENSIONS = ['.pdf']
    
    def __init__(self, temp_dir=None, llmsherpa_api_url=None):
        """
        Initialize the document processor
        
        Args:
            temp_dir: Directory for temporary files (optional)
            llmsherpa_api_url: URL for llmsherpa API
        """
        self.temp_dir = Path(temp_dir) if temp_dir else Path(tempfile.mkdtemp())
        self.llmsherpa_api_url = llmsherpa_api_url or "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
        
        # Create temp directory if it doesn't exist
        os.makedirs(self.temp_dir, exist_ok=True)
        
        # Setup extraction debugging directory
        self.extracted_dir = Path(__file__).parent / "extracted"
        os.makedirs(self.extracted_dir, exist_ok=True)
        
        # Setup tiktoken for token counting
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        
        # Log available processing capabilities
        self._log_available_capabilities()
        
    def _log_available_capabilities(self):
        """Log which document processing libraries are available"""
        capabilities = []
        
        if LLMSHERPA_AVAILABLE:
            capabilities.append("✅ llmsherpa (advanced PDF processing)")
        else:
            capabilities.append("❌ llmsherpa (advanced PDF processing)")
            
        if PYPDF2_AVAILABLE:
            capabilities.append("✅ PyPDF2 (basic PDF processing)")
        else:
            capabilities.append("❌ PyPDF2 (basic PDF processing)")
            
        if PYMUPDF_AVAILABLE:
            capabilities.append("✅ pymupdf (enhanced PDF processing + OCR)")
        else:
            capabilities.append("❌ pymupdf (enhanced PDF processing + OCR)")
            
        if PYTESSERACT_AVAILABLE:
            capabilities.append("✅ pytesseract (OCR text recognition)")
        else:
            capabilities.append("❌ pytesseract (OCR text recognition)")
            
        if PDF2IMAGE_AVAILABLE:
            capabilities.append("✅ pdf2image (PDF to image conversion)")
        else:
            capabilities.append("❌ pdf2image (PDF to image conversion)")
            
        if DOCX_AVAILABLE:
            capabilities.append("✅ python-docx (Word document processing)")
        else:
            capabilities.append("❌ python-docx (Word document processing)")
            
        if PPTX_AVAILABLE:
            capabilities.append("✅ python-pptx (PowerPoint processing)")
        else:
            capabilities.append("❌ python-pptx (PowerPoint processing)")
            
        if PANDAS_AVAILABLE:
            capabilities.append("✅ pandas (Excel processing)")
        else:
            capabilities.append("❌ pandas (Excel processing)")
        
        logger.info("📋 Document Processing Capabilities:")
        for capability in capabilities:
            logger.info(f"   {capability}")
            
    def get_available_formats(self):
        """Get list of supported file formats based on available libraries"""
        formats = []
        
        if LLMSHERPA_AVAILABLE or PYPDF2_AVAILABLE:
            formats.extend(self.PDF_EXTENSIONS)
            
        if DOCX_AVAILABLE:
            formats.append('.docx')
        
        # Always support conversion fallback for Word docs
        formats.extend([ext for ext in self.WORD_EXTENSIONS if ext not in formats])
        
        if PPTX_AVAILABLE:
            formats.append('.pptx')
            
        # Always support conversion fallback for PowerPoint
        formats.extend([ext for ext in self.PPT_EXTENSIONS if ext not in formats])
        
        if PANDAS_AVAILABLE:
            formats.extend(self.EXCEL_EXTENSIONS)
        
        return list(set(formats))  # Remove duplicates
        
    def _save_extraction_debug_log(self, file_path, processed_result, extraction_method):
        """
        Save extraction results to debug log file for performance analysis
        
        Args:
            file_path: Original file path
            processed_result: Result from document processing
            extraction_method: Method used for extraction (e.g., 'llmsherpa', 'pypdf2', 'python-docx')
        """
        try:
            # Create timestamp and sanitized filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            original_name = Path(file_path).stem
            # Sanitize filename for filesystem
            safe_name = "".join(c for c in original_name if c.isalnum() or c in (' ', '-', '_')).strip()
            safe_name = safe_name.replace(' ', '_')
            
            # Create debug log filename
            debug_filename = f"{timestamp}_{safe_name}_{extraction_method}.json"
            debug_path = self.extracted_dir / debug_filename
            
            # Prepare debug data
            debug_data = {
                "timestamp": timestamp,
                "original_file": str(file_path),
                "filename": Path(file_path).name,
                "file_size_bytes": Path(file_path).stat().st_size if Path(file_path).exists() else 0,
                "extraction_method": extraction_method,
                "processing_result": {
                    "text_chunks_count": len(processed_result.get('text_chunks', [])),
                    "tables_count": len(processed_result.get('tables', [])),
                    "has_error": 'error' in processed_result,
                    "error_message": processed_result.get('error', None)
                },
                "text_content": {
                    "text_chunks": processed_result.get('text_chunks', []),
                    "tables": processed_result.get('tables', [])
                },
                "metadata": processed_result.get('metadata', {}),
                "extraction_stats": {
                    "total_text_length": sum(len(chunk) for chunk in processed_result.get('text_chunks', [])),
                    "total_table_length": sum(len(table) for table in processed_result.get('tables', [])),
                    "estimated_tokens": self.count_tokens(self.get_combined_text(processed_result))
                }
            }
            
            # Save to JSON file
            with open(debug_path, 'w', encoding='utf-8') as f:
                json.dump(debug_data, f, indent=2, ensure_ascii=False)
            
            logger.info(f"📄 Extraction debug log saved: {debug_filename}")
            logger.info(f"   Method: {extraction_method}")
            logger.info(f"   Text chunks: {debug_data['processing_result']['text_chunks_count']}")
            logger.info(f"   Tables: {debug_data['processing_result']['tables_count']}")
            logger.info(f"   Total text length: {debug_data['extraction_stats']['total_text_length']}")
            logger.info(f"   Estimated tokens: {debug_data['extraction_stats']['estimated_tokens']}")
            
        except Exception as e:
            logger.warning(f"Failed to save extraction debug log: {e}")
        
    def _get_file_extension(self, file_path):
        """Get lowercase file extension including the dot"""
        return Path(file_path).suffix.lower()
    
    def _get_file_type(self, file_path):
        """Determine file type based on extension"""
        ext = self._get_file_extension(file_path)
        
        if ext in self.WORD_EXTENSIONS:
            return "word"
        elif ext in self.PPT_EXTENSIONS:
            return "powerpoint"
        elif ext in self.EXCEL_EXTENSIONS:
            return "excel"
        elif ext in self.PDF_EXTENSIONS:
            return "pdf"
        else:
            return "unknown"
    
    def count_tokens(self, text):
        """Count tokens in text using tiktoken"""
        return len(self.tokenizer.encode(text))
    
    def sanitize_text(self, text):
        """Sanitize text by encoding to UTF-8 with error replacement"""
        if not text:
            return ""
        return text.encode("utf-8", errors="replace").decode("utf-8")
    
    def chunk_text(self, text, max_chunk_size=4000, overlap=200):
        """
        Split text into chunks with overlap for better context preservation
        
        Args:
            text: Text to chunk
            max_chunk_size: Maximum characters per chunk
            overlap: Character overlap between chunks
            
        Returns:
            List of text chunks
        """
        if not text or len(text) <= max_chunk_size:
            return [text] if text else []
        
        chunks = []
        start = 0
        
        while start < len(text):
            # Calculate end position
            end = start + max_chunk_size
            
            # Try to break at sentence boundary if possible
            if end < len(text):
                # Look for sentence endings near the end
                for punct in ['. ', '.\n', '? ', '?\n', '! ', '!\n']:
                    last_sentence = text.rfind(punct, start + max_chunk_size - 200, end)
                    if last_sentence != -1:
                        end = last_sentence + len(punct)
                        break
            
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            
            # Move start position with overlap
            start = max(start + max_chunk_size - overlap, end)
            
        return chunks
    
    def _clean_pdf_text(self, text):
        """
        Clean and normalize PDF extracted text
        Removes excessive whitespace, fixes common OCR issues, and improves readability
        """
        if not text:
            return ""
        
        # Normalize whitespace
        import re
        
        # Remove excessive newlines (more than 2 consecutive)
        text = re.sub(r'\n{3,}', '\n\n', text)
        
        # Fix broken words (common in PDF extraction)
        # Look for single letters followed by space and continue
        text = re.sub(r'\b([a-zA-Z])\s+([a-zA-Z])\b', r'\1\2', text)
        
        # Remove excessive spaces
        text = re.sub(r' {2,}', ' ', text)
        
        # Fix common OCR issues (ligatures, dashes, and smart quotes)
        text = text.replace('ﬁ', 'fi')
        text = text.replace('ﬂ', 'fl')
        text = text.replace('–', '-')
        text = text.replace('“', '"')
        text = text.replace('”', '"')
        text = text.replace('‘', "'")
        text = text.replace('’', "'")
        
        # Remove isolated special characters that are OCR artifacts
        text = re.sub(r'\s+[^\w\s]{1}\s+', ' ', text)
        
        # Ensure proper sentence spacing
        text = re.sub(r'\.([A-Z])', r'. \1', text)
        
        # Remove leading/trailing whitespace from lines
        lines = text.split('\n')
        cleaned_lines = [line.strip() for line in lines if line.strip()]
        
        # Join lines and remove extra spaces
        cleaned_text = '\n'.join(cleaned_lines)
        
        # Final cleanup
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
        cleaned_text = cleaned_text.strip()
        
        return cleaned_text
    
    def _ocr_pdf_extraction(self, file_path):
        """
        OCR-based PDF extraction using pymupdf (fitz) or other OCR libraries
        This is a fallback for scanned PDFs or when other methods fail
        """
        text_chunks = []
        tables = []
        
        # Try with pymupdf (fitz) first - good for both text and OCR
        if PYMUPDF_AVAILABLE and fitz:
            try:
                doc = fitz.open(file_path)
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    
                    # Try text extraction first (for non-scanned pages)
                    text = page.get_text()
                    
                    # If very little text found, try OCR
                    if len(text.strip()) < 50:
                        try:
                            # Get page as image and OCR it
                            mat = fitz.Matrix(2, 2)  # 2x zoom for better OCR
                            pix = page.get_pixmap(matrix=mat)
                            
                            # Try with pytesseract if available
                            if PYTESSERACT_AVAILABLE:
                                try:
                                    import pytesseract
                                    from PIL import Image
                                    import io
                                    
                                    img_data = pix.tobytes("ppm")
                                    img = Image.open(io.BytesIO(img_data))
                                    text = pytesseract.image_to_string(img)
                                except Exception as ocr_error:
                                    logger.warning(f"pytesseract OCR failed for page {page_num}: {ocr_error}")
                                    text = page.get_text()  # Fallback to basic text
                            else:
                                logger.warning("pytesseract not available for OCR")
                                text = page.get_text()  # Fallback to basic text
                        except Exception as ocr_error:
                            logger.warning(f"OCR failed for page {page_num}: {ocr_error}")
                            text = page.get_text()  # Fallback to basic text
                    
                    # Clean and chunk the text
                    if text and len(text.strip()) > 10:
                        clean_text = self._clean_pdf_text(text)
                        if clean_text:
                            page_content = f"Page {page_num + 1}:\n\n{clean_text}"
                            text_chunks.extend(self.chunk_text(page_content))
                    
                    # Try to extract tables using pymupdf
                    try:
                        tables_on_page = page.find_tables()
                        for i, table in enumerate(tables_on_page):
                            table_data = table.extract()
                            if table_data and len(table_data) > 1:
                                markdown_table = self._table_to_markdown(table_data)
                                if markdown_table:
                                    tables.append(f"Page {page_num + 1} Table {i + 1}:\n\n{markdown_table}")
                    except Exception as table_error:
                        logger.warning(f"Table extraction failed for page {page_num}: {table_error}")
                
                doc.close()
                
                if text_chunks:
                    return {"text_chunks": text_chunks, "tables": tables}
                    
            except Exception as e:
                logger.warning(f"pymupdf OCR extraction failed: {e}")
        else:
            logger.warning("pymupdf (fitz) not available for OCR extraction")
        
        # Fallback OCR with pdf2image + pytesseract
        if PDF2IMAGE_AVAILABLE and PYTESSERACT_AVAILABLE:
            try:
                from pdf2image import convert_from_path
                import pytesseract
                
                logger.info("Attempting OCR with pdf2image + pytesseract...")
                
                # Convert PDF to images
                images = convert_from_path(file_path, dpi=200)
                
                for i, image in enumerate(images):
                    try:
                        # Extract text using OCR
                        text = pytesseract.image_to_string(image, lang='eng')
                        
                        if text and len(text.strip()) > 10:
                            clean_text = self._clean_pdf_text(text)
                            if clean_text:
                                page_content = f"Page {i + 1} (OCR):\n\n{clean_text}"
                                text_chunks.extend(self.chunk_text(page_content))
                                
                    except Exception as page_error:
                        logger.warning(f"OCR failed for page {i + 1}: {page_error}")
                
                if text_chunks:
                    return {"text_chunks": text_chunks, "tables": tables}
                    
            except Exception as e:
                logger.warning(f"pdf2image OCR extraction failed: {e}")
        else:
            logger.warning("pdf2image or pytesseract not available for OCR")
        
        return {"text_chunks": [], "tables": []}
    
    def _convert_to_pdf(self, input_file):
        """Convert a document to PDF using LibreOffice"""
        input_path = Path(input_file)
        output_pdf = self.temp_dir / f"{input_path.stem}.pdf"
        
        try:
            # Use LibreOffice to convert to PDF
            cmd = [
                'libreoffice', '--headless', '--convert-to', 'pdf',
                '--outdir', str(self.temp_dir),
                str(input_path.absolute())
            ]
            
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60  # 1 minute timeout
            )
            
            if process.returncode == 0 and output_pdf.exists():
                logger.info(f"Successfully converted to PDF: {output_pdf}")
                return output_pdf
            else:
                logger.error(f"LibreOffice conversion failed: {process.stderr}")
                return None
                
        except subprocess.TimeoutExpired:
            logger.error(f"Timeout while converting: {input_path}")
            return None
        except Exception as e:
            logger.error(f"Failed to convert with LibreOffice: {str(e)}")
            return None
    
    def _process_pdf_document(self, file_path):
        """
        Enhanced PDF processing with multiple extraction strategies and table detection
        Uses a cascading approach to get the best possible text and table extraction
        """
        logger.info(f"Processing PDF: {file_path}")
        
        all_results = []
        best_result = {"text_chunks": [], "tables": []}
        
        # Strategy 1: Enhanced llmsherpa processing
        if LLMSHERPA_AVAILABLE and LayoutPDFReader:
            try:
                logger.info("🔍 Attempting llmsherpa extraction...")
                pdf_reader = LayoutPDFReader(self.llmsherpa_api_url)
                doc = pdf_reader.read_pdf(str(file_path))
                
                text_chunks = []
                tables = []
                
                # Enhanced section extraction
                sections_found = False
                for section in doc.sections():
                    if section.title and section.to_text():
                        sections_found = True
                        section_text = section.to_text().strip()
                        if len(section_text) > 10:  # Only meaningful content
                            chunk_text = f"## {section.title}\n\n{section_text}"
                            text_chunks.extend(self.chunk_text(chunk_text))
                
                # Enhanced chunk extraction with better filtering
                chunks_found = False
                for chunk in doc.chunks():
                    chunk_text = chunk.to_text().strip()
                    if len(chunk_text) > 20:  # Filter out very short chunks
                        chunks_found = True
                        text_chunks.extend(self.chunk_text(chunk_text))
                
                # Extract tables if available
                try:
                    for table in doc.tables():
                        table_data = table.to_text()
                        if table_data and len(table_data) > 10:
                            tables.append(table_data)
                except Exception:
                    pass
                
                # Get full document text as fallback
                if not text_chunks:
                    full_text = doc.to_text().strip()
                    if full_text and len(full_text) > 50:
                        text_chunks.extend(self.chunk_text(full_text))
                
                if text_chunks:
                    result = {"text_chunks": text_chunks, "tables": tables}
                    all_results.append(("llmsherpa", result))
                    logger.info(f"✅ llmsherpa: {len(text_chunks)} chunks, {len(tables)} tables")
                    self._save_extraction_debug_log(file_path, result, "llmsherpa")
                    
                    # If we got substantial content, use this as our best result
                    total_text_length = sum(len(chunk) for chunk in text_chunks)
                    if total_text_length > 200:
                        best_result = result
                
            except Exception as e:
                logger.warning(f"llmsherpa failed for {file_path}: {e}")
        
        # Strategy 2: Enhanced pdfplumber with table extraction
        try:
            import pdfplumber
            logger.info("🔍 Attempting pdfplumber extraction...")
            
            text_chunks = []
            tables = []
            
            with pdfplumber.open(file_path) as pdf:
                for i, page in enumerate(pdf.pages):
                    try:
                        # Extract text
                        page_text = page.extract_text()
                        if page_text and len(page_text.strip()) > 20:
                            clean_text = self._clean_pdf_text(page_text)
                            if clean_text:
                                page_content = f"Page {i+1}:\n\n{clean_text}"
                                text_chunks.extend(self.chunk_text(page_content))
                        
                        # Extract tables
                        page_tables = page.extract_tables()
                        if page_tables:
                            for j, table in enumerate(page_tables):
                                if table and len(table) > 1:  # Must have header + data
                                    markdown_table = self._table_to_markdown(table)
                                    if markdown_table:
                                        tables.append(f"Page {i+1} Table {j+1}:\n\n{markdown_table}")
                    
                    except Exception as page_error:
                        logger.warning(f"Error processing page {i+1} with pdfplumber: {page_error}")
            
            if text_chunks:
                result = {"text_chunks": text_chunks, "tables": tables}
                all_results.append(("pdfplumber", result))
                logger.info(f"✅ pdfplumber: {len(text_chunks)} chunks, {len(tables)} tables")
                self._save_extraction_debug_log(file_path, result, "pdfplumber")
                
                # Compare with current best result
                total_text_length = sum(len(chunk) for chunk in text_chunks)
                current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
                
                if total_text_length > current_best_length:
                    best_result = result
                
        except ImportError:
            logger.warning("pdfplumber not available")
        except Exception as e:
            logger.warning(f"pdfplumber processing failed: {e}")
        
        # Strategy 3: Enhanced PyPDF2 with better text cleaning
        if PYPDF2_AVAILABLE and PyPDF2:
            try:
                logger.info("🔍 Attempting PyPDF2 extraction...")
                text_chunks = []
                
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    
                    for i, page in enumerate(pdf_reader.pages):
                        try:
                            page_text = page.extract_text()
                            if page_text and len(page_text.strip()) > 20:
                                clean_text = self._clean_pdf_text(page_text)
                                if clean_text:
                                    page_content = f"Page {i+1}:\n\n{clean_text}"
                                    text_chunks.extend(self.chunk_text(page_content))
                        except Exception as page_error:
                            logger.warning(f"Error extracting page {i+1}: {page_error}")
                
                if text_chunks:
                    result = {"text_chunks": text_chunks, "tables": []}
                    all_results.append(("pypdf2", result))
                    logger.info(f"✅ PyPDF2: {len(text_chunks)} chunks")
                    self._save_extraction_debug_log(file_path, result, "pypdf2")
                    
                    # Compare with current best result (only if we don't have a good result yet)
                    total_text_length = sum(len(chunk) for chunk in text_chunks)
                    current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
                    
                    if current_best_length < 100 and total_text_length > current_best_length:
                        best_result = result
                
            except Exception as e:
                logger.warning(f"PyPDF2 processing failed: {e}")
        
        # Strategy 4: OCR fallback for scanned PDFs or difficult layouts
        try:
            logger.info("🔍 Attempting OCR extraction as fallback...")
            ocr_result = self._ocr_pdf_extraction(file_path)
            if ocr_result and ocr_result.get('text_chunks'):
                all_results.append(("ocr", ocr_result))
                logger.info(f"✅ OCR: {len(ocr_result['text_chunks'])} chunks")
                self._save_extraction_debug_log(file_path, ocr_result, "ocr")
                
                # Use OCR if we still don't have good results
                current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
                ocr_text_length = sum(len(chunk) for chunk in ocr_result.get('text_chunks', []))
                
                if current_best_length < 100 and ocr_text_length > current_best_length:
                    best_result = ocr_result
                    
        except Exception as e:
            logger.warning(f"OCR processing failed: {e}")
        
        # Select and return the best result
        if best_result.get('text_chunks'):
            # Combine results if we have good tables from one method and good text from another
            final_result = best_result.copy()
            
            # Look for the best table extraction across all methods
            all_tables = []
            for method_name, result in all_results:
                if result.get('tables'):
                    all_tables.extend(result['tables'])
            
            if all_tables and len(all_tables) > len(final_result.get('tables', [])):
                final_result['tables'] = all_tables
            
            logger.info(f"🎯 Final PDF result: {len(final_result['text_chunks'])} chunks, {len(final_result.get('tables', []))} tables")
            self._save_extraction_debug_log(file_path, final_result, "best-combined")
            return final_result
        
        # If all methods failed
        logger.warning(f"All PDF extraction methods failed for {file_path}")
        result = {"text_chunks": [], "tables": [], "error": "PDF processing failed - all extraction methods unsuccessful"}
        self._save_extraction_debug_log(file_path, result, "all-failed")
        return result
    
    def _process_word_document(self, file_path):
        """Process Word documents"""
        logger.info(f"Processing Word document: {file_path}")
        
        # First try python-docx for .docx files
        if DOCX_AVAILABLE and docx and file_path.suffix.lower() == '.docx':
            try:
                doc = docx.Document(file_path)
                text_chunks = []
                
                # Extract paragraphs
                full_text = ""
                for paragraph in doc.paragraphs:
                    if paragraph.text.strip():
                        full_text += paragraph.text + "\n"
                
                # Extract tables
                tables = []
                for table in doc.tables:
                    table_data = []
                    for row in table.rows:
                        row_data = [cell.text.strip() for cell in row.cells]
                        table_data.append(row_data)
                    
                    if table_data:
                        # Convert to markdown
                        markdown_table = self._table_to_markdown(table_data)
                        tables.append(markdown_table)
                
                # Chunk the text
                if full_text:
                    text_chunks.extend(self.chunk_text(full_text))
                
                logger.info(f"✅ python-docx processed Word doc with {len(text_chunks)} text chunks and {len(tables)} tables")
                result = {"text_chunks": text_chunks, "tables": tables}
                self._save_extraction_debug_log(file_path, result, "python-docx")
                return result
                
            except Exception as e:
                logger.warning(f"python-docx failed for {file_path}: {e}")
        
        # Fallback: convert to PDF and process
        logger.info(f"Falling back to PDF conversion for Word document: {file_path}")
        pdf_path = self._convert_to_pdf(file_path)
        if pdf_path:
            result = self._process_pdf_document(pdf_path)
            # Clean up temporary PDF
            try:
                pdf_path.unlink()
            except Exception:
                pass
            # Update debug log to show it was converted from Word
            if 'text_chunks' in result or 'tables' in result:
                self._save_extraction_debug_log(file_path, result, "word-to-pdf-conversion")
            return result
        
        result = {"text_chunks": [], "tables": [], "error": "Word document processing failed"}
        self._save_extraction_debug_log(file_path, result, "word-failed")
        return result
    
    def _process_powerpoint(self, file_path):
        """Process PowerPoint documents"""
        logger.info(f"Processing PowerPoint: {file_path}")
        
        # Try python-pptx for .pptx files
        if PPTX_AVAILABLE and pptx and file_path.suffix.lower() == '.pptx':
            try:
                presentation = pptx.Presentation(file_path)
                text_chunks = []
                tables = []
                
                for i, slide in enumerate(presentation.slides):
                    slide_text = []
                    
                    # Extract text from shapes
                    for shape in slide.shapes:
                        if hasattr(shape, 'text') and shape.text:
                            slide_text.append(shape.text)
                        
                        # Extract tables
                        if hasattr(shape, 'has_table') and shape.has_table:
                            table_data = []
                            for row in shape.table.rows:
                                row_data = [cell.text.strip() for cell in row.cells]
                                table_data.append(row_data)
                            
                            if table_data:
                                markdown_table = self._table_to_markdown(table_data)
                                tables.append(f"Slide {i+1} Table:\n{markdown_table}")
                    
                    # Combine slide text
                    if slide_text:
                        slide_content = f"Slide {i+1}:\n" + "\n".join(slide_text)
                        text_chunks.extend(self.chunk_text(slide_content))
                
                logger.info(f"✅ python-pptx processed PowerPoint with {len(text_chunks)} text chunks and {len(tables)} tables")
                result = {"text_chunks": text_chunks, "tables": tables}
                self._save_extraction_debug_log(file_path, result, "python-pptx")
                return result
                
            except Exception as e:
                logger.warning(f"python-pptx failed for {file_path}: {e}")
        
        # Fallback: convert to PDF and process
        logger.info(f"Falling back to PDF conversion for PowerPoint: {file_path}")
        pdf_path = self._convert_to_pdf(file_path)
        if pdf_path:
            result = self._process_pdf_document(pdf_path)
            # Clean up temporary PDF
            try:
                pdf_path.unlink()
            except Exception:
                pass
            # Update debug log to show it was converted from PowerPoint
            if 'text_chunks' in result or 'tables' in result:
                self._save_extraction_debug_log(file_path, result, "powerpoint-to-pdf-conversion")
            return result
        
        result = {"text_chunks": [], "tables": [], "error": "PowerPoint processing failed"}
        self._save_extraction_debug_log(file_path, result, "powerpoint-failed")
        return result
    
    def _process_excel(self, file_path):
        """Process Excel documents"""
        logger.info(f"Processing Excel: {file_path}")
        
        if PANDAS_AVAILABLE and pd:
            try:
                # Read all sheets
                excel_file = pd.ExcelFile(file_path)
                text_chunks = []
                tables = []
                
                for sheet_name in excel_file.sheet_names:
                    try:
                        df = pd.read_excel(file_path, sheet_name=sheet_name)
                        
                        # Convert DataFrame to markdown table
                        if not df.empty:
                            # Add sheet name as header
                            table_text = f"Sheet: {sheet_name}\n\n"
                            table_text += df.to_markdown(index=False)
                            tables.append(table_text)
                            
                            # Also create a text summary
                            summary = f"Sheet '{sheet_name}' contains {len(df)} rows and {len(df.columns)} columns.\n"
                            summary += f"Columns: {', '.join(df.columns.astype(str))}\n"
                            text_chunks.extend(self.chunk_text(summary))
                    
                    except Exception as sheet_error:
                        logger.warning(f"Error processing sheet {sheet_name}: {sheet_error}")
                
                logger.info(f"✅ pandas processed Excel with {len(text_chunks)} text chunks and {len(tables)} tables")
                result = {"text_chunks": text_chunks, "tables": tables}
                self._save_extraction_debug_log(file_path, result, "pandas")
                return result
                
            except Exception as e:
                logger.warning(f"pandas Excel processing failed for {file_path}: {e}")
        
        # Fallback: try with xlrd for older Excel files
        try:
            import xlrd
            
            workbook = xlrd.open_workbook(file_path)
            text_chunks = []
            tables = []
            
            for sheet_name in workbook.sheet_names():
                sheet = workbook.sheet_by_name(sheet_name)
                
                # Extract data as table
                table_data = []
                for row_idx in range(sheet.nrows):
                    row_data = []
                    for col_idx in range(sheet.ncols):
                        cell_value = sheet.cell_value(row_idx, col_idx)
                        row_data.append(str(cell_value))
                    table_data.append(row_data)
                
                if table_data:
                    # Convert to markdown
                    markdown_table = self._table_to_markdown(table_data)
                    tables.append(f"Sheet: {sheet_name}\n\n{markdown_table}")
                    
                    # Add summary
                    summary = f"Sheet '{sheet_name}' contains {sheet.nrows} rows and {sheet.ncols} columns.\n"
                    text_chunks.extend(self.chunk_text(summary))
            
            logger.info(f"✅ xlrd processed Excel with {len(text_chunks)} text chunks and {len(tables)} tables")
            result = {"text_chunks": text_chunks, "tables": tables}
            self._save_extraction_debug_log(file_path, result, "xlrd")
            return result
            
        except ImportError:
            logger.warning("xlrd not available for Excel processing")
        except Exception as e:
            logger.warning(f"xlrd Excel processing failed: {e}")
        
        result = {"text_chunks": [], "tables": [], "error": "Excel processing failed - no Excel processing libraries available"}
        self._save_extraction_debug_log(file_path, result, "excel-failed")
        return result
    
    def _table_to_markdown(self, table_data):
        """Convert a 2D array to a markdown table"""
        if not table_data or not table_data[0]:
            return ""
        
        # Create header
        markdown = "| " + " | ".join([str(cell) for cell in table_data[0]]) + " |\n"
        # Add separator line
        markdown += "| " + " | ".join(["---" for _ in table_data[0]]) + " |\n"
        
        # Add data rows
        for row in table_data[1:]:
            markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
        
        return markdown
    
    def process_document(self, file_path):
        """
        Process a document and extract text content
        
        Args:
            file_path: Path to the document file
            
        Returns:
            dict: {
                'text_chunks': [list of text chunks],
                'tables': [list of table content],
                'metadata': {file info},
                'error': optional error message
            }
        """
        try:
            file_path = Path(file_path)
            
            # Validate file exists
            if not file_path.exists():
                return {"error": "File not found"}
            
            # Get file type
            file_type = self._get_file_type(file_path)
            
            if file_type == "unknown":
                return {"error": f"Unsupported file type: {file_path.suffix}"}
            
            # Process based on file type
            if file_type == "pdf":
                result = self._process_pdf_document(file_path)
            elif file_type == "word":
                result = self._process_word_document(file_path)
            elif file_type == "powerpoint":
                result = self._process_powerpoint(file_path)
            elif file_type == "excel":
                result = self._process_excel(file_path)
            else:
                return {"error": f"Unknown file type: {file_type}"}
            
            # Add metadata
            result['metadata'] = {
                'filename': file_path.name,
                'file_type': file_type,
                'size': file_path.stat().st_size,
                'extension': file_path.suffix.lower()
            }
            
            # Sanitize all text content
            if 'text_chunks' in result:
                result['text_chunks'] = [self.sanitize_text(chunk) for chunk in result['text_chunks']]
            if 'tables' in result:
                result['tables'] = [self.sanitize_text(table) for table in result['tables']]
            
            # Save final processing summary if not already saved by a specific method
            if 'debug_logged' not in result:
                self._save_extraction_debug_log(file_path, result, f"final-{file_type}")
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {e}")
            error_result = {"error": f"Processing failed: {str(e)}"}
            self._save_extraction_debug_log(file_path, error_result, "exception")
            return error_result
    
    def get_combined_text(self, processed_result):
        """
        Get combined text content from processed document result
        
        Args:
            processed_result: Result from process_document()
            
        Returns:
            str: Combined text content
        """
        if 'error' in processed_result:
            return ""
        
        combined_text = ""
        
        # Add text chunks
        if processed_result.get('text_chunks'):
            combined_text += "\n\n".join(processed_result['text_chunks'])
        
        # Add tables
        if processed_result.get('tables'):
            if combined_text:
                combined_text += "\n\n"
            combined_text += "\n\n".join(processed_result['tables'])
        
        return combined_text.strip()

Parameters

Name Type Default Kind
temp_dir - None keyword
llmsherpa_api_url - None keyword

Parameter Details

temp_dir: Directory for temporary files (optional; a fresh temporary directory is created when omitted)

llmsherpa_api_url: URL for the llmsherpa API (optional; defaults to the internal llmsherpa service endpoint)

Return Value

Returns a DocumentProcessor instance

Class Interface

Methods

__init__(self, temp_dir, llmsherpa_api_url)

Purpose: Initialize the document processor

Parameters:

  • temp_dir: Directory for temporary files (optional; a fresh temporary directory is created when omitted)
  • llmsherpa_api_url: URL for the llmsherpa API (optional; defaults to the internal llmsherpa service endpoint)

Returns: None

_log_available_capabilities(self)

Purpose: Log which document processing libraries are available

Returns: None

get_available_formats(self)

Purpose: Get list of supported file formats based on available libraries

Returns: List of supported file extensions (duplicates removed)
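
A minimal usage sketch, assuming the class is importable from the module listed above (the import path is illustrative):

from document_processor import DocumentProcessor

processor = DocumentProcessor()
formats = processor.get_available_formats()
print(sorted(formats))  # e.g. ['.doc', '.docx', '.pdf', ...] depending on installed libraries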

_save_extraction_debug_log(self, file_path, processed_result, extraction_method)

Purpose: Save extraction results to a debug log file for performance analysis

Parameters:

  • file_path: Original file path
  • processed_result: Result from document processing
  • extraction_method: Method used for extraction (e.g., 'llmsherpa', 'pypdf2', 'python-docx')

Returns: None

_get_file_extension(self, file_path)

Purpose: Get lowercase file extension, including the dot

Parameters:

  • file_path: Path to the file

Returns: Lowercase extension string (e.g. '.pdf')

_get_file_type(self, file_path)

Purpose: Determine file type based on extension

Parameters:

  • file_path: Path to the file

Returns: One of 'word', 'powerpoint', 'excel', 'pdf', or 'unknown'

count_tokens(self, text)

Purpose: Count tokens in text using tiktoken

Parameters:

  • text: Text to tokenize

Returns: Token count (int)
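
The method is a thin wrapper over tiktoken's cl100k_base encoding; a standalone equivalent:

import tiktoken

tokenizer = tiktoken.get_encoding("cl100k_base")
num_tokens = len(tokenizer.encode("Lightweight document processor for chat upload functionality"))
print(num_tokens)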

sanitize_text(self, text)

Purpose: Sanitize text by encoding to UTF-8 with error replacement

Parameters:

  • text: Text to sanitize

Returns: Sanitized string (empty string for falsy input)

chunk_text(self, text, max_chunk_size, overlap)

Purpose: Split text into chunks with overlap for better context preservation

Parameters:

  • text: Text to chunk
  • max_chunk_size: Maximum characters per chunk (default 4000)
  • overlap: Character overlap between chunks (default 200)

Returns: List of text chunks
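
A short illustration with shrunken sizes (the defaults are max_chunk_size=4000 and overlap=200); assumes DocumentProcessor is importable as above:

processor = DocumentProcessor()
text = "This is a sentence. " * 60  # ~1200 characters
chunks = processor.chunk_text(text, max_chunk_size=300, overlap=50)
# Chunks are capped near 300 characters, break preferentially at
# sentence boundaries, and may overlap their neighbor by up to 50 characters.
print(len(chunks), [len(c) for c in chunks])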

_clean_pdf_text(self, text)

Purpose: Clean and normalize PDF-extracted text; removes excessive whitespace, fixes common OCR issues, and improves readability

Parameters:

  • text: Raw extracted text

Returns: Cleaned text string
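
A quick before/after illustration of the cleaning pass:

processor = DocumentProcessor()
raw = "Intro  text\n\n\n\nwith   odd   spacing.Next sentence starts here."
print(processor._clean_pdf_text(raw))
# -> Intro text with odd spacing. Next sentence starts here.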

_ocr_pdf_extraction(self, file_path)

Purpose: OCR-based PDF extraction using pymupdf (fitz) or pdf2image + pytesseract; a fallback for scanned PDFs or when other methods fail

Parameters:

  • file_path: Path to the PDF file

Returns: Dict with 'text_chunks' and 'tables' lists (both empty if OCR fails)

_convert_to_pdf(self, input_file)

Purpose: Convert a document to PDF using LibreOffice (headless, 60-second timeout)

Parameters:

  • input_file: Path to the document to convert

Returns: Path to the converted PDF, or None on failure

_process_pdf_document(self, file_path)

Purpose: Enhanced PDF processing with multiple extraction strategies and table detection; uses a cascading approach (llmsherpa, pdfplumber, PyPDF2, then OCR) to get the best possible text and table extraction

Parameters:

  • file_path: Path to the PDF file

Returns: Dict with 'text_chunks' and 'tables' lists, plus an 'error' key if every strategy fails
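
The selection logic condenses to: keep the extraction with the most text, then pool tables across all methods if that recovers more. A simplified standalone sketch (the real method additionally privileges llmsherpa output and only lets PyPDF2 or OCR win when earlier results are thin):

def pick_best(results):
    """results: list of (method_name, {'text_chunks': [...], 'tables': [...]}) pairs."""
    def text_len(r):
        return sum(len(c) for c in r.get("text_chunks", []))

    best = {"text_chunks": [], "tables": []}
    for _, result in results:
        if text_len(result) > text_len(best):
            best = result

    # Pool tables from every method when that yields more than the winner found.
    all_tables = [t for _, r in results for t in r.get("tables", [])]
    if len(all_tables) > len(best.get("tables", [])):
        best = {**best, "tables": all_tables}
    return best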

_process_word_document(self, file_path)

Purpose: Process Word documents via python-docx, falling back to PDF conversion

Parameters:

  • file_path: Path to the Word document

Returns: Dict with 'text_chunks' and 'tables' lists, plus an 'error' key on failure

_process_powerpoint(self, file_path)

Purpose: Process PowerPoint documents via python-pptx, falling back to PDF conversion

Parameters:

  • file_path: Path to the PowerPoint file

Returns: Dict with 'text_chunks' and 'tables' lists, plus an 'error' key on failure

_process_excel(self, file_path)

Purpose: Process Excel documents via pandas, falling back to xlrd for older files

Parameters:

  • file_path: Path to the Excel file

Returns: Dict with 'text_chunks' and 'tables' lists, plus an 'error' key on failure

_table_to_markdown(self, table_data)

Purpose: Convert a 2D array to a markdown table, treating the first row as the header

Parameters:

  • table_data: 2D list of cell values

Returns: Markdown table string (empty string for empty input)
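
Input and output side by side:

processor = DocumentProcessor()
print(processor._table_to_markdown([["Name", "Score"], ["Alice", "91"], ["Bob", "84"]]))
# | Name | Score |
# | --- | --- |
# | Alice | 91 |
# | Bob | 84 |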

process_document(self, file_path)

Purpose: Process a document and extract text content

Parameters:

  • file_path: Path to the document file

Returns: Dict with 'text_chunks' (list of text chunks), 'tables' (list of table content), 'metadata' (file info), and an optional 'error' message
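
Typical call pattern (file name illustrative); check the 'error' key before using the payload:

processor = DocumentProcessor()
result = processor.process_document("quarterly_report.pdf")

if "error" in result:
    print(f"Extraction failed: {result['error']}")
else:
    print(result["metadata"])                         # filename, file_type, size, extension
    print(len(result["text_chunks"]), "text chunks")
    print(len(result["tables"]), "tables")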

get_combined_text(self, processed_result)

Purpose: Get combined text content from a processed document result

Parameters:

  • processed_result: Result dict from process_document()

Returns: Combined text content as a string (empty string when the result contains an error)
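
An end-to-end sketch feeding the combined text into a token-budget check (the 8000-token limit is an assumed example value, not something the class defines):

processor = DocumentProcessor()
result = processor.process_document("slides.pptx")  # illustrative file
combined = processor.get_combined_text(result)      # empty string when extraction failed

if combined and processor.count_tokens(combined) < 8000:  # assumed budget
    context = combined                        # small enough to inline into a chat prompt
else:
    context = result.get("text_chunks", [])   # fall back to the pre-chunked pieces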

Required Imports

import os
import json
import subprocess
import tempfile
import logging
import tiktoken
from datetime import datetime
from pathlib import Path

Optional, feature-gated at import time: llmsherpa, PyPDF2, pymupdf (fitz), pdfplumber, pytesseract, pdf2image, python-docx, python-pptx, pandas, xlrd

Usage Example

# Example usage:
# processor = DocumentProcessor()
# result = processor.process_document("report.pdf")
# text = processor.get_combined_text(result)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DocumentProcessor_v7 67.3% similar

    Process different document types for indexing

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class DocumentProcessor 64.9% similar

    A comprehensive document processing class that converts documents to PDF, adds audit trails, applies security features (watermarks, signatures, hashing), and optionally converts to PDF/A format with document protection.

    From: /tf/active/vicechatdev/document_auditor/src/document_processor.py
  • class DocumentProcessor_v5 62.2% similar

    Process different document types for RAG context extraction

    From: /tf/active/vicechatdev/offline_docstore_multi.py
  • class DocumentProcessor_v3 62.2% similar

    Handles document processing and text extraction using llmsherpa (same approach as offline_docstore_multi_vice.py).

    From: /tf/active/vicechatdev/docchat/document_processor.py
  • function api_chat_upload_document 62.1% similar

    Flask API endpoint that handles document upload for chat context, processes the document to extract text content, and stores it for later retrieval in chat sessions.

    From: /tf/active/vicechatdev/vice_ai/complex_app.py