class DocumentProcessor_v6
Lightweight document processor for chat upload functionality
Source file: /tf/active/vicechatdev/vice_ai/document_processor.py
Lines: 97 - 1028
Complexity: moderate
Purpose
Lightweight document processor for chat upload functionality
Source Code
class DocumentProcessor:
"""Lightweight document processor for chat upload functionality"""
# Supported file extensions by type
WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
PDF_EXTENSIONS = ['.pdf']
def __init__(self, temp_dir=None, llmsherpa_api_url=None):
"""
Initialize the document processor
Args:
temp_dir: Directory for temporary files (optional)
llmsherpa_api_url: URL for llmsherpa API
"""
self.temp_dir = Path(temp_dir) if temp_dir else Path(tempfile.mkdtemp())
self.llmsherpa_api_url = llmsherpa_api_url or "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
# Create temp directory if it doesn't exist
os.makedirs(self.temp_dir, exist_ok=True)
# Setup extraction debugging directory
self.extracted_dir = Path(__file__).parent / "extracted"
os.makedirs(self.extracted_dir, exist_ok=True)
# Setup tiktoken for token counting
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# Log available processing capabilities
self._log_available_capabilities()
def _log_available_capabilities(self):
"""Log which document processing libraries are available"""
capabilities = []
if LLMSHERPA_AVAILABLE:
capabilities.append("✅ llmsherpa (advanced PDF processing)")
else:
capabilities.append("❌ llmsherpa (advanced PDF processing)")
if PYPDF2_AVAILABLE:
capabilities.append("✅ PyPDF2 (basic PDF processing)")
else:
capabilities.append("❌ PyPDF2 (basic PDF processing)")
if PYMUPDF_AVAILABLE:
capabilities.append("✅ pymupdf (enhanced PDF processing + OCR)")
else:
capabilities.append("❌ pymupdf (enhanced PDF processing + OCR)")
if PYTESSERACT_AVAILABLE:
capabilities.append("✅ pytesseract (OCR text recognition)")
else:
capabilities.append("❌ pytesseract (OCR text recognition)")
if PDF2IMAGE_AVAILABLE:
capabilities.append("✅ pdf2image (PDF to image conversion)")
else:
capabilities.append("❌ pdf2image (PDF to image conversion)")
if DOCX_AVAILABLE:
capabilities.append("✅ python-docx (Word document processing)")
else:
capabilities.append("❌ python-docx (Word document processing)")
if PPTX_AVAILABLE:
capabilities.append("✅ python-pptx (PowerPoint processing)")
else:
capabilities.append("❌ python-pptx (PowerPoint processing)")
if PANDAS_AVAILABLE:
capabilities.append("✅ pandas (Excel processing)")
else:
capabilities.append("❌ pandas (Excel processing)")
logger.info("📋 Document Processing Capabilities:")
for capability in capabilities:
logger.info(f" {capability}")
def get_available_formats(self):
"""Get list of supported file formats based on available libraries"""
formats = []
if LLMSHERPA_AVAILABLE or PYPDF2_AVAILABLE:
formats.extend(self.PDF_EXTENSIONS)
if DOCX_AVAILABLE:
formats.append('.docx')
# Always support conversion fallback for Word docs
formats.extend([ext for ext in self.WORD_EXTENSIONS if ext not in formats])
if PPTX_AVAILABLE:
formats.append('.pptx')
# Always support conversion fallback for PowerPoint
formats.extend([ext for ext in self.PPT_EXTENSIONS if ext not in formats])
if PANDAS_AVAILABLE:
formats.extend(self.EXCEL_EXTENSIONS)
return list(set(formats)) # Remove duplicates
def _save_extraction_debug_log(self, file_path, processed_result, extraction_method):
"""
Save extraction results to debug log file for performance analysis
Args:
file_path: Original file path
processed_result: Result from document processing
extraction_method: Method used for extraction (e.g., 'llmsherpa', 'pypdf2', 'python-docx')
"""
try:
# Create timestamp and sanitized filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
original_name = Path(file_path).stem
# Sanitize filename for filesystem
safe_name = "".join(c for c in original_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_name = safe_name.replace(' ', '_')
# Create debug log filename
debug_filename = f"{timestamp}_{safe_name}_{extraction_method}.json"
debug_path = self.extracted_dir / debug_filename
# Prepare debug data
debug_data = {
"timestamp": timestamp,
"original_file": str(file_path),
"filename": Path(file_path).name,
"file_size_bytes": Path(file_path).stat().st_size if Path(file_path).exists() else 0,
"extraction_method": extraction_method,
"processing_result": {
"text_chunks_count": len(processed_result.get('text_chunks', [])),
"tables_count": len(processed_result.get('tables', [])),
"has_error": 'error' in processed_result,
"error_message": processed_result.get('error', None)
},
"text_content": {
"text_chunks": processed_result.get('text_chunks', []),
"tables": processed_result.get('tables', [])
},
"metadata": processed_result.get('metadata', {}),
"extraction_stats": {
"total_text_length": sum(len(chunk) for chunk in processed_result.get('text_chunks', [])),
"total_table_length": sum(len(table) for table in processed_result.get('tables', [])),
"estimated_tokens": self.count_tokens(self.get_combined_text(processed_result))
}
}
# Save to JSON file
with open(debug_path, 'w', encoding='utf-8') as f:
json.dump(debug_data, f, indent=2, ensure_ascii=False)
logger.info(f"📄 Extraction debug log saved: {debug_filename}")
logger.info(f" Method: {extraction_method}")
logger.info(f" Text chunks: {debug_data['processing_result']['text_chunks_count']}")
logger.info(f" Tables: {debug_data['processing_result']['tables_count']}")
logger.info(f" Total text length: {debug_data['extraction_stats']['total_text_length']}")
logger.info(f" Estimated tokens: {debug_data['extraction_stats']['estimated_tokens']}")
except Exception as e:
logger.warning(f"Failed to save extraction debug log: {e}")
def _get_file_extension(self, file_path):
"""Get lowercase file extension including the dot"""
return Path(file_path).suffix.lower()
def _get_file_type(self, file_path):
"""Determine file type based on extension"""
ext = self._get_file_extension(file_path)
if ext in self.WORD_EXTENSIONS:
return "word"
elif ext in self.PPT_EXTENSIONS:
return "powerpoint"
elif ext in self.EXCEL_EXTENSIONS:
return "excel"
elif ext in self.PDF_EXTENSIONS:
return "pdf"
else:
return "unknown"
def count_tokens(self, text):
"""Count tokens in text using tiktoken"""
return len(self.tokenizer.encode(text))
def sanitize_text(self, text):
"""Sanitize text by encoding to UTF-8 with error replacement"""
if not text:
return ""
return text.encode("utf-8", errors="replace").decode("utf-8")
def chunk_text(self, text, max_chunk_size=4000, overlap=200):
"""
Split text into chunks with overlap for better context preservation
Args:
text: Text to chunk
max_chunk_size: Maximum characters per chunk
overlap: Character overlap between chunks
Returns:
List of text chunks
"""
if not text or len(text) <= max_chunk_size:
return [text] if text else []
chunks = []
start = 0
while start < len(text):
# Calculate end position
end = start + max_chunk_size
# Try to break at sentence boundary if possible
if end < len(text):
# Look for sentence endings near the end
for punct in ['. ', '.\n', '? ', '?\n', '! ', '!\n']:
last_sentence = text.rfind(punct, start + max_chunk_size - 200, end)
if last_sentence != -1:
end = last_sentence + len(punct)
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move start position with overlap
start = max(start + max_chunk_size - overlap, end)
return chunks
def _clean_pdf_text(self, text):
"""
Clean and normalize PDF extracted text
Removes excessive whitespace, fixes common OCR issues, and improves readability
"""
if not text:
return ""
# Normalize whitespace
import re
# Remove excessive newlines (more than 2 consecutive)
text = re.sub(r'\n{3,}', '\n\n', text)
# Fix broken words (common in PDF extraction)
# Look for single letters followed by space and continue
text = re.sub(r'\b([a-zA-Z])\s+([a-zA-Z])\b', r'\1\2', text)
# Remove excessive spaces
text = re.sub(r' {2,}', ' ', text)
# Fix common OCR issues
text = text.replace('\ufb01', 'fi')  # fi ligature
text = text.replace('\ufb02', 'fl')  # fl ligature
text = text.replace('\u2013', '-')   # en dash
text = text.replace('\u201c', '"')   # left curly double quote
text = text.replace('\u201d', '"')   # right curly double quote
text = text.replace('\u2018', "'")   # left curly single quote
text = text.replace('\u2019', "'")   # right curly single quote
# Remove isolated special characters that are OCR artifacts
text = re.sub(r'\s+[^\w\s]{1}\s+', ' ', text)
# Ensure proper sentence spacing
text = re.sub(r'\.([A-Z])', r'. \1', text)
# Remove leading/trailing whitespace from lines
lines = text.split('\n')
cleaned_lines = [line.strip() for line in lines if line.strip()]
# Join lines and remove extra spaces
cleaned_text = '\n'.join(cleaned_lines)
# Final cleanup
cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
cleaned_text = cleaned_text.strip()
return cleaned_text
def _ocr_pdf_extraction(self, file_path):
"""
OCR-based PDF extraction using pymupdf (fitz) or other OCR libraries
This is a fallback for scanned PDFs or when other methods fail
"""
text_chunks = []
tables = []
# Try with pymupdf (fitz) first - good for both text and OCR
if PYMUPDF_AVAILABLE and fitz:
try:
doc = fitz.open(file_path)
for page_num in range(len(doc)):
page = doc.load_page(page_num)
# Try text extraction first (for non-scanned pages)
text = page.get_text()
# If very little text found, try OCR
if len(text.strip()) < 50:
try:
# Get page as image and OCR it
mat = fitz.Matrix(2, 2) # 2x zoom for better OCR
pix = page.get_pixmap(matrix=mat)
# Try with pytesseract if available
if PYTESSERACT_AVAILABLE:
try:
import pytesseract
from PIL import Image
import io
img_data = pix.tobytes("ppm")
img = Image.open(io.BytesIO(img_data))
text = pytesseract.image_to_string(img)
except Exception as ocr_error:
logger.warning(f"pytesseract OCR failed for page {page_num}: {ocr_error}")
text = page.get_text() # Fallback to basic text
else:
logger.warning("pytesseract not available for OCR")
text = page.get_text() # Fallback to basic text
except Exception as ocr_error:
logger.warning(f"OCR failed for page {page_num}: {ocr_error}")
text = page.get_text() # Fallback to basic text
# Clean and chunk the text
if text and len(text.strip()) > 10:
clean_text = self._clean_pdf_text(text)
if clean_text:
page_content = f"Page {page_num + 1}:\n\n{clean_text}"
text_chunks.extend(self.chunk_text(page_content))
# Try to extract tables using pymupdf
try:
tables_on_page = page.find_tables()
for i, table in enumerate(tables_on_page):
table_data = table.extract()
if table_data and len(table_data) > 1:
markdown_table = self._table_to_markdown(table_data)
if markdown_table:
tables.append(f"Page {page_num + 1} Table {i + 1}:\n\n{markdown_table}")
except Exception as table_error:
logger.warning(f"Table extraction failed for page {page_num}: {table_error}")
doc.close()
if text_chunks:
return {"text_chunks": text_chunks, "tables": tables}
except Exception as e:
logger.warning(f"pymupdf OCR extraction failed: {e}")
else:
logger.warning("pymupdf (fitz) not available for OCR extraction")
# Fallback OCR with pdf2image + pytesseract
if PDF2IMAGE_AVAILABLE and PYTESSERACT_AVAILABLE:
try:
from pdf2image import convert_from_path
import pytesseract
logger.info("Attempting OCR with pdf2image + pytesseract...")
# Convert PDF to images
images = convert_from_path(file_path, dpi=200)
for i, image in enumerate(images):
try:
# Extract text using OCR
text = pytesseract.image_to_string(image, lang='eng')
if text and len(text.strip()) > 10:
clean_text = self._clean_pdf_text(text)
if clean_text:
page_content = f"Page {i + 1} (OCR):\n\n{clean_text}"
text_chunks.extend(self.chunk_text(page_content))
except Exception as page_error:
logger.warning(f"OCR failed for page {i + 1}: {page_error}")
if text_chunks:
return {"text_chunks": text_chunks, "tables": tables}
except Exception as e:
logger.warning(f"pdf2image OCR extraction failed: {e}")
else:
logger.warning("pdf2image or pytesseract not available for OCR")
return {"text_chunks": [], "tables": []}
def _convert_to_pdf(self, input_file):
"""Convert a document to PDF using LibreOffice"""
input_path = Path(input_file)
output_pdf = self.temp_dir / f"{input_path.stem}.pdf"
try:
# Use LibreOffice to convert to PDF
cmd = [
'libreoffice', '--headless', '--convert-to', 'pdf',
'--outdir', str(self.temp_dir),
str(input_path.absolute())
]
process = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60 # 1 minute timeout
)
if process.returncode == 0 and output_pdf.exists():
logger.info(f"Successfully converted to PDF: {output_pdf}")
return output_pdf
else:
logger.error(f"LibreOffice conversion failed: {process.stderr}")
return None
except subprocess.TimeoutExpired:
logger.error(f"Timeout while converting: {input_path}")
return None
except Exception as e:
logger.error(f"Failed to convert with LibreOffice: {str(e)}")
return None
def _process_pdf_document(self, file_path):
"""
Enhanced PDF processing with multiple extraction strategies and table detection
Uses a cascading approach to get the best possible text and table extraction
"""
logger.info(f"Processing PDF: {file_path}")
all_results = []
best_result = {"text_chunks": [], "tables": []}
# Strategy 1: Enhanced llmsherpa processing
if LLMSHERPA_AVAILABLE and LayoutPDFReader:
try:
logger.info("🔍 Attempting llmsherpa extraction...")
pdf_reader = LayoutPDFReader(self.llmsherpa_api_url)
doc = pdf_reader.read_pdf(str(file_path))
text_chunks = []
tables = []
# Enhanced section extraction
sections_found = False
for section in doc.sections():
if section.title and section.to_text():
sections_found = True
section_text = section.to_text().strip()
if len(section_text) > 10: # Only meaningful content
chunk_text = f"## {section.title}\n\n{section_text}"
text_chunks.extend(self.chunk_text(chunk_text))
# Enhanced chunk extraction with better filtering
chunks_found = False
for chunk in doc.chunks():
chunk_text = chunk.to_text().strip()
if len(chunk_text) > 20: # Filter out very short chunks
chunks_found = True
text_chunks.extend(self.chunk_text(chunk_text))
# Extract tables if available
try:
for table in doc.tables():
table_data = table.to_text()
if table_data and len(table_data) > 10:
tables.append(table_data)
except:
pass
# Get full document text as fallback
if not text_chunks:
full_text = doc.to_text().strip()
if full_text and len(full_text) > 50:
text_chunks.extend(self.chunk_text(full_text))
if text_chunks:
result = {"text_chunks": text_chunks, "tables": tables}
all_results.append(("llmsherpa", result))
logger.info(f"✅ llmsherpa: {len(text_chunks)} chunks, {len(tables)} tables")
self._save_extraction_debug_log(file_path, result, "llmsherpa")
# If we got substantial content, use this as our best result
total_text_length = sum(len(chunk) for chunk in text_chunks)
if total_text_length > 200:
best_result = result
except Exception as e:
logger.warning(f"llmsherpa failed for {file_path}: {e}")
# Strategy 2: Enhanced pdfplumber with table extraction
try:
import pdfplumber
logger.info("🔍 Attempting pdfplumber extraction...")
text_chunks = []
tables = []
with pdfplumber.open(file_path) as pdf:
for i, page in enumerate(pdf.pages):
try:
# Extract text
page_text = page.extract_text()
if page_text and len(page_text.strip()) > 20:
clean_text = self._clean_pdf_text(page_text)
if clean_text:
page_content = f"Page {i+1}:\n\n{clean_text}"
text_chunks.extend(self.chunk_text(page_content))
# Extract tables
page_tables = page.extract_tables()
if page_tables:
for j, table in enumerate(page_tables):
if table and len(table) > 1: # Must have header + data
markdown_table = self._table_to_markdown(table)
if markdown_table:
tables.append(f"Page {i+1} Table {j+1}:\n\n{markdown_table}")
except Exception as page_error:
logger.warning(f"Error processing page {i+1} with pdfplumber: {page_error}")
if text_chunks:
result = {"text_chunks": text_chunks, "tables": tables}
all_results.append(("pdfplumber", result))
logger.info(f"✅ pdfplumber: {len(text_chunks)} chunks, {len(tables)} tables")
self._save_extraction_debug_log(file_path, result, "pdfplumber")
# Compare with current best result
total_text_length = sum(len(chunk) for chunk in text_chunks)
current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
if total_text_length > current_best_length:
best_result = result
except ImportError:
logger.warning("pdfplumber not available")
except Exception as e:
logger.warning(f"pdfplumber processing failed: {e}")
# Strategy 3: Enhanced PyPDF2 with better text cleaning
if PYPDF2_AVAILABLE and PyPDF2:
try:
logger.info("🔍 Attempting PyPDF2 extraction...")
text_chunks = []
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for i, page in enumerate(pdf_reader.pages):
try:
page_text = page.extract_text()
if page_text and len(page_text.strip()) > 20:
clean_text = self._clean_pdf_text(page_text)
if clean_text:
page_content = f"Page {i+1}:\n\n{clean_text}"
text_chunks.extend(self.chunk_text(page_content))
except Exception as page_error:
logger.warning(f"Error extracting page {i+1}: {page_error}")
if text_chunks:
result = {"text_chunks": text_chunks, "tables": []}
all_results.append(("pypdf2", result))
logger.info(f"✅ PyPDF2: {len(text_chunks)} chunks")
self._save_extraction_debug_log(file_path, result, "pypdf2")
# Compare with current best result (only if we don't have a good result yet)
total_text_length = sum(len(chunk) for chunk in text_chunks)
current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
if current_best_length < 100 and total_text_length > current_best_length:
best_result = result
except Exception as e:
logger.warning(f"PyPDF2 processing failed: {e}")
# Strategy 4: OCR fallback for scanned PDFs or difficult layouts
try:
logger.info("🔍 Attempting OCR extraction as fallback...")
ocr_result = self._ocr_pdf_extraction(file_path)
if ocr_result and ocr_result.get('text_chunks'):
all_results.append(("ocr", ocr_result))
logger.info(f"✅ OCR: {len(ocr_result['text_chunks'])} chunks")
self._save_extraction_debug_log(file_path, ocr_result, "ocr")
# Use OCR if we still don't have good results
current_best_length = sum(len(chunk) for chunk in best_result.get('text_chunks', []))
ocr_text_length = sum(len(chunk) for chunk in ocr_result.get('text_chunks', []))
if current_best_length < 100 and ocr_text_length > current_best_length:
best_result = ocr_result
except Exception as e:
logger.warning(f"OCR processing failed: {e}")
# Select and return the best result
if best_result.get('text_chunks'):
# Combine results if we have good tables from one method and good text from another
final_result = best_result.copy()
# Look for the best table extraction across all methods
all_tables = []
for method_name, result in all_results:
if result.get('tables'):
all_tables.extend(result['tables'])
if all_tables and len(all_tables) > len(final_result.get('tables', [])):
final_result['tables'] = all_tables
logger.info(f"🎯 Final PDF result: {len(final_result['text_chunks'])} chunks, {len(final_result.get('tables', []))} tables")
self._save_extraction_debug_log(file_path, final_result, "best-combined")
return final_result
# If all methods failed
logger.warning(f"All PDF extraction methods failed for {file_path}")
result = {"text_chunks": [], "tables": [], "error": "PDF processing failed - all extraction methods unsuccessful"}
self._save_extraction_debug_log(file_path, result, "all-failed")
return result
def _process_word_document(self, file_path):
"""Process Word documents"""
logger.info(f"Processing Word document: {file_path}")
# First try python-docx for .docx files
if DOCX_AVAILABLE and docx and file_path.suffix.lower() == '.docx':
try:
doc = docx.Document(file_path)
text_chunks = []
# Extract paragraphs
full_text = ""
for paragraph in doc.paragraphs:
if paragraph.text.strip():
full_text += paragraph.text + "\n"
# Extract tables
tables = []
for table in doc.tables:
table_data = []
for row in table.rows:
row_data = [cell.text.strip() for cell in row.cells]
table_data.append(row_data)
if table_data:
# Convert to markdown
markdown_table = self._table_to_markdown(table_data)
tables.append(markdown_table)
# Chunk the text
if full_text:
text_chunks.extend(self.chunk_text(full_text))
logger.info(f"✅ python-docx processed Word doc with {len(text_chunks)} text chunks and {len(tables)} tables")
result = {"text_chunks": text_chunks, "tables": tables}
self._save_extraction_debug_log(file_path, result, "python-docx")
return result
except Exception as e:
logger.warning(f"python-docx failed for {file_path}: {e}")
# Fallback: convert to PDF and process
logger.info(f"Falling back to PDF conversion for Word document: {file_path}")
pdf_path = self._convert_to_pdf(file_path)
if pdf_path:
result = self._process_pdf_document(pdf_path)
# Clean up temporary PDF
try:
pdf_path.unlink()
except:
pass
# Update debug log to show it was converted from Word
if 'text_chunks' in result or 'tables' in result:
self._save_extraction_debug_log(file_path, result, "word-to-pdf-conversion")
return result
result = {"text_chunks": [], "tables": [], "error": "Word document processing failed"}
self._save_extraction_debug_log(file_path, result, "word-failed")
return result
def _process_powerpoint(self, file_path):
"""Process PowerPoint documents"""
logger.info(f"Processing PowerPoint: {file_path}")
# Try python-pptx for .pptx files
if PPTX_AVAILABLE and pptx and file_path.suffix.lower() == '.pptx':
try:
presentation = pptx.Presentation(file_path)
text_chunks = []
tables = []
for i, slide in enumerate(presentation.slides):
slide_text = []
# Extract text from shapes
for shape in slide.shapes:
if hasattr(shape, 'text') and shape.text:
slide_text.append(shape.text)
# Extract tables
if hasattr(shape, 'has_table') and shape.has_table:
table_data = []
for row in shape.table.rows:
row_data = [cell.text.strip() for cell in row.cells]
table_data.append(row_data)
if table_data:
markdown_table = self._table_to_markdown(table_data)
tables.append(f"Slide {i+1} Table:\n{markdown_table}")
# Combine slide text
if slide_text:
slide_content = f"Slide {i+1}:\n" + "\n".join(slide_text)
text_chunks.extend(self.chunk_text(slide_content))
logger.info(f"✅ python-pptx processed PowerPoint with {len(text_chunks)} text chunks and {len(tables)} tables")
result = {"text_chunks": text_chunks, "tables": tables}
self._save_extraction_debug_log(file_path, result, "python-pptx")
return result
except Exception as e:
logger.warning(f"python-pptx failed for {file_path}: {e}")
# Fallback: convert to PDF and process
logger.info(f"Falling back to PDF conversion for PowerPoint: {file_path}")
pdf_path = self._convert_to_pdf(file_path)
if pdf_path:
result = self._process_pdf_document(pdf_path)
# Clean up temporary PDF
try:
pdf_path.unlink()
except:
pass
# Update debug log to show it was converted from PowerPoint
if 'text_chunks' in result or 'tables' in result:
self._save_extraction_debug_log(file_path, result, "powerpoint-to-pdf-conversion")
return result
result = {"text_chunks": [], "tables": [], "error": "PowerPoint processing failed"}
self._save_extraction_debug_log(file_path, result, "powerpoint-failed")
return result
def _process_excel(self, file_path):
"""Process Excel documents"""
logger.info(f"Processing Excel: {file_path}")
if PANDAS_AVAILABLE and pd:
try:
# Read all sheets
excel_file = pd.ExcelFile(file_path)
text_chunks = []
tables = []
for sheet_name in excel_file.sheet_names:
try:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# Convert DataFrame to markdown table
if not df.empty:
# Add sheet name as header
table_text = f"Sheet: {sheet_name}\n\n"
table_text += df.to_markdown(index=False)
tables.append(table_text)
# Also create a text summary
summary = f"Sheet '{sheet_name}' contains {len(df)} rows and {len(df.columns)} columns.\n"
summary += f"Columns: {', '.join(df.columns.astype(str))}\n"
text_chunks.extend(self.chunk_text(summary))
except Exception as sheet_error:
logger.warning(f"Error processing sheet {sheet_name}: {sheet_error}")
logger.info(f"✅ pandas processed Excel with {len(text_chunks)} text chunks and {len(tables)} tables")
result = {"text_chunks": text_chunks, "tables": tables}
self._save_extraction_debug_log(file_path, result, "pandas")
return result
except Exception as e:
logger.warning(f"pandas Excel processing failed for {file_path}: {e}")
# Fallback: try with xlrd for older Excel files
try:
import xlrd
workbook = xlrd.open_workbook(file_path)
text_chunks = []
tables = []
for sheet_name in workbook.sheet_names():
sheet = workbook.sheet_by_name(sheet_name)
# Extract data as table
table_data = []
for row_idx in range(sheet.nrows):
row_data = []
for col_idx in range(sheet.ncols):
cell_value = sheet.cell_value(row_idx, col_idx)
row_data.append(str(cell_value))
table_data.append(row_data)
if table_data:
# Convert to markdown
markdown_table = self._table_to_markdown(table_data)
tables.append(f"Sheet: {sheet_name}\n\n{markdown_table}")
# Add summary
summary = f"Sheet '{sheet_name}' contains {sheet.nrows} rows and {sheet.ncols} columns.\n"
text_chunks.extend(self.chunk_text(summary))
logger.info(f"✅ xlrd processed Excel with {len(text_chunks)} text chunks and {len(tables)} tables")
result = {"text_chunks": text_chunks, "tables": tables}
self._save_extraction_debug_log(file_path, result, "xlrd")
return result
except ImportError:
logger.warning("xlrd not available for Excel processing")
except Exception as e:
logger.warning(f"xlrd Excel processing failed: {e}")
result = {"text_chunks": [], "tables": [], "error": "Excel processing failed - no Excel processing libraries available"}
self._save_extraction_debug_log(file_path, result, "excel-failed")
return result
def _table_to_markdown(self, table_data):
"""Convert a 2D array to a markdown table"""
if not table_data or not table_data[0]:
return ""
# Create header
markdown = "| " + " | ".join([str(cell) for cell in table_data[0]]) + " |\n"
# Add separator line
markdown += "| " + " | ".join(["---" for _ in table_data[0]]) + " |\n"
# Add data rows
for row in table_data[1:]:
markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
return markdown
def process_document(self, file_path):
"""
Process a document and extract text content
Args:
file_path: Path to the document file
Returns:
dict: {
'text_chunks': [list of text chunks],
'tables': [list of table content],
'metadata': {file info},
'error': optional error message
}
"""
try:
file_path = Path(file_path)
# Validate file exists
if not file_path.exists():
return {"error": "File not found"}
# Get file type
file_type = self._get_file_type(file_path)
if file_type == "unknown":
return {"error": f"Unsupported file type: {file_path.suffix}"}
# Process based on file type
if file_type == "pdf":
result = self._process_pdf_document(file_path)
elif file_type == "word":
result = self._process_word_document(file_path)
elif file_type == "powerpoint":
result = self._process_powerpoint(file_path)
elif file_type == "excel":
result = self._process_excel(file_path)
else:
return {"error": f"Unknown file type: {file_type}"}
# Add metadata
result['metadata'] = {
'filename': file_path.name,
'file_type': file_type,
'size': file_path.stat().st_size,
'extension': file_path.suffix.lower()
}
# Sanitize all text content
if 'text_chunks' in result:
result['text_chunks'] = [self.sanitize_text(chunk) for chunk in result['text_chunks']]
if 'tables' in result:
result['tables'] = [self.sanitize_text(table) for table in result['tables']]
# Save final processing summary if not already saved by specific method
if not any(key in result for key in ['debug_logged']):
self._save_extraction_debug_log(file_path, result, f"final-{file_type}")
return result
except Exception as e:
logger.error(f"Error processing document {file_path}: {e}")
error_result = {"error": f"Processing failed: {str(e)}"}
self._save_extraction_debug_log(file_path, error_result, "exception")
return error_result
def get_combined_text(self, processed_result):
"""
Get combined text content from processed document result
Args:
processed_result: Result from process_document()
Returns:
str: Combined text content
"""
if 'error' in processed_result:
return ""
combined_text = ""
# Add text chunks
if processed_result.get('text_chunks'):
combined_text += "\n\n".join(processed_result['text_chunks'])
# Add tables
if processed_result.get('tables'):
if combined_text:
combined_text += "\n\n"
combined_text += "\n\n".join(processed_result['tables'])
return combined_text.strip()
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: The class's base classes; DocumentProcessor declares none and derives directly from object. Constructor parameters (temp_dir, llmsherpa_api_url) are documented under __init__ below.
Return Value
Instantiating the class returns a DocumentProcessor instance.
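For reference, a minimal construction sketch follows; the import path, temp directory, and llmsherpa URL shown are placeholder values, not settings taken from a real deployment.
from document_processor import DocumentProcessor  # assumed import path for this module

# Default construction: creates its own temp directory and uses the built-in llmsherpa URL
processor = DocumentProcessor()

# Explicit construction: both arguments are optional
processor = DocumentProcessor(
    temp_dir="/tmp/chat_uploads",                                # placeholder path
    llmsherpa_api_url="http://llmsherpa:5001/api/parseDocument"  # placeholder URL
)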
Class Interface
Methods
__init__(self, temp_dir=None, llmsherpa_api_url=None)
Purpose: Initialize the document processor
Parameters:
temp_dir: Directory for temporary files (optional; a fresh temporary directory is created if omitted)
llmsherpa_api_url: URL for the llmsherpa parsing API (defaults to the internal llmsherpa service URL)
Returns: None
_log_available_capabilities(self)
Purpose: Log which document processing libraries are available
Returns: None
get_available_formats(self)
Purpose: Get list of supported file formats based on available libraries
Returns: List of supported file extensions (duplicates removed)
_save_extraction_debug_log(self, file_path, processed_result, extraction_method)
Purpose: Save extraction results to a debug log file for performance analysis
Parameters:
file_path: Original file path
processed_result: Result from document processing
extraction_method: Method used for extraction (e.g., 'llmsherpa', 'pypdf2', 'python-docx')
Returns: None
_get_file_extension(self, file_path)
Purpose: Get lowercase file extension including the dot
Parameters:
file_path: Path to the file
Returns: Lowercase extension string (e.g., '.pdf')
_get_file_type(self, file_path)
Purpose: Determine file type based on extension
Parameters:
file_path: Path to the file
Returns: One of 'word', 'powerpoint', 'excel', 'pdf', or 'unknown'
count_tokens(self, text)
Purpose: Count tokens in text using tiktoken (cl100k_base encoding)
Parameters:
text: Text to count tokens for
Returns: Integer token count
sanitize_text(self, text)
Purpose: Sanitize text by encoding to UTF-8 with error replacement
Parameters:
text: Text to sanitize
Returns: Sanitized string (empty string for empty input)
chunk_text(self, text, max_chunk_size=4000, overlap=200)
Purpose: Split text into chunks, preferring sentence boundaries, for better context preservation
Parameters:
text: Text to chunk
max_chunk_size: Maximum characters per chunk
overlap: Character overlap between chunks
Returns: List of text chunks
_clean_pdf_text(self, text)
Purpose: Clean and normalize PDF-extracted text; removes excessive whitespace, fixes common OCR issues, and improves readability
Parameters:
text: Raw extracted text
Returns: Cleaned text string
_ocr_pdf_extraction(self, file_path)
Purpose: OCR-based PDF extraction using pymupdf (fitz) or pdf2image + pytesseract; a fallback for scanned PDFs or when other methods fail
Parameters:
file_path: Path to the PDF file
Returns: Dict with 'text_chunks' and 'tables' lists
_convert_to_pdf(self, input_file)
Purpose: Convert a document to PDF using LibreOffice (headless)
Parameters:
input_file: Path to the document to convert
Returns: Path to the generated PDF, or None on failure
_process_pdf_document(self, file_path)
Purpose: Enhanced PDF processing with multiple extraction strategies (llmsherpa, pdfplumber, PyPDF2, OCR) and table detection; uses a cascading approach to get the best possible text and table extraction
Parameters:
file_path: Path to the PDF file
Returns: Dict with 'text_chunks', 'tables', and optionally 'error'
_process_word_document(self, file_path)
Purpose: Process Word documents with python-docx, falling back to PDF conversion
Parameters:
file_path: Path to the Word document
Returns: Dict with 'text_chunks', 'tables', and optionally 'error'
_process_powerpoint(self, file_path)
Purpose: Process PowerPoint documents with python-pptx, falling back to PDF conversion
Parameters:
file_path: Path to the PowerPoint file
Returns: Dict with 'text_chunks', 'tables', and optionally 'error'
_process_excel(self, file_path)
Purpose: Process Excel documents with pandas, falling back to xlrd
Parameters:
file_path: Path to the Excel file
Returns: Dict with 'text_chunks', 'tables', and optionally 'error'
_table_to_markdown(self, table_data)
Purpose: Convert a 2D array to a markdown table
Parameters:
table_data: List of rows, where the first row is treated as the header
Returns: Markdown table string (empty string if no data)
process_document(self, file_path)
Purpose: Process a document and extract text content
Parameters:
file_path: Path to the document file
Returns: Dict with 'text_chunks' (list of text chunks), 'tables' (list of table content), 'metadata' (file info), and optionally 'error' (error message)
get_combined_text(self, processed_result)
Purpose: Get combined text content from a processed document result
Parameters:
processed_result: Result from process_document()
Returns: Combined text content as a single string (empty string on error)
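A small sketch of the chunking and table-formatting helpers described above, assuming the class is importable as in the usage example below; the sample strings are illustrative only, and _table_to_markdown is an internal helper shown here purely to document its output shape.
processor = DocumentProcessor()

# chunk_text splits long text, preferring sentence boundaries near the chunk limit
long_text = "First sentence. " * 500
chunks = processor.chunk_text(long_text, max_chunk_size=4000, overlap=200)
print(len(chunks), "chunks;", processor.count_tokens(chunks[0]), "tokens in the first chunk")

# _table_to_markdown treats the first row as the header
table = [["Name", "Value"], ["alpha", "1"], ["beta", "2"]]
print(processor._table_to_markdown(table))
# | Name | Value |
# | --- | --- |
# | alpha | 1 |
# | beta | 2 |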
Required Imports
import os
import json
import tempfile
import subprocess
import logging
import tiktoken
from datetime import datetime
from pathlib import Path
from uuid import uuid4
Optional libraries (guarded by availability flags in the source): llmsherpa, PyPDF2, pymupdf (fitz), pytesseract, pdf2image, python-docx, python-pptx, pandas, pdfplumber, xlrd
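The source above references module-level flags such as PYPDF2_AVAILABLE and DOCX_AVAILABLE that are defined earlier in document_processor.py, outside the excerpted line range. A typical guarded-import pattern consistent with how the class uses them is sketched below; the exact definitions in the file may differ.
# Guarded optional imports: each flag records whether the library imported successfully,
# and the module name stays defined (as None) so checks like "PYPDF2_AVAILABLE and PyPDF2" work.
try:
    import PyPDF2
    PYPDF2_AVAILABLE = True
except ImportError:
    PyPDF2 = None
    PYPDF2_AVAILABLE = False

try:
    import docx
    DOCX_AVAILABLE = True
except ImportError:
    docx = None
    DOCX_AVAILABLE = False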
Usage Example
# Example usage:
# processor = DocumentProcessor()
# result = processor.process_document("/path/to/document.pdf")
# text = processor.get_combined_text(result)
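A slightly fuller end-to-end sketch, assuming the module is importable as document_processor and that a file exists at the placeholder path:
from document_processor import DocumentProcessor  # assumed import path

processor = DocumentProcessor()

result = processor.process_document("/path/to/upload.pdf")  # placeholder path
if "error" in result:
    print("Processing failed:", result["error"])
else:
    print("Metadata:", result["metadata"])
    print("Chunks:", len(result["text_chunks"]), "Tables:", len(result["tables"]))

    combined = processor.get_combined_text(result)
    print("Approximate token count:", processor.count_tokens(combined))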
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DocumentProcessor_v7 (67.3% similar)
- class DocumentProcessor (64.9% similar)
- class DocumentProcessor_v5 (62.2% similar)
- class DocumentProcessor_v3 (62.2% similar)
- function api_chat_upload_document (62.1% similar)