🔍 Code Extractor

class DocumentProcessor_v5

Maturity: 26

Process different document types for RAG context extraction

File: /tf/active/vicechatdev/offline_docstore_multi.py
Lines: 189 - 1177
Complexity: moderate

Purpose

Process different document types for RAG context extraction

Source Code

class DocumentProcessor:
    """Process different document types for RAG context extraction"""
    
    # Supported file extensions by type
    WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
    PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
    EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
    PDF_EXTENSIONS = ['.pdf']
    
    def __init__(self, input_dir, output_dir, temp_dir=None, llmsherpa_api_url=None):
        """
        Initialize the document processor
        
        Args:
            input_dir: Directory with source documents
            output_dir: Directory to save processed chunks
            temp_dir: Directory for temporary files (optional)
            llmsherpa_api_url: URL for llmsherpa API
        """
        self.input_dir = Path(input_dir).absolute()
        self.output_dir = Path(output_dir).absolute()
        self.temp_dir = Path(temp_dir) if temp_dir else Path(tempfile.mkdtemp())
        self.llmsherpa_api_url = llmsherpa_api_url or "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
        
        # Create directories if they don't exist
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)
        
    def _get_file_extension(self, file_path):
        """Get lowercase file extension including the dot"""
        return Path(file_path).suffix.lower()
    
    def _is_valid_file(self, file_path):
        """Check if a file appears to be valid and processable with better format detection"""
        try:
            # Check if file exists and has non-zero size
            path = Path(file_path)
            if not path.exists() or path.stat().st_size == 0:
                logger.warning(f"File doesn't exist or is empty: {file_path}")
                return False
            
            # Check if file is readable
            with open(file_path, 'rb') as f:
                # Read first few bytes to see if it's readable
                header = f.read(16)
                if not header:
                    logger.warning(f"File appears unreadable: {file_path}")
                    return False
                
                # Additional checks for specific file types
                extension = path.suffix.lower()
                
                # PowerPoint files (.pptx) should start with PK header (zip file)
                if extension == '.pptx':
                    if header[:2] != b'PK':
                        logger.warning(f"File has .pptx extension but isn't a ZIP file: {file_path}")
                        return True  # Still return True to allow fallback handling
                
                # Excel files (.xlsx) should start with PK header (zip file)
                if extension == '.xlsx':
                    if header[:2] != b'PK':
                        logger.warning(f"File has .xlsx extension but isn't a ZIP file. " 
                                      f"It might be an old format Excel file: {file_path}")
                        return True  # Still return True to allow fallback handling
                    
                # Check if the file is an old Excel format (.xls) with the wrong extension
                # Old Excel files often start with D0CF11E0 (in hex)
                if extension == '.xlsx' and header[:8] == b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1':
                    logger.warning(f"File has .xlsx extension but appears to be an old .xls format: {file_path}")
                    return True  # Return True to allow processing with xlrd
            
            return True
        except Exception as e:
            logger.error(f"Error checking file validity: {file_path} - {str(e)}")
            return False
    
    def _get_file_type(self, file_path):
        """Determine file type based on extension"""
        ext = self._get_file_extension(file_path)
        
        if ext in self.WORD_EXTENSIONS:
            return "word"
        elif ext in self.PPT_EXTENSIONS:
            return "powerpoint"
        elif ext in self.EXCEL_EXTENSIONS:
            return "excel"
        elif ext in self.PDF_EXTENSIONS:
            return "pdf"
        else:
            return "unknown"
    
    def _convert_to_pdf(self, input_file):
        """Convert a document to PDF using LibreOffice with unoconv fallback"""
        input_path = Path(input_file)
        output_pdf = self.temp_dir / f"{input_path.stem}.pdf"
        
        # First attempt: Use LibreOffice directly
        try:
            # First check if the input file exists
            if not input_path.exists():
                logger.error(f"Input file does not exist: {input_path}")
                return None
            
            # Make sure the temp directory exists
            os.makedirs(self.temp_dir, exist_ok=True)
            
            # Absolute paths to avoid directory issues
            abs_input = input_path.absolute()
            abs_output_dir = self.temp_dir.absolute()
            
            logger.info(f"Converting {abs_input} to PDF in {abs_output_dir}")
            
            # Use LibreOffice for conversion with expanded command
            cmd = [
                'libreoffice',
                '--headless',
                '--norestore',
                '--nofirststartwizard',
                '--convert-to', 'pdf',
                '--outdir', str(abs_output_dir),
                str(abs_input)
            ]
            
            # Run with increased timeout
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=180  # 3 minute timeout
            )
            
            # Check if there was an error
            if process.returncode != 0:
                logger.error(f"LibreOffice error: {process.stderr}")
                # Don't return None here, try unoconv instead
            else:
                # Verify the file was actually created
                if not output_pdf.exists():
                    # Sometimes LibreOffice creates files with slightly different names
                    # Try to find a matching PDF
                    potential_pdfs = list(self.temp_dir.glob(f"{input_path.stem}*.pdf"))
                    if potential_pdfs:
                        output_pdf = potential_pdfs[0]
                        logger.info(f"Found PDF with alternative name: {output_pdf}")
                        return output_pdf
                    # If no file was found, continue to unoconv
                else:
                    logger.info(f"Successfully converted to PDF with LibreOffice: {output_pdf}")
                    return output_pdf
                    
            # If we get here, LibreOffice failed or didn't create the file
            logger.info(f"Trying alternative conversion with unoconv for {input_path}")
            
            # Second attempt: Use unoconv
            try:
                alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
                
                # Run unoconv with timeout
                unoconv_cmd = [
                    'unoconv',
                    '-f', 'pdf',
                    '-o', str(alt_output_pdf),
                    str(abs_input)
                ]
                
                unoconv_process = subprocess.run(
                    unoconv_cmd,
                    capture_output=True,
                    text=True,
                    timeout=180  # 3 minute timeout
                )
                
                if unoconv_process.returncode != 0:
                    logger.error(f"Unoconv error: {unoconv_process.stderr}")
                    return None
                
                if alt_output_pdf.exists():
                    logger.info(f"Successfully converted to PDF with unoconv: {alt_output_pdf}")
                    return alt_output_pdf
                else:
                    # Check for alternative names again
                    potential_pdfs = list(self.temp_dir.glob(f"{input_path.stem}*unoconv*.pdf"))
                    if potential_pdfs:
                        alt_output_pdf = potential_pdfs[0]
                        logger.info(f"Found PDF with alternative name from unoconv: {alt_output_pdf}")
                        return alt_output_pdf
                    
                    logger.error(f"Unoconv did not create a PDF file.")
                    return None
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout while converting with unoconv: {input_path}")
                return None
            except Exception as unoconv_err:
                logger.error(f"Failed to convert with unoconv: {str(unoconv_err)}")
                return None
                
        except subprocess.TimeoutExpired:
            logger.error(f"Timeout while converting with LibreOffice: {input_path}")
            
            # Try unoconv as a fallback for timeout
            try:
                logger.info(f"Trying unoconv after LibreOffice timeout for {input_path}")
                alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
                
                subprocess.run(
                    ['unoconv', '-f', 'pdf', '-o', str(alt_output_pdf), str(input_path)],
                    check=True,
                    timeout=180
                )
                
                if alt_output_pdf.exists():
                    logger.info(f"Successfully converted to PDF with unoconv after timeout: {alt_output_pdf}")
                    return alt_output_pdf
                else:
                    return None
            except:
                return None
                
        except Exception as e:
            logger.error(f"Failed to convert with LibreOffice: {str(e)}")
            
            # Try unoconv as a fallback for general errors
            try:
                logger.info(f"Trying unoconv after LibreOffice error for {input_path}")
                alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
                
                subprocess.run(
                    ['unoconv', '-f', 'pdf', '-o', str(alt_output_pdf), str(input_path)],
                    check=True,
                    timeout=180
                )
                
                if alt_output_pdf.exists():
                    logger.info(f"Successfully converted to PDF with unoconv after error: {alt_output_pdf}")
                    return alt_output_pdf
                else:
                    return None
            except Exception as unoconv_err:
                logger.error(f"All conversion methods failed for {input_path}: {str(unoconv_err)}")
                return None
    
    def _process_word_document(self, file_path):
        """Process Word documents by converting to PDF and using llmsherpa"""
        logger.info(f"Processing Word document: {file_path}")
        
        # Convert to PDF
        pdf_path = self._convert_to_pdf(file_path)
        if not pdf_path:
            return None
        
        # Process using llmsherpa (same as PDF processing)
        return self._process_pdf_document(pdf_path)
    
    def _process_pdf_document(self, file_path):
        """Process PDF documents using llmsherpa"""
        logger.info(f"Processing PDF: {file_path}")
        
        # Use llmsherpa to extract content (API URL configured in __init__)
        pdf_reader = LayoutPDFReader(self.llmsherpa_api_url)
        min_chunk_len = 4000
        try:
            doc = pdf_reader.read_pdf(str(file_path))
        except Exception as e:
            print(f"Error processing PDF {file_path}: {str(e)}")
            return None

        try:
            text_chunks = []
            table_chunks = []
            text_chunk_interim = ""
            last_parent_text = ""
            for d in doc.chunks():
                # Accumulate paragraphs until the minimum chunk length is reached
                if isinstance(d, llmsherpa.readers.layout_reader.Paragraph):
                    clean_text = d.to_text().replace("- ", "").replace("\n", "")
                    last_parent_text = d.parent_text()
                    text_chunk_interim = clean_text if text_chunk_interim == "" else text_chunk_interim + "\n" + clean_text
                    if len(text_chunk_interim) > min_chunk_len:
                        newuid = str(uuid4())
                        text_chunks.append([last_parent_text, text_chunk_interim, newuid])
                        text_chunk_interim = ""
                if isinstance(d, llmsherpa.readers.layout_reader.Table):
                    newuid = str(uuid4())
                    table_chunks.append([d.parent_text(), d.to_html(), d.to_text(), newuid])
            # Flush any remaining text that never reached the minimum length
            if text_chunk_interim != "":
                newuid = str(uuid4())
                text_chunks.append([last_parent_text, text_chunk_interim, newuid])
                
            return {"text_chunks": text_chunks, "table_chunks": table_chunks}
            
        except Exception as e:
            print(f"Error processing PDF {file_path}: {str(e)}")
            return None
    
    def _process_powerpoint(self, file_path):
        logger.info(f"Processing PowerPoint: {file_path}")
        print("processing ", file_path)
        
        # First try the standard python-pptx approach
        try:
            text_chunks = []
            table_chunks = []
            
            # Try to open the presentation with increased timeout and error handling
            try:
                presentation = pptx.Presentation(file_path)
            except Exception as e:
                logger.error(f"Error opening PowerPoint with python-pptx: {str(e)}")
                # Fall back to conversion method
                return self._process_powerpoint_via_pdf(file_path)
            
            # Process each slide
            for i, slide in enumerate(presentation.slides):
                # Get slide title or default
                try:
                    slide_title = next((shape.text for shape in slide.shapes 
                                    if shape.has_text_frame and shape.text and 
                                    getattr(shape, "is_title", False)), f"Slide {i+1}")
                except Exception as slide_err:
                    logger.warning(f"Error getting slide title: {str(slide_err)}")
                    slide_title = f"Slide {i+1}"
                
                # First identify all tables to exclude them from text extraction
                tables = []
                try:
                    tables = [shape for shape in slide.shapes if hasattr(shape, "has_table") and shape.has_table]
                    table_ids = set(id(table) for table in tables)
                except Exception as table_err:
                    logger.warning(f"Error identifying tables in slide {i+1}: {str(table_err)}")
                    table_ids = set()
                
                # Extract all text from shapes on this slide (excluding tables and titles)
                text_content = []
                for shape in slide.shapes:
                    try:
                        # Skip tables and titles
                        if (id(shape) in table_ids or 
                            (hasattr(shape, "text") and shape.text == slide_title)):
                            continue
                            
                        # Add text from non-table shapes
                        if hasattr(shape, "text") and shape.text:
                            text_content.append(shape.text)
                    except Exception as shape_err:
                        logger.warning(f"Error processing shape in slide {i+1}: {str(shape_err)}")
                
                # Combine all text from this slide into a single chunk
                combined_text = "\n".join(text_content)
                if combined_text.strip():  # Only add if there's meaningful text
                    newuid = str(uuid4())
                    parent_text = f"Slide {i+1}: {slide_title}"
                    text_chunks.append([parent_text.replace("'","`"), combined_text.replace("'","`"), newuid])
                
                # Process tables separately
                for shape in tables:
                    try:
                        table_data = []
                        for r, row in enumerate(shape.table.rows):
                            row_data = []
                            for c, cell in enumerate(row.cells):
                                if cell.text:
                                    row_data.append(cell.text)
                                else:
                                    row_data.append("")
                            table_data.append(row_data)
                        
                        # Convert to markdown
                        markdown_table = self._table_to_markdown(table_data)
                        newuid = str(uuid4())
                        parent_text = f"Slide {i+1}: {slide_title}"
                        table_chunks.append([parent_text.replace("'","`"), markdown_table.replace("'","`"), "", newuid])
                    except Exception as table_process_err:
                        logger.warning(f"Error processing table in slide {i+1}: {str(table_process_err)}")
            
            return {"text_chunks": text_chunks, "table_chunks": table_chunks}
            
        except Exception as e:
            logger.error(f"Error processing PowerPoint {file_path}: {str(e)}")
            # Fall back to conversion method
            return self._process_powerpoint_via_pdf(file_path)

    def _process_powerpoint_via_pdf(self, file_path):
        """Fall back method to process PowerPoint by converting to PDF first"""
        logger.info(f"Falling back to PDF conversion for PowerPoint: {file_path}")
        
        try:
            # Convert PowerPoint to PDF
            pdf_path = self._convert_to_pdf(file_path)
            if not pdf_path:
                return None
            
            # Process the PDF with llmsherpa
            result = self._process_pdf_document(pdf_path)
            
            # Clean up the temporary PDF file
            try:
                os.remove(pdf_path)
            except:
                pass
                
            return result
        except Exception as e:
            logger.error(f"Error in PowerPoint fallback processing: {str(e)}")
            return None
    
    def _table_to_markdown(self, table_data):
        """Convert a 2D array to a markdown table"""
        if not table_data or not table_data[0]:
            return "| |"
            
        # Create header
        markdown = "| " + " | ".join([str(cell) for cell in table_data[0]]) + " |\n"
        # Add separator line
        markdown += "| " + " | ".join(["---" for _ in table_data[0]]) + " |\n"
        
        # Add data rows
        for row in table_data[1:]:
            markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
            
        return markdown
    
    def _process_excel(self, file_path):
        """Process Excel files with fallback for problematic files"""
        logger.info(f"Processing Excel: {file_path}")
        
        # Try pandas first (for modern Excel formats)
        try:
            # Load the Excel file using pandas
            excel_file = pd.ExcelFile(file_path)
            
            table_chunks = []
            
            # Process each sheet
            for sheet_name in excel_file.sheet_names:
                df = pd.read_excel(excel_file, sheet_name=sheet_name)
                
                # Convert to markdown
                markdown = df.to_markdown(index=False).replace("'","`")
                newuid = str(uuid4())
                table_chunks.append([sheet_name.replace("'","`"), markdown, "", newuid])
            
            return {"table_chunks": table_chunks}
                
        except Exception as e:
            logger.warning(f"Error processing Excel with pandas: {file_path}: {str(e)}")
            logger.info(f"Trying fallback method for Excel: {file_path}")
            
            # Try xlrd as fallback for older or problematic Excel files
            return self._process_excel_with_xlrd(file_path)

    def _process_excel_with_xlrd(self, file_path):
        """Fallback processing for Excel files using xlrd with extension adaptation"""
        try:
            # Import xlrd here to avoid dependency issues if not installed
            try:
                import xlrd
                import shutil
            except ImportError:
                logger.error("xlrd library not installed. Install with: pip install xlrd==1.2.0")
                return None
            
            # Create a temporary file with .xls extension regardless of original extension
            # This helps xlrd handle the file correctly based on content rather than extension
            temp_xls_path = self.temp_dir / f"{Path(file_path).stem}_copy.xls"
            
            try:
                # Copy to a temporary file with .xls extension
                shutil.copy2(file_path, temp_xls_path)
                logger.info(f"Created temporary .xls copy at {temp_xls_path}")
                
                # Open the workbook with xlrd using the .xls copy
                workbook = xlrd.open_workbook(str(temp_xls_path))
                
            except Exception as copy_err:
                logger.error(f"Error creating .xls copy: {str(copy_err)}")
                # Try with original file as last resort
                workbook = xlrd.open_workbook(str(file_path))
            
            table_chunks = []
            
            # Process each sheet
            for sheet_name in workbook.sheet_names():
                sheet = workbook.sheet_by_name(sheet_name)
                
                # Convert sheet data to a list of lists
                data = []
                headers = []
                
                # Get headers (first row) if sheet has data
                if sheet.nrows > 0:
                    for col in range(sheet.ncols):
                        headers.append(str(sheet.cell_value(0, col)))
                    data.append(headers)
                
                    # Get data rows
                    for row in range(1, sheet.nrows):
                        row_data = []
                        for col in range(sheet.ncols):
                            cell_value = sheet.cell_value(row, col)
                            
                            # Handle date values which are stored as floats in xls
                            if sheet.cell_type(row, col) == xlrd.XL_CELL_DATE:
                                try:
                                    dt_tuple = xlrd.xldate_as_tuple(cell_value, workbook.datemode)
                                    # Format as ISO date string
                                    if dt_tuple[3] == 0 and dt_tuple[4] == 0 and dt_tuple[5] == 0:
                                        # Just date without time
                                        cell_value = f"{dt_tuple[0]}-{dt_tuple[1]:02d}-{dt_tuple[2]:02d}"
                                    else:
                                        # Date and time
                                        cell_value = f"{dt_tuple[0]}-{dt_tuple[1]:02d}-{dt_tuple[2]:02d} {dt_tuple[3]:02d}:{dt_tuple[4]:02d}:{dt_tuple[5]:02d}"
                                except:
                                    # If date conversion fails, just use the string value
                                    cell_value = str(cell_value)
                            
                            row_data.append(str(cell_value).replace("'","`"))
                        data.append(row_data)
                else:
                    # Handle empty sheets
                    data = [["Empty Sheet"]]
                
                # Convert to markdown table
                markdown = self._list_to_markdown(data)
                newuid = str(uuid4())
                table_chunks.append([sheet_name.replace("'","`"), markdown, "", newuid])
            
            # Clean up temporary file
            try:
                if temp_xls_path.exists():
                    os.remove(temp_xls_path)
            except:
                pass
                
            return {"table_chunks": table_chunks}
            
        except Exception as e:
            logger.error(f"Error processing Excel with xlrd fallback: {file_path}: {str(e)}")
            
            # If even xlrd fails, try LibreOffice conversion to CSV as last resort
            return self._process_excel_via_conversion(file_path)

    def _list_to_markdown(self, data):
        """Convert a 2D list to a markdown table"""
        if not data or len(data) == 0:
            return "| Empty Table |"
        if len(data[0]) == 0:
            return "| Empty Table |"
            
        # Create header
        markdown = "| " + " | ".join([str(cell) for cell in data[0]]) + " |\n"
        # Add separator line
        markdown += "| " + " | ".join(["---" for _ in data[0]]) + " |\n"
        
        # Add data rows
        for row in data[1:]:
            # Make sure row has enough cells to match header
            while len(row) < len(data[0]):
                row.append("")
            # Limit row to number of header columns
            row = row[:len(data[0])]
            markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
            
        return markdown

    def _process_excel_via_conversion(self, file_path):
        """Last resort for Excel processing via LibreOffice conversion to CSV"""
        logger.info(f"Attempting to convert Excel to CSV as last resort: {file_path}")
        
        try:
            # Create a temp directory for CSV output
            csv_dir = self.temp_dir / "csv_output"
            os.makedirs(csv_dir, exist_ok=True)
            
            # Use LibreOffice to convert to CSV
            cmd = [
                'libreoffice',
                '--headless',
                '--convert-to', 'csv',
                '--outdir', str(csv_dir),
                str(file_path)
            ]
            
            process = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            
            # Find the generated CSV files
            csv_files = list(csv_dir.glob("*.csv"))
            
            if not csv_files:
                logger.error(f"No CSV files generated from {file_path}")
                return self._create_minimal_excel_representation(file_path)
            
            table_chunks = []
            
            # Process each CSV file
            for csv_file in csv_files:
                try:
                    # Try to read the CSV
                    df = pd.read_csv(csv_file, encoding='utf-8')
                    
                    # Convert to markdown
                    markdown = df.to_markdown(index=False).replace("'","`")
                    sheet_name = csv_file.stem.replace(Path(file_path).stem, "").lstrip("_").replace("'","`")
                    if not sheet_name:
                        sheet_name = "Sheet1"
                        
                    newuid = str(uuid4())
                    table_chunks.append([sheet_name, markdown, "", newuid])
                except:
                    # If CSV read fails, create basic representation
                    content = "CSV conversion failed for this sheet"
                    newuid = str(uuid4())
                    sheet_name = csv_file.stem.replace(Path(file_path).stem, "").lstrip("_")
                    if not sheet_name:
                        sheet_name = "Sheet1"
                    table_chunks.append([sheet_name.replace("'","`"), content.replace("'","`"), "", newuid])
            
            # Clean up CSV files
            for csv_file in csv_files:
                try:
                    os.remove(csv_file)
                except:
                    pass
                    
            if table_chunks:
                return {"table_chunks": table_chunks}
            else:
                return self._create_minimal_excel_representation(file_path)
                
        except Exception as e:
            logger.error(f"Error in Excel CSV conversion: {str(e)}")
            return self._create_minimal_excel_representation(file_path)

    def _create_minimal_excel_representation(self, file_path):
        """Create minimal representation for Excel files that cannot be processed"""
        logger.warning(f"Creating minimal representation for: {file_path}")
        
        newuid = str(uuid4())
        error_text = f"| Error Processing Excel File |\n|---|\n| The file {Path(file_path).name} could not be processed. |"
        table_chunks = [[
            "Error", 
            error_text.replace("'","`"), 
            "", 
            newuid
        ]]
        
        return {"table_chunks": table_chunks}
    
    def process_document(self, file_path):
        """Process a document based on its type and return structured data"""
        file_path = Path(file_path)
        # Validate file before processing
        if not self._is_valid_file(file_path):
            logger.error(f"Invalid or corrupted file, skipping: {file_path}")
            return None
        
        file_type = self._get_file_type(file_path)
        
        if file_type == "unknown":
            logger.warning(f"Unsupported file type: {file_path}")
            return None
        
        # Process based on file type
        if file_type == "word":
            result = self._process_word_document(file_path)
        elif file_type == "powerpoint":
            result = self._process_powerpoint(file_path)
        elif file_type == "excel":
            result = self._process_excel(file_path)
        elif file_type == "pdf":
            result = self._process_pdf_document(file_path)
            
        # Add metadata
        if result:
            result["file_path"] = str(file_path)
            result["file_type"] = file_type
            result["file_name"] = file_path.name
            
        return result
    
    def process_documents(self, file_paths, max_workers=4):
        """Process multiple documents in parallel"""
        results = {}
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {executor.submit(self.process_document, file): file for file in file_paths}
            
            for future in concurrent.futures.as_completed(future_to_file):
                file = future_to_file[future]
                try:
                    result = future.result()
                    if result:
                        results[file] = result
                except Exception as e:
                    logger.error(f"Error processing {file}: {str(e)}")
        
        return results
    
    def sanitize_text(self,text):
        """
        Sanitize text by encoding to UTF-8 with error replacement and decoding back.
        This replaces any characters that might cause ASCII encoding errors.
        """
        return text.encode("utf-8", errors="replace").decode("utf-8")
    
    def summarize_text(self,text, api_key, max_tokens_summary=8192):
        """
        Summarize the input text using the GPT-4o-mini summarizer.
        The summary will be limited to under max_tokens_summary tokens.
        """
        # Prepare the summarization prompt
        llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0,api_key=api_key)
        text=self.sanitize_text(text)

        prompt = (
            f"Please summarize the following text such that the summary is under {max_tokens_summary} tokens:\n\n{text}"
        )
        
        # Call the ChatCompletion API with the GPT-4o-mini model
        response = llm.invoke(prompt)
        
        summary = response.content.strip()
        return summary
    
    

    def count_tokens(self,text):
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))    
    
    def generate_embeddings(self, session, api_key):
        """Generate embeddings for text and table chunks using OpenAI API"""
        import openai
        import time
        
        openai.api_key = api_key
        
        # Get chunks without embeddings
        text_chunks = run_query(session,"""
            MATCH (c:Text_chunk) 
            WHERE c.embedding IS NULL
            RETURN c.UID as uid, c.Text as text
            LIMIT 100
        """).data()
        
        table_chunks = run_query(session,"""
            MATCH (c:Table_chunk) 
            WHERE c.embedding IS NULL
            RETURN c.UID as uid, c.Text as text, c.Html as html
            LIMIT 100
        """).data()
        
        logger.info(f"Found {len(text_chunks)} text chunks and {len(table_chunks)} table chunks without embeddings")
        
        # Process text chunks
        for chunk in text_chunks:
            try:
                # Rate limit to avoid API errors (3 requests per second)
                #time.sleep(0.1)
                content=chunk['text']
                if len(content) > 1000000:
                    content = content[:1000000]
                    logger.warning(f"Shrinking table chunk {chunk['uid']} due to token limit")
                    while self.count_tokens(content) > 110000:
                        content = content[:-1000]

                # Create embedding
                if self.count_tokens(content) > 8192:
                    logger.warning(f"Summarizing text chunk {chunk['uid']} due to token limit")
                    content=self.summarize_text(content, api_key)

                    
                response = openai.embeddings.create(
                    model="text-embedding-3-small",
                    input=content
                )
                embedding = response.data[0].embedding
                
                # Update the node
                run_query(session,f"""
                    MATCH (c:Text_chunk {{UID: '{chunk['uid']}'}})
                    SET c.embedding = {embedding}
                """)
                
                logger.info(f"Added embedding for Text_chunk {chunk['uid']}")
            except Exception as e:
                logger.error(f"Error creating embedding for text chunk {chunk['uid']}: {str(e)}")
        
        # Process table chunks
        for chunk in table_chunks:
            try:
                # Rate limit to avoid API errors (3 requests per second)
                #time.sleep(0.1)
                
                # Use text if available, otherwise HTML
                content = chunk['text'] if chunk['text'] and len(chunk['text']) > 0 else chunk['html']
                if len(content) > 1000000:
                    content = content[:1000000]
                    logger.warning(f"Shrinking table chunk {chunk['uid']} due to token limit")
                    while self.count_tokens(content) > 110000:
                        content = content[:-1000]
                        
                
                # Create embedding
                if self.count_tokens(content) > 8192:
                    logger.warning(f"Summarizing text chunk {chunk['uid']} due to token limit")
                    content=self.summarize_text(content, api_key)

                response = openai.embeddings.create(
                    model="text-embedding-3-small",
                    input=content
                )
                embedding = response.data[0].embedding
                
                # Update the node
                run_query(session,f"""
                    MATCH (c:Table_chunk {{UID: '{chunk['uid']}'}})
                    SET c.embedding = {embedding}
                """)
                
                logger.info(f"Added embedding for Table_chunk {chunk['uid']}")
            except Exception as e:
                logger.error(f"Error creating embedding for table chunk {chunk['uid']}: {str(e)}")
        
        # Return counts of updated nodes
        return len(text_chunks), len(table_chunks)
    
    
    def get_all_bibtex(self, session):
        """
        Get BibTeX entries from all Publication nodes
        
        Parameters:
        -----------
        session : Neo4j session
            Active database session
                
        Returns:
        --------
        dict
            Dictionary mapping document UIDs to their BibTeX entries
        """
        
        # Query to get all Publication nodes with their BibTeX entries
        query = """
        MATCH (doc:Publication)
        WHERE doc.BibTex IS NOT NULL
        RETURN doc.UID AS doc_uid, doc.BibTex AS bibtex
        """
        
        results = run_query(session, query).data()
        
        # Create a dictionary mapping document UIDs to BibTeX entries
        bibtex_entries = {}
        for result in results:
            doc_uid = result['doc_uid']
            bibtex = result['bibtex']
            bibtex_entries[doc_uid] = bibtex
        
        return bibtex_entries
    
    def get_all_document_paths(self,session):
        """
        Get the full paths from Rootfolder to all Documents
        
        Parameters:
        -----------
        session : Neo4j session
            Active database session
            
        Returns:
        --------
        dict
            Dictionary mapping document UIDs to their full paths
        """

        # Query to get path nodes from Rootfolder to all Documents
        query = """
        MATCH path = (root:Rootfolder {Name: 'T001'})-[:PATH*]->(doc:Document)
        WITH doc.UID AS doc_uid, nodes(path) AS pathNodes
        UNWIND range(0, size(pathNodes)-1) AS i
        WITH doc_uid, pathNodes[i].Name AS name, i
        ORDER BY doc_uid, i
        RETURN doc_uid, collect(name) AS path_names
        """
        
        results = run_query(session, query).data()
        
        # Create a dictionary mapping document UIDs to path strings
        document_paths = {}
        for result in results:
            doc_uid = result['doc_uid']
            path_components = result['path_names']
            path_string = '/'.join(path_components)
            document_paths[doc_uid] = path_string
        
        return document_paths
    

    def batch_generate_embeddings(self, session, api_key, chroma_collection ,batch_size=100):

        #self.doc_paths=self.get_all_document_paths(session)
        self.doc_paths=self.get_all_bibtex(session)
        
        total_text = 0
        total_tables = 0
        
        while True:
            #text_count, table_count = self.generate_embeddings(session, api_key)
            text_count, table_count = self.embed_in_chroma(chroma_collection,session)
            total_text += text_count
            total_tables += table_count
            
            # If no more chunks to process, break
            if text_count == 0 and table_count == 0:
                break
                
            logger.info(f"Processed batch: {text_count} text chunks, {table_count} table chunks")
        
        logger.info(f"Completed embedding generation: {total_text} text chunks, {total_tables} table chunks")
        return total_text, total_tables
    
    def embed_in_chroma(self,chroma_collection,session):

        # Get chunks without embeddings
        text_chunks = run_query(session,"""
            MATCH (d:Publication)-->(c:Text_chunk) 
            WHERE c.embedding IS NULL
            RETURN c.UID as uid, c.Text as text,d.UID as puid
            LIMIT 100
        """).data()
        
        table_chunks = run_query(session,"""
            MATCH (d:Publication)-->(c:Table_chunk) 
            WHERE c.embedding IS NULL
            RETURN c.UID as uid, c.Text as text, c.Html as html,d.UID as puid
            LIMIT 100
        """).data()

        doclist=[]
        id_list=[]
        metadata_list=[]
        for chunk in text_chunks:
            doclist.append(chunk['text'])
            id_list.append(chunk['uid'])
            metadata_list.append({'bibtex':str(self.doc_paths[chunk['puid']])})
            # Update the node
            run_query(session,f"""
                MATCH (c:Text_chunk {{UID: '{chunk['uid']}'}})
                SET c.embedding = 'yes'
            """)

        try:
            # Create embedding
            chroma_collection.add(
                documents=doclist,
                metadatas=metadata_list,
                ids=id_list
            )

            logger.info(f"Added embeddings for a batch of text chunks")
        except Exception as e:
            logger.error(f"Error creating embedding for text chunk with Chroma: {str(e)}")

        doclist=[]
        id_list=[]
        metadata_list=[]
        # Process table chunks
        for chunk in table_chunks:
            doclist.append(chunk['text'] if chunk['text'] and len(chunk['text']) > 0 else chunk['html'])
            id_list.append(chunk['uid'])
            metadata_list.append({'bibtex':str(self.doc_paths[chunk['puid']])})
                # Update the node
            run_query(session,f"""
                MATCH (c:Table_chunk {{UID: '{chunk['uid']}'}})
                SET c.embedding = 'yes'
            """)
        try:
            # Create embedding
            chroma_collection.add(
                documents=doclist,
                metadatas=metadata_list,
                ids=id_list
            )

            logger.info(f"Added embeddings for a batch of table chunks")
        except Exception as e:
            logger.error(f"Error creating embeddings for table chunks with Chroma: {str(e)}")

        return len(text_chunks), len(table_chunks)

Parameters

Name Type Default Kind
bases - - -

Parameter Details

bases: Implicit base classes of the class definition; the actual constructor arguments are documented under __init__ below.

Return Value

Instantiating the class returns a DocumentProcessor object.

Class Interface

Methods

__init__(self, input_dir, output_dir, temp_dir, llmsherpa_api_url)

Purpose: Initialize the document processor. Args: input_dir (directory with source documents), output_dir (directory to save processed chunks), temp_dir (optional directory for temporary files), llmsherpa_api_url (URL for the llmsherpa API).

Parameters:

  • input_dir: Parameter
  • output_dir: Parameter
  • temp_dir: Parameter
  • llmsherpa_api_url: Parameter

Returns: None

_get_file_extension(self, file_path)

Purpose: Get lowercase file extension including the dot

Parameters:

  • file_path: Parameter

Returns: The lowercase file extension, including the leading dot.

_is_valid_file(self, file_path)

Purpose: Check if a file appears to be valid and processable with better format detection

Parameters:

  • file_path: Parameter

Returns: True if the file exists, is non-empty, and appears processable; False otherwise.
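
The validity check relies on magic bytes: OOXML files (.docx/.pptx/.xlsx) are ZIP containers that start with "PK", while legacy Office files carry the OLE2 compound-file signature. A minimal standalone sketch of the same idea; the function name and return values are illustrative:

# Standalone sketch of the magic-byte checks used above (names are illustrative).
from pathlib import Path

OOXML_MAGIC = b"PK"                                # .docx/.pptx/.xlsx are ZIP containers
OLE2_MAGIC = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"   # legacy .doc/.ppt/.xls (Compound File)

def sniff_office_format(file_path):
    """Return 'ooxml', 'ole2', or 'unknown' based on the first bytes of the file."""
    path = Path(file_path)
    if not path.exists() or path.stat().st_size == 0:
        return "unknown"
    with open(path, "rb") as f:
        header = f.read(16)
    if header.startswith(OOXML_MAGIC):
        return "ooxml"
    if header.startswith(OLE2_MAGIC):
        return "ole2"
    return "unknown"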

_get_file_type(self, file_path)

Purpose: Determine file type based on extension

Parameters:

  • file_path: Parameter

Returns: One of "word", "powerpoint", "excel", "pdf", or "unknown".

_convert_to_pdf(self, input_file)

Purpose: Convert a document to PDF using LibreOffice with unoconv fallback

Parameters:

  • input_file: Parameter

Returns: The Path of the generated PDF, or None if both LibreOffice and unoconv fail.
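
The conversion shells out to LibreOffice in headless mode and only then falls back to unoconv. A trimmed sketch of the primary path, assuming libreoffice is on PATH (the glob mirrors the alternative-name handling in the source; the function name is illustrative):

# Minimal sketch of the headless LibreOffice call (assumes `libreoffice` is on PATH).
import subprocess
from pathlib import Path

def convert_to_pdf(input_file, out_dir, timeout=180):
    """Convert one document to PDF with headless LibreOffice; return the PDF path or None."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        "libreoffice", "--headless", "--norestore", "--nofirststartwizard",
        "--convert-to", "pdf", "--outdir", str(out_dir), str(Path(input_file).absolute()),
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if proc.returncode != 0:
        return None
    # LibreOffice may alter the output name slightly, so glob for it
    candidates = list(out_dir.glob(f"{Path(input_file).stem}*.pdf"))
    return candidates[0] if candidates else None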

_process_word_document(self, file_path)

Purpose: Process Word documents by converting to PDF and using llmsherpa

Parameters:

  • file_path: Parameter

Returns: A dict with text_chunks and table_chunks, or None on failure.

_process_pdf_document(self, file_path)

Purpose: Process PDF documents using llmsherpa

Parameters:

  • file_path: Parameter

Returns: A dict with text_chunks and table_chunks, or None on failure.
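
PDF extraction depends on a running llmsherpa service. The sketch below isolates the chunk-accumulation loop under that assumption; the 4000-character threshold and the hyphen/newline cleanup come from the source, while the function name is illustrative:

# Sketch of the llmsherpa extraction loop (assumes a reachable llmsherpa service).
from uuid import uuid4
import llmsherpa.readers.layout_reader as layout_reader
from llmsherpa.readers import LayoutPDFReader

def extract_pdf_chunks(pdf_path, api_url, min_chunk_len=4000):
    """Accumulate paragraphs into chunks of at least min_chunk_len characters; keep tables separate."""
    doc = LayoutPDFReader(api_url).read_pdf(str(pdf_path))
    text_chunks, table_chunks, buffer, parent = [], [], "", ""
    for chunk in doc.chunks():
        if isinstance(chunk, layout_reader.Paragraph):
            parent = chunk.parent_text()
            text = chunk.to_text().replace("- ", "").replace("\n", "")
            buffer = text if not buffer else buffer + "\n" + text
            if len(buffer) > min_chunk_len:
                text_chunks.append([parent, buffer, str(uuid4())])
                buffer = ""
        elif isinstance(chunk, layout_reader.Table):
            table_chunks.append([chunk.parent_text(), chunk.to_html(), chunk.to_text(), str(uuid4())])
    if buffer:
        text_chunks.append([parent, buffer, str(uuid4())])
    return {"text_chunks": text_chunks, "table_chunks": table_chunks}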

_process_powerpoint(self, file_path)

Purpose: Extract slide text and tables with python-pptx, falling back to PDF conversion when the file cannot be opened or parsed.

Parameters:

  • file_path: Parameter

Returns: A dict with text_chunks and table_chunks, or None on failure.
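
A reduced python-pptx sketch of the per-slide text gathering; the title and table handling from the full method are omitted, and the function name is illustrative:

# Minimal python-pptx sketch: collect the text of every text-bearing shape per slide.
import pptx

def extract_slide_text(path):
    presentation = pptx.Presentation(path)
    slides = []
    for i, slide in enumerate(presentation.slides, start=1):
        texts = [shape.text for shape in slide.shapes
                 if getattr(shape, "has_text_frame", False) and shape.text]
        if texts:
            slides.append((f"Slide {i}", "\n".join(texts)))
    return slides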

_process_powerpoint_via_pdf(self, file_path)

Purpose: Fall back method to process PowerPoint by converting to PDF first

Parameters:

  • file_path: Parameter

Returns: A dict with text_chunks and table_chunks, or None on failure.

_table_to_markdown(self, table_data)

Purpose: Convert a 2D array to a markdown table

Parameters:

  • table_data: Parameter

Returns: The table rendered as a markdown string.
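
For reference, a small illustrative input and the markdown it produces (the values are made up):

# Illustrative input/output for the markdown conversion.
table_data = [
    ["Name", "Value"],
    ["alpha", "1"],
    ["beta", "2"],
]
# _table_to_markdown(table_data) produces:
# | Name | Value |
# | --- | --- |
# | alpha | 1 |
# | beta | 2 |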

_process_excel(self, file_path)

Purpose: Process Excel files with fallback for problematic files

Parameters:

  • file_path: Parameter

Returns: A dict with table_chunks, or None if all fallbacks fail.
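
The primary path loads each sheet with pandas and renders it to markdown. A minimal sketch, assuming the tabulate package (required by DataFrame.to_markdown) is installed; the function name is illustrative:

# Sketch of the pandas path: one markdown table per sheet.
import pandas as pd
from uuid import uuid4

def excel_to_markdown_chunks(path):
    excel_file = pd.ExcelFile(path)
    chunks = []
    for sheet_name in excel_file.sheet_names:
        df = pd.read_excel(excel_file, sheet_name=sheet_name)
        chunks.append([sheet_name, df.to_markdown(index=False), "", str(uuid4())])
    return {"table_chunks": chunks}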

_process_excel_with_xlrd(self, file_path)

Purpose: Fallback processing for Excel files using xlrd with extension adaptation

Parameters:

  • file_path: Parameter

Returns: A dict with table_chunks, or None on failure.

_list_to_markdown(self, data)

Purpose: Convert a 2D list to a markdown table

Parameters:

  • data: Parameter

Returns: The table rendered as a markdown string.

_process_excel_via_conversion(self, file_path)

Purpose: Last resort for Excel processing via LibreOffice conversion to CSV

Parameters:

  • file_path: Parameter

Returns: A dict with table_chunks, or a minimal error representation if conversion fails.

_create_minimal_excel_representation(self, file_path)

Purpose: Create minimal representation for Excel files that cannot be processed

Parameters:

  • file_path: Parameter

Returns: A dict with a single placeholder table chunk describing the processing error.

process_document(self, file_path)

Purpose: Process a document based on its type and return structured data

Parameters:

  • file_path: Parameter

Returns: A dict of extracted chunks plus file_path, file_type, and file_name metadata, or None for invalid or unsupported files.

process_documents(self, file_paths, max_workers)

Purpose: Process multiple documents in parallel

Parameters:

  • file_paths: Parameter
  • max_workers: Parameter

Returns: A dict mapping each successfully processed file path to its result.
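
The method uses the standard submit/as_completed pattern from concurrent.futures. The same pattern in isolation, with a stand-in worker function:

# The submit/as_completed pattern used by process_documents (worker is a stand-in).
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def process_in_parallel(paths, worker, max_workers=4):
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(worker, p): p for p in paths}
        for future in concurrent.futures.as_completed(futures):
            path = futures[future]
            try:
                result = future.result()
                if result:
                    results[path] = result
            except Exception as exc:
                print(f"Error processing {path}: {exc}")
    return results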

sanitize_text(self, text)

Purpose: Sanitize text by encoding to UTF-8 with error replacement and decoding back. This replaces any characters that might cause ASCII encoding errors.

Parameters:

  • text: Parameter

Returns: The sanitized string.

summarize_text(self, text, api_key, max_tokens_summary)

Purpose: Summarize the input text using the GPT-4o-mini summarizer. The summary will be limited to under max_tokens_summary tokens.

Parameters:

  • text: Parameter
  • api_key: Parameter
  • max_tokens_summary: Parameter

Returns: The summary string.
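
A sketch of the summarization call, assuming ChatOpenAI is provided by the langchain_openai package (the source does not show its import); the prompt wording mirrors the source:

# Sketch of the summarization call (import path is an assumption).
from langchain_openai import ChatOpenAI

def summarize(text, api_key, max_tokens_summary=8192):
    llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0, api_key=api_key)
    prompt = (
        f"Please summarize the following text such that the summary is under "
        f"{max_tokens_summary} tokens:\n\n{text}"
    )
    return llm.invoke(prompt).content.strip()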

count_tokens(self, text)

Purpose: Count the tokens in a text using tiktoken's cl100k_base encoding.

Parameters:

  • text: Parameter

Returns: The token count as an integer.
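
Token counting uses tiktoken's cl100k_base encoding directly:

# Counting tokens with the cl100k_base encoding used by the class.
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
print(len(encoding.encode("How many tokens is this sentence?")))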

generate_embeddings(self, session, api_key)

Purpose: Generate embeddings for text and table chunks using OpenAI API

Parameters:

  • session: Parameter
  • api_key: Parameter

Returns: A tuple (text_chunk_count, table_chunk_count) for the processed batch.
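
The embedding call uses the openai>=1.x module-level interface shown in the source (model name text-embedding-3-small from the source); the API key and input string below are placeholders:

# Sketch of a single embedding request (key and input are placeholders).
import openai

openai.api_key = "sk-..."  # placeholder
response = openai.embeddings.create(
    model="text-embedding-3-small",
    input="Example chunk text to embed",
)
embedding = response.data[0].embedding  # list of floats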

get_all_bibtex(self, session)

Purpose: Get BibTeX entries from all Publication nodes. Takes an active Neo4j session and returns a dict mapping document UIDs to their BibTeX entries.

Parameters:

  • session: Parameter

Returns: A dict mapping document UIDs to their BibTeX entries.

get_all_document_paths(self, session)

Purpose: Get the full paths from the Rootfolder to all Documents. Takes an active Neo4j session and returns a dict mapping document UIDs to their full paths.

Parameters:

  • session: Parameter

Returns: A dict mapping document UIDs to their full path strings.

batch_generate_embeddings(self, session, api_key, chroma_collection, batch_size)

Purpose: Repeatedly embed batches of chunks (via embed_in_chroma) until no unembedded chunks remain.

Parameters:

  • session: Parameter
  • api_key: Parameter
  • chroma_collection: Parameter
  • batch_size: Parameter

Returns: A tuple (total_text, total_tables) of embedded chunk counts.

embed_in_chroma(self, chroma_collection, session)

Purpose: Add a batch of up to 100 text chunks and 100 table chunks to the Chroma collection and mark them as embedded in Neo4j.

Parameters:

  • chroma_collection: Parameter
  • session: Parameter

Returns: A tuple with the counts of text and table chunks fetched in this batch.
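
The Chroma side of the pipeline reduces to collection.add with parallel lists of documents, metadata, and ids. A sketch with an assumed persistent client and collection name (collection setup is not shown in the source; the add call mirrors it):

# Sketch of creating a Chroma collection and adding documents (client setup is an assumption).
import chromadb

client = chromadb.PersistentClient(path="/data/chroma")          # assumed storage path
collection = client.get_or_create_collection(name="rag_chunks")  # assumed collection name

collection.add(
    documents=["First chunk of text", "Second chunk of text"],
    metadatas=[{"bibtex": "@article{a1}"}, {"bibtex": "@article{a2}"}],
    ids=["uid-1", "uid-2"],
)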

Required Imports

import os
import subprocess
import tempfile
from pathlib import Path
import pandas as pd

The source additionally relies on (as used in the code): logging (the module-level logger), uuid.uuid4, concurrent.futures / ThreadPoolExecutor, shutil, pptx (python-pptx), llmsherpa and its LayoutPDFReader, xlrd (imported lazily), tiktoken, openai, ChatOpenAI (from a LangChain OpenAI integration), and the project-local run_query helper.

Usage Example

# Example usage:
# processor = DocumentProcessor(input_dir="./documents", output_dir="./chunks")
# result = processor.process_document("./documents/report.pdf")
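
A fuller end-to-end sketch; the directory paths are illustrative assumptions, and the llmsherpa URL defaults to the one set in __init__:

# Process every supported document under an input directory in parallel (paths are assumptions).
from pathlib import Path

processor = DocumentProcessor(
    input_dir="/data/source_docs",        # assumed input location
    output_dir="/data/processed_chunks",  # assumed output location
)

supported = (DocumentProcessor.WORD_EXTENSIONS + DocumentProcessor.PPT_EXTENSIONS +
             DocumentProcessor.EXCEL_EXTENSIONS + DocumentProcessor.PDF_EXTENSIONS)
files = [p for p in Path(processor.input_dir).rglob("*") if p.suffix.lower() in supported]

results = processor.process_documents(files, max_workers=4)
for path, chunks in results.items():
    print(path,
          len(chunks.get("text_chunks", [])), "text chunks,",
          len(chunks.get("table_chunks", [])), "table chunks")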

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class DocumentProcessor_v4 99.0% similar

    Process different document types for RAG context extraction

    From: /tf/active/vicechatdev/offline_docstore_multi_vice.py
  • class DocumentProcessor_v7 81.2% similar

    Process different document types for indexing

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class DocChatRAG 65.3% similar

    Main RAG engine with three operating modes: 1. Basic RAG (similarity search) 2. Extensive (full document retrieval with preprocessing) 3. Full Reading (process all documents)

    From: /tf/active/vicechatdev/docchat/rag_engine.py
  • class DocumentProcessor 62.9% similar

    A comprehensive document processing class that converts documents to PDF, adds audit trails, applies security features (watermarks, signatures, hashing), and optionally converts to PDF/A format with document protection.

    From: /tf/active/vicechatdev/document_auditor/src/document_processor.py
  • class DocumentProcessor_v3 62.4% similar

    Handles document processing and text extraction using llmsherpa (same approach as offline_docstore_multi_vice.py).

    From: /tf/active/vicechatdev/docchat/document_processor.py