class DocumentProcessor_v5
Process different document types for RAG context extraction
Source: /tf/active/vicechatdev/offline_docstore_multi.py (lines 189 - 1177)
Complexity: moderate
Purpose
Process different document types for RAG context extraction
Source Code
class DocumentProcessor:
"""Process different document types for RAG context extraction"""
# Supported file extensions by type
WORD_EXTENSIONS = ['.doc', '.docx', '.docm', '.dot', '.dotx', '.dotm', '.rtf', '.odt']
PPT_EXTENSIONS = ['.ppt', '.pptx', '.pptm', '.pot', '.potx', '.potm', '.pps', '.ppsx', '.odp']
EXCEL_EXTENSIONS = ['.xls', '.xlsx', '.xlsm', '.xlt', '.xltx', '.xltm', '.xlsb', '.ods']
PDF_EXTENSIONS = ['.pdf']
def __init__(self, input_dir, output_dir, temp_dir=None, llmsherpa_api_url=None):
"""
Initialize the document processor
Args:
input_dir: Directory with source documents
output_dir: Directory to save processed chunks
temp_dir: Directory for temporary files (optional)
llmsherpa_api_url: URL for llmsherpa API
"""
self.input_dir = Path(input_dir).absolute()
self.output_dir = Path(output_dir).absolute()
self.temp_dir = Path(temp_dir) if temp_dir else Path(tempfile.mkdtemp())
self.llmsherpa_api_url = llmsherpa_api_url or "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
# Create directories if they don't exist
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.temp_dir, exist_ok=True)
def _get_file_extension(self, file_path):
"""Get lowercase file extension including the dot"""
return Path(file_path).suffix.lower()
def _is_valid_file(self, file_path):
"""Check if a file appears to be valid and processable with better format detection"""
try:
# Check if file exists and has non-zero size
path = Path(file_path)
if not path.exists() or path.stat().st_size == 0:
logger.warning(f"File doesn't exist or is empty: {file_path}")
return False
# Check if file is readable
with open(file_path, 'rb') as f:
# Read first few bytes to see if it's readable
header = f.read(16)
if not header:
logger.warning(f"File appears unreadable: {file_path}")
return False
# Additional checks for specific file types
extension = path.suffix.lower()
# PowerPoint files (.pptx) should start with PK header (zip file)
if extension == '.pptx':
if header[:2] != b'PK':
logger.warning(f"File has .pptx extension but isn't a ZIP file: {file_path}")
return True # Still return True to allow fallback handling
# Excel files (.xlsx) should start with PK header (zip file)
if extension == '.xlsx':
if header[:2] != b'PK':
logger.warning(f"File has .xlsx extension but isn't a ZIP file. "
f"It might be an old format Excel file: {file_path}")
return True # Still return True to allow fallback handling
# Check if the file is an old Excel format (.xls) with the wrong extension
# Old Excel files often start with D0CF11E0 (in hex)
if extension == '.xlsx' and header[:8] == b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1':
logger.warning(f"File has .xlsx extension but appears to be an old .xls format: {file_path}")
return True # Return True to allow processing with xlrd
return True
except Exception as e:
logger.error(f"Error checking file validity: {file_path} - {str(e)}")
return False
def _get_file_type(self, file_path):
"""Determine file type based on extension"""
ext = self._get_file_extension(file_path)
if ext in self.WORD_EXTENSIONS:
return "word"
elif ext in self.PPT_EXTENSIONS:
return "powerpoint"
elif ext in self.EXCEL_EXTENSIONS:
return "excel"
elif ext in self.PDF_EXTENSIONS:
return "pdf"
else:
return "unknown"
def _convert_to_pdf(self, input_file):
"""Convert a document to PDF using LibreOffice with unoconv fallback"""
input_path = Path(input_file)
output_pdf = self.temp_dir / f"{input_path.stem}.pdf"
# First attempt: Use LibreOffice directly
try:
# First check if the input file exists
if not input_path.exists():
logger.error(f"Input file does not exist: {input_path}")
return None
# Make sure the temp directory exists
os.makedirs(self.temp_dir, exist_ok=True)
# Absolute paths to avoid directory issues
abs_input = input_path.absolute()
abs_output_dir = self.temp_dir.absolute()
logger.info(f"Converting {abs_input} to PDF in {abs_output_dir}")
# Use LibreOffice for conversion with expanded command
cmd = [
'libreoffice',
'--headless',
'--norestore',
'--nofirststartwizard',
'--convert-to', 'pdf',
'--outdir', str(abs_output_dir),
str(abs_input)
]
# Run with increased timeout
process = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180 # 3 minute timeout
)
# Check if there was an error
if process.returncode != 0:
logger.error(f"LibreOffice error: {process.stderr}")
# Don't return None here, try unoconv instead
else:
# Verify the file was actually created
if not output_pdf.exists():
# Sometimes LibreOffice creates files with slightly different names
# Try to find a matching PDF
potential_pdfs = list(self.temp_dir.glob(f"{input_path.stem}*.pdf"))
if potential_pdfs:
output_pdf = potential_pdfs[0]
logger.info(f"Found PDF with alternative name: {output_pdf}")
return output_pdf
# If no file was found, continue to unoconv
else:
logger.info(f"Successfully converted to PDF with LibreOffice: {output_pdf}")
return output_pdf
# If we get here, LibreOffice failed or didn't create the file
logger.info(f"Trying alternative conversion with unoconv for {input_path}")
# Second attempt: Use unoconv
try:
alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
# Run unoconv with timeout
unoconv_cmd = [
'unoconv',
'-f', 'pdf',
'-o', str(alt_output_pdf),
str(abs_input)
]
unoconv_process = subprocess.run(
unoconv_cmd,
capture_output=True,
text=True,
timeout=180 # 3 minute timeout
)
if unoconv_process.returncode != 0:
logger.error(f"Unoconv error: {unoconv_process.stderr}")
return None
if alt_output_pdf.exists():
logger.info(f"Successfully converted to PDF with unoconv: {alt_output_pdf}")
return alt_output_pdf
else:
# Check for alternative names again
potential_pdfs = list(self.temp_dir.glob(f"{input_path.stem}*unoconv*.pdf"))
if potential_pdfs:
alt_output_pdf = potential_pdfs[0]
logger.info(f"Found PDF with alternative name from unoconv: {alt_output_pdf}")
return alt_output_pdf
logger.error(f"Unoconv did not create a PDF file.")
return None
except subprocess.TimeoutExpired:
logger.error(f"Timeout while converting with unoconv: {input_path}")
return None
except Exception as unoconv_err:
logger.error(f"Failed to convert with unoconv: {str(unoconv_err)}")
return None
except subprocess.TimeoutExpired:
logger.error(f"Timeout while converting with LibreOffice: {input_path}")
# Try unoconv as a fallback for timeout
try:
logger.info(f"Trying unoconv after LibreOffice timeout for {input_path}")
alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
subprocess.run(
['unoconv', '-f', 'pdf', '-o', str(alt_output_pdf), str(input_path)],
check=True,
timeout=180
)
if alt_output_pdf.exists():
logger.info(f"Successfully converted to PDF with unoconv after timeout: {alt_output_pdf}")
return alt_output_pdf
else:
return None
except:
return None
except Exception as e:
logger.error(f"Failed to convert with LibreOffice: {str(e)}")
# Try unoconv as a fallback for general errors
try:
logger.info(f"Trying unoconv after LibreOffice error for {input_path}")
alt_output_pdf = self.temp_dir / f"{input_path.stem}_unoconv.pdf"
subprocess.run(
['unoconv', '-f', 'pdf', '-o', str(alt_output_pdf), str(input_path)],
check=True,
timeout=180
)
if alt_output_pdf.exists():
logger.info(f"Successfully converted to PDF with unoconv after error: {alt_output_pdf}")
return alt_output_pdf
else:
return None
except Exception as unoconv_err:
logger.error(f"All conversion methods failed for {input_path}: {str(unoconv_err)}")
return None
def _process_word_document(self, file_path):
"""Process Word documents by converting to PDF and using llmsherpa"""
logger.info(f"Processing Word document: {file_path}")
# Convert to PDF
pdf_path = self._convert_to_pdf(file_path)
if not pdf_path:
return None
# Process using llmsherpa (same as PDF processing)
return self._process_pdf_document(pdf_path)
def _process_pdf_document(self, file_path):
"""Process PDF documents using llmsherpa"""
logger.info(f"Processing PDF: {file_path}")
# Use llmsherpa to extract content
pdf_reader = LayoutPDFReader(self.llmsherpa_api_url)
min_chunk_len=4000
try:
doc = pdf_reader.read_pdf(str(file_path))
except Exception as e:
print(f"Error processing PDF {file_path}: {str(e)}")
return None
try:
text_chunks=[]
table_chunks=[]
text_chunk_interim=""
for d in doc.chunks():
## Adding a minimum chunk length
if isinstance(d,llmsherpa.readers.layout_reader.Paragraph):
clean_text=d.to_text().replace("- ","").replace("\n","")
text_chunk_interim=clean_text if text_chunk_interim=="" else text_chunk_interim+"\n"+clean_text
if len(text_chunk_interim)>min_chunk_len:
newuid=str(uuid4())
text_chunks.append([d.parent_text(),text_chunk_interim,newuid])
text_chunk_interim=""
if isinstance(d,llmsherpa.readers.layout_reader.Table):
newuid=str(uuid4())
table_chunks.append([d.parent_text(),d.to_html(),d.to_text(),newuid])
if text_chunk_interim!="":
newuid=str(uuid4())
text_chunks.append([d.parent_text(),text_chunk_interim,newuid])
return {"text_chunks": text_chunks, "table_chunks": table_chunks}
except Exception as e:
print(f"Error processing PDF {file_path}: {str(e)}")
return None
def _process_powerpoint(self, file_path):
"""Process PowerPoint files with python-pptx, falling back to PDF conversion on failure"""
logger.info(f"Processing PowerPoint: {file_path}")
# First try the standard python-pptx approach
try:
text_chunks = []
table_chunks = []
# Try to open the presentation with increased timeout and error handling
try:
presentation = pptx.Presentation(file_path)
except Exception as e:
logger.error(f"Error opening PowerPoint with python-pptx: {str(e)}")
# Fall back to conversion method
return self._process_powerpoint_via_pdf(file_path)
# Process each slide
for i, slide in enumerate(presentation.slides):
# Get slide title or default
try:
slide_title = next((shape.text for shape in slide.shapes
if shape.has_text_frame and shape.text and
getattr(shape, "is_title", False)), f"Slide {i+1}")
except Exception as slide_err:
logger.warning(f"Error getting slide title: {str(slide_err)}")
slide_title = f"Slide {i+1}"
# First identify all tables to exclude them from text extraction
tables = []
try:
tables = [shape for shape in slide.shapes if hasattr(shape, "has_table") and shape.has_table]
table_ids = set(id(table) for table in tables)
except Exception as table_err:
logger.warning(f"Error identifying tables in slide {i+1}: {str(table_err)}")
table_ids = set()
# Extract all text from shapes on this slide (excluding tables and titles)
text_content = []
for shape in slide.shapes:
try:
# Skip tables and titles
if (id(shape) in table_ids or
(hasattr(shape, "text") and shape.text == slide_title)):
continue
# Add text from non-table shapes
if hasattr(shape, "text") and shape.text:
text_content.append(shape.text)
except Exception as shape_err:
logger.warning(f"Error processing shape in slide {i+1}: {str(shape_err)}")
# Combine all text from this slide into a single chunk
combined_text = "\n".join(text_content)
if combined_text.strip(): # Only add if there's meaningful text
newuid = str(uuid4())
parent_text = f"Slide {i+1}: {slide_title}"
text_chunks.append([parent_text.replace("'","`"), combined_text.replace("'","`"), newuid])
# Process tables separately
for shape in tables:
try:
table_data = []
for r, row in enumerate(shape.table.rows):
row_data = []
for c, cell in enumerate(row.cells):
if cell.text:
row_data.append(cell.text)
else:
row_data.append("")
table_data.append(row_data)
# Convert to markdown
markdown_table = self._table_to_markdown(table_data)
newuid = str(uuid4())
parent_text = f"Slide {i+1}: {slide_title}"
table_chunks.append([parent_text.replace("'","`"), markdown_table.replace("'","`"), "", newuid])
except Exception as table_process_err:
logger.warning(f"Error processing table in slide {i+1}: {str(table_process_err)}")
return {"text_chunks": text_chunks, "table_chunks": table_chunks}
except Exception as e:
logger.error(f"Error processing PowerPoint {file_path}: {str(e)}")
# Fall back to conversion method
return self._process_powerpoint_via_pdf(file_path)
def _process_powerpoint_via_pdf(self, file_path):
"""Fall back method to process PowerPoint by converting to PDF first"""
logger.info(f"Falling back to PDF conversion for PowerPoint: {file_path}")
try:
# Convert PowerPoint to PDF
pdf_path = self._convert_to_pdf(file_path)
if not pdf_path:
return None
# Process the PDF with llmsherpa
result = self._process_pdf_document(pdf_path)
# Clean up the temporary PDF file
try:
os.remove(pdf_path)
except:
pass
return result
except Exception as e:
logger.error(f"Error in PowerPoint fallback processing: {str(e)}")
return None
def _table_to_markdown(self, table_data):
"""Convert a 2D array to a markdown table"""
if not table_data or not table_data[0]:
return "| |"
# Create header
markdown = "| " + " | ".join([str(cell) for cell in table_data[0]]) + " |\n"
# Add separator line
markdown += "| " + " | ".join(["---" for _ in table_data[0]]) + " |\n"
# Add data rows
for row in table_data[1:]:
markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
return markdown
def _process_excel(self, file_path):
"""Process Excel files with fallback for problematic files"""
logger.info(f"Processing Excel: {file_path}")
# Try pandas first (for modern Excel formats)
try:
# Load the Excel file using pandas
excel_file = pd.ExcelFile(file_path)
table_chunks = []
# Process each sheet
for sheet_name in excel_file.sheet_names:
df = pd.read_excel(excel_file, sheet_name=sheet_name)
# Convert to markdown
markdown = df.to_markdown(index=False).replace("'","`")
newuid = str(uuid4())
table_chunks.append([sheet_name.replace("'","`"), markdown, "", newuid])
return {"table_chunks": table_chunks}
except Exception as e:
logger.warning(f"Error processing Excel with pandas: {file_path}: {str(e)}")
logger.info(f"Trying fallback method for Excel: {file_path}")
# Try xlrd as fallback for older or problematic Excel files
return self._process_excel_with_xlrd(file_path)
def _process_excel_with_xlrd(self, file_path):
"""Fallback processing for Excel files using xlrd with extension adaptation"""
try:
# Import xlrd here to avoid dependency issues if not installed
try:
import xlrd
import shutil
except ImportError:
logger.error("xlrd library not installed. Install with: pip install xlrd==1.2.0")
return None
# Create a temporary file with .xls extension regardless of original extension
# This helps xlrd handle the file correctly based on content rather than extension
temp_xls_path = self.temp_dir / f"{Path(file_path).stem}_copy.xls"
try:
# Copy to a temporary file with .xls extension
shutil.copy2(file_path, temp_xls_path)
logger.info(f"Created temporary .xls copy at {temp_xls_path}")
# Open the workbook with xlrd using the .xls copy
workbook = xlrd.open_workbook(str(temp_xls_path))
except Exception as copy_err:
logger.error(f"Error creating .xls copy: {str(copy_err)}")
# Try with original file as last resort
workbook = xlrd.open_workbook(str(file_path))
table_chunks = []
# Process each sheet
for sheet_name in workbook.sheet_names():
sheet = workbook.sheet_by_name(sheet_name)
# Convert sheet data to a list of lists
data = []
headers = []
# Get headers (first row) if sheet has data
if sheet.nrows > 0:
for col in range(sheet.ncols):
headers.append(str(sheet.cell_value(0, col)))
data.append(headers)
# Get data rows
for row in range(1, sheet.nrows):
row_data = []
for col in range(sheet.ncols):
cell_value = sheet.cell_value(row, col)
# Handle date values which are stored as floats in xls
if sheet.cell_type(row, col) == xlrd.XL_CELL_DATE:
try:
dt_tuple = xlrd.xldate_as_tuple(cell_value, workbook.datemode)
# Format as ISO date string
if dt_tuple[3] == 0 and dt_tuple[4] == 0 and dt_tuple[5] == 0:
# Just date without time
cell_value = f"{dt_tuple[0]}-{dt_tuple[1]:02d}-{dt_tuple[2]:02d}"
else:
# Date and time
cell_value = f"{dt_tuple[0]}-{dt_tuple[1]:02d}-{dt_tuple[2]:02d} {dt_tuple[3]:02d}:{dt_tuple[4]:02d}:{dt_tuple[5]:02d}"
except:
# If date conversion fails, just use the string value
cell_value = str(cell_value)
row_data.append(str(cell_value).replace("'","`"))
data.append(row_data)
else:
# Handle empty sheets
data = [["Empty Sheet"]]
# Convert to markdown table
markdown = self._list_to_markdown(data)
newuid = str(uuid4())
table_chunks.append([sheet_name.replace("'","`"), markdown, "", newuid])
# Clean up temporary file
try:
if temp_xls_path.exists():
os.remove(temp_xls_path)
except:
pass
return {"table_chunks": table_chunks}
except Exception as e:
logger.error(f"Error processing Excel with xlrd fallback: {file_path}: {str(e)}")
# If even xlrd fails, try LibreOffice conversion to CSV as last resort
return self._process_excel_via_conversion(file_path)
def _list_to_markdown(self, data):
"""Convert a 2D list to a markdown table"""
if not data or len(data) == 0:
return "| Empty Table |"
if len(data[0]) == 0:
return "| Empty Table |"
# Create header
markdown = "| " + " | ".join([str(cell) for cell in data[0]]) + " |\n"
# Add separator line
markdown += "| " + " | ".join(["---" for _ in data[0]]) + " |\n"
# Add data rows
for row in data[1:]:
# Make sure row has enough cells to match header
while len(row) < len(data[0]):
row.append("")
# Limit row to number of header columns
row = row[:len(data[0])]
markdown += "| " + " | ".join([str(cell) for cell in row]) + " |\n"
return markdown
def _process_excel_via_conversion(self, file_path):
"""Last resort for Excel processing via LibreOffice conversion to CSV"""
logger.info(f"Attempting to convert Excel to CSV as last resort: {file_path}")
try:
# Create a temp directory for CSV output
csv_dir = self.temp_dir / "csv_output"
os.makedirs(csv_dir, exist_ok=True)
# Use LibreOffice to convert to CSV
cmd = [
'libreoffice',
'--headless',
'--convert-to', 'csv',
'--outdir', str(csv_dir),
str(file_path)
]
process = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
# Find the generated CSV files
csv_files = list(csv_dir.glob("*.csv"))
if not csv_files:
logger.error(f"No CSV files generated from {file_path}")
return self._create_minimal_excel_representation(file_path)
table_chunks = []
# Process each CSV file
for csv_file in csv_files:
try:
# Try to read the CSV
df = pd.read_csv(csv_file, encoding='utf-8')
# Convert to markdown
markdown = df.to_markdown(index=False).replace("'","`")
sheet_name = csv_file.stem.replace(Path(file_path).stem, "").lstrip("_").replace("'","`")
if not sheet_name:
sheet_name = "Sheet1"
newuid = str(uuid4())
table_chunks.append([sheet_name, markdown, "", newuid])
except:
# If CSV read fails, create basic representation
content = "CSV conversion failed for this sheet"
newuid = str(uuid4())
sheet_name = csv_file.stem.replace(Path(file_path).stem, "").lstrip("_")
if not sheet_name:
sheet_name = "Sheet1"
table_chunks.append([sheet_name.replace("'","`"), content.replace("'","`"), "", newuid])
# Clean up CSV files
for csv_file in csv_files:
try:
os.remove(csv_file)
except:
pass
if table_chunks:
return {"table_chunks": table_chunks}
else:
return self._create_minimal_excel_representation(file_path)
except Exception as e:
logger.error(f"Error in Excel CSV conversion: {str(e)}")
return self._create_minimal_excel_representation(file_path)
def _create_minimal_excel_representation(self, file_path):
"""Create minimal representation for Excel files that cannot be processed"""
logger.warning(f"Creating minimal representation for: {file_path}")
newuid = str(uuid4())
error_text = f"| Error Processing Excel File |\n|---|\n| The file {Path(file_path).name} could not be processed. |"
table_chunks = [[
"Error",
error_text.replace("'","`"),
"",
newuid
]]
return {"table_chunks": table_chunks}
def process_document(self, file_path):
"""Process a document based on its type and return structured data"""
file_path = Path(file_path)
# Validate file before processing
if not self._is_valid_file(file_path):
logger.error(f"Invalid or corrupted file, skipping: {file_path}")
return None
file_type = self._get_file_type(file_path)
if file_type == "unknown":
logger.warning(f"Unsupported file type: {file_path}")
return None
# Process based on file type
if file_type == "word":
result = self._process_word_document(file_path)
elif file_type == "powerpoint":
result = self._process_powerpoint(file_path)
elif file_type == "excel":
result = self._process_excel(file_path)
elif file_type == "pdf":
result = self._process_pdf_document(file_path)
# Add metadata
if result:
result["file_path"] = str(file_path)
result["file_type"] = file_type
result["file_name"] = file_path.name
return result
def process_documents(self, file_paths, max_workers=4):
"""Process multiple documents in parallel"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(self.process_document, file): file for file in file_paths}
for future in concurrent.futures.as_completed(future_to_file):
file = future_to_file[future]
try:
result = future.result()
if result:
results[file] = result
except Exception as e:
logger.error(f"Error processing {file}: {str(e)}")
return results
def sanitize_text(self,text):
"""
Sanitize text by encoding to UTF-8 with error replacement and decoding back.
This replaces any characters that might cause ASCII encoding errors.
"""
return text.encode("utf-8", errors="replace").decode("utf-8")
def summarize_text(self,text, api_key, max_tokens_summary=8192):
"""
Summarize the input text using the GPT-4o-mini summarizer.
The summary will be limited to under max_tokens_summary tokens.
"""
# Prepare the summarization prompt
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0,api_key=api_key)
text=self.sanitize_text(text)
prompt = (
f"Please summarize the following text such that the summary is under {max_tokens_summary} tokens:\n\n{text}"
)
# Call the ChatCompletion API with the GPT-4o-mini model
response = llm.invoke(prompt)
summary = response.content.strip()
return summary
def count_tokens(self,text):
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
def generate_embeddings(self, session, api_key):
"""Generate embeddings for text and table chunks using OpenAI API"""
import openai
import time
openai.api_key = api_key
# Get chunks without embeddings
text_chunks = run_query(session,"""
MATCH (c:Text_chunk)
WHERE c.embedding IS NULL
RETURN c.UID as uid, c.Text as text
LIMIT 100
""").data()
table_chunks = run_query(session,"""
MATCH (c:Table_chunk)
WHERE c.embedding IS NULL
RETURN c.UID as uid, c.Text as text, c.Html as html
LIMIT 100
""").data()
logger.info(f"Found {len(text_chunks)} text chunks and {len(table_chunks)} table chunks without embeddings")
# Process text chunks
for chunk in text_chunks:
try:
# Rate limit to avoid API errors (3 requests per second)
#time.sleep(0.1)
content=chunk['text']
if len(content) > 1000000:
content = content[:1000000]
logger.warning(f"Shrinking table chunk {chunk['uid']} due to token limit")
while self.count_tokens(content) > 110000:
content = content[:-1000]
# Create embedding
if self.count_tokens(content) > 8192:
logger.warning(f"Summarizing text chunk {chunk['uid']} due to token limit")
content=self.summarize_text(content, api_key)
response = openai.embeddings.create(
model="text-embedding-3-small",
input=content
)
embedding = response.data[0].embedding
# Update the node
run_query(session,f"""
MATCH (c:Text_chunk {{UID: '{chunk['uid']}'}})
SET c.embedding = {embedding}
""")
logger.info(f"Added embedding for Text_chunk {chunk['uid']}")
except Exception as e:
logger.error(f"Error creating embedding for text chunk {chunk['uid']}: {str(e)}")
# Process table chunks
for chunk in table_chunks:
try:
# Rate limit to avoid API errors (3 requests per second)
#time.sleep(0.1)
# Use text if available, otherwise HTML
content = chunk['text'] if chunk['text'] and len(chunk['text']) > 0 else chunk['html']
if len(content) > 1000000:
content = content[:1000000]
logger.warning(f"Shrinking table chunk {chunk['uid']} due to token limit")
while self.count_tokens(content) > 110000:
content = content[:-1000]
# Create embedding
if self.count_tokens(content) > 8192:
logger.warning(f"Summarizing text chunk {chunk['uid']} due to token limit")
content=self.summarize_text(content, api_key)
response = openai.embeddings.create(
model="text-embedding-3-small",
input=content
)
embedding = response.data[0].embedding
# Update the node
run_query(session,f"""
MATCH (c:Table_chunk {{UID: '{chunk['uid']}'}})
SET c.embedding = {embedding}
""")
logger.info(f"Added embedding for Table_chunk {chunk['uid']}")
except Exception as e:
logger.error(f"Error creating embedding for table chunk {chunk['uid']}: {str(e)}")
# Return counts of updated nodes
return len(text_chunks), len(table_chunks)
def get_all_bibtex(self, session):
"""
Get BibTeX entries from all Publication nodes
Parameters:
-----------
session : Neo4j session
Active database session
Returns:
--------
dict
Dictionary mapping document UIDs to their BibTeX entries
"""
# Query to get all Publication nodes with their BibTeX entries
query = """
MATCH (doc:Publication)
WHERE doc.BibTex IS NOT NULL
RETURN doc.UID AS doc_uid, doc.BibTex AS bibtex
"""
results = run_query(session, query).data()
# Create a dictionary mapping document UIDs to BibTeX entries
bibtex_entries = {}
for result in results:
doc_uid = result['doc_uid']
bibtex = result['bibtex']
bibtex_entries[doc_uid] = bibtex
return bibtex_entries
def get_all_document_paths(self,session):
"""
Get the full paths from Rootfolder to all Documents
Parameters:
-----------
session : Neo4j session
Active database session
Returns:
--------
dict
Dictionary mapping document UIDs to their full paths
"""
# Query to get path nodes from Rootfolder to all Documents
query = """
MATCH path = (root:Rootfolder {Name: 'T001'})-[:PATH*]->(doc:Document)
WITH doc.UID AS doc_uid, nodes(path) AS pathNodes
UNWIND range(0, size(pathNodes)-1) AS i
WITH doc_uid, pathNodes[i].Name AS name, i
ORDER BY doc_uid, i
RETURN doc_uid, collect(name) AS path_names
"""
results = run_query(session, query).data()
# Create a dictionary mapping document UIDs to path strings
document_paths = {}
for result in results:
doc_uid = result['doc_uid']
path_components = result['path_names']
path_string = '/'.join(path_components)
document_paths[doc_uid] = path_string
return document_paths
def batch_generate_embeddings(self, session, api_key, chroma_collection, batch_size=100):
#self.doc_paths=self.get_all_document_paths(session)
self.doc_paths=self.get_all_bibtex(session)
total_text = 0
total_tables = 0
while True:
#text_count, table_count = self.generate_embeddings(session, api_key)
text_count, table_count = self.embed_in_chroma(chroma_collection,session)
total_text += text_count
total_tables += table_count
# If no more chunks to process, break
if text_count == 0 and table_count == 0:
break
logger.info(f"Processed batch: {text_count} text chunks, {table_count} table chunks")
logger.info(f"Completed embedding generation: {total_text} text chunks, {total_tables} table chunks")
return total_text, total_tables
def embed_in_chroma(self,chroma_collection,session):
# Get chunks without embeddings
text_chunks = run_query(session,"""
MATCH (d:Publication)-->(c:Text_chunk)
WHERE c.embedding IS NULL
RETURN c.UID as uid, c.Text as text,d.UID as puid
LIMIT 100
""").data()
table_chunks = run_query(session,"""
MATCH (d:Publication)-->(c:Table_chunk)
WHERE c.embedding IS NULL
RETURN c.UID as uid, c.Text as text, c.Html as html,d.UID as puid
LIMIT 100
""").data()
doclist=[]
id_list=[]
metadata_list=[]
for chunk in text_chunks:
doclist.append(chunk['text'])
id_list.append(chunk['uid'])
metadata_list.append({'bibtex':str(self.doc_paths[chunk['puid']])})
# Update the node
run_query(session,f"""
MATCH (c:Text_chunk {{UID: '{chunk['uid']}'}})
SET c.embedding = 'yes'
""")
try:
# Add the batch of text chunks to the Chroma collection (embeddings are computed by the collection)
chroma_collection.add(
documents=doclist,
metadatas=metadata_list,
ids=id_list
)
logger.info(f"Added embeddings for batch of text chunks")
except Exception as e:
logger.error(f"Error creating embeddings for text chunks with Chroma: {str(e)}")
doclist=[]
id_list=[]
metadata_list=[]
# Process table chunks
for chunk in table_chunks:
doclist.append(chunk['text'] if chunk['text'] and len(chunk['text']) > 0 else chunk['html'])
id_list.append(chunk['uid'])
metadata_list.append({'bibtex':str(self.doc_paths[chunk['puid']])})
# Update the node
run_query(session,f"""
MATCH (c:Table_chunk {{UID: '{chunk['uid']}'}})
SET c.embedding = 'yes'
""")
try:
# Add the batch of table chunks to the Chroma collection
chroma_collection.add(
documents=doclist,
metadatas=metadata_list,
ids=id_list
)
logger.info(f"Added embeddings for batch of table chunks")
except Exception as e:
logger.error(f"Error creating embeddings for table chunks with Chroma: {str(e)}")
return len(text_chunks), len(table_chunks)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| input_dir | str or Path | required | positional |
| output_dir | str or Path | required | positional |
| temp_dir | str or Path | None | optional |
| llmsherpa_api_url | str | None | optional |
Parameter Details
input_dir: directory containing the source documents to process
output_dir: directory where processed chunks are written
temp_dir: directory for temporary files; a fresh temporary directory is created when omitted
llmsherpa_api_url: URL of the llmsherpa parsing service; defaults to the internal container endpoint when omitted
Return Value
Instantiating the class returns a DocumentProcessor instance; individual processing methods return chunk dictionaries as described below.
Class Interface
Methods
__init__(self, input_dir, output_dir, temp_dir=None, llmsherpa_api_url=None)
Purpose: Initialize the document processor and create the output and temporary directories if they do not exist.
Parameters:
input_dir: directory containing the source documents
output_dir: directory where processed chunks are saved
temp_dir: directory for temporary files (optional; a fresh temporary directory is created if omitted)
llmsherpa_api_url: URL of the llmsherpa parsing API (optional; defaults to the internal container endpoint)
Returns: None
_get_file_extension(self, file_path)
Purpose: Get the lowercase file extension, including the leading dot.
Parameters:
file_path: path of the file to inspect
Returns: the extension string (e.g. ".pdf")
_is_valid_file(self, file_path)
Purpose: Check whether a file exists, is non-empty and readable, and warn when its magic bytes do not match its extension (e.g. a .xlsx that is not a ZIP container).
Parameters:
file_path: path of the file to validate
Returns: True if the file appears processable, False otherwise
_get_file_type(self, file_path)
Purpose: Determine the document type from the file extension.
Parameters:
file_path: path of the file to classify
Returns: one of "word", "powerpoint", "excel", "pdf" or "unknown"
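The validation and classification steps above amount to a magic-byte check plus an extension lookup. A minimal standalone sketch of that idea (the helper name, directory path and labels are illustrative, not part of the class):

```python
from pathlib import Path

OFFICE_ZIP_EXTS = {".docx", ".pptx", ".xlsx"}  # OOXML formats are ZIP containers
OLE2_MAGIC = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"  # legacy .doc/.ppt/.xls signature

def sniff_format(path):
    """Return a rough label for a file based on extension and magic bytes."""
    p = Path(path)
    if not p.exists() or p.stat().st_size == 0:
        return "invalid"
    with p.open("rb") as fh:
        header = fh.read(8)
    ext = p.suffix.lower()
    if ext in OFFICE_ZIP_EXTS and not header.startswith(b"PK"):
        # Extension says OOXML, but content says otherwise (often a renamed legacy file)
        return "legacy-office" if header == OLE2_MAGIC else "mismatched"
    return ext.lstrip(".") or "unknown"

# Example: inspect a folder of mixed documents before processing
for f in Path("./input_docs").glob("*"):
    print(f.name, "->", sniff_format(f))
```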
_convert_to_pdf(self, input_file)
Purpose: Convert a document to PDF with headless LibreOffice, falling back to unoconv when LibreOffice fails, times out, or produces no output file.
Parameters:
input_file: path of the document to convert
Returns: path to the generated PDF, or None if every conversion attempt fails
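The conversion strategy is a headless LibreOffice call with unoconv as a second attempt. A condensed sketch of that pattern (assumes the libreoffice and unoconv binaries are on PATH; the helper name and paths are illustrative):

```python
import subprocess
from pathlib import Path

def to_pdf(src, outdir):
    """Convert src to PDF, returning the output path or None."""
    src, outdir = Path(src), Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    target = outdir / f"{src.stem}.pdf"
    try:
        # First attempt: headless LibreOffice
        subprocess.run(
            ["libreoffice", "--headless", "--norestore",
             "--convert-to", "pdf", "--outdir", str(outdir), str(src)],
            capture_output=True, text=True, timeout=180, check=True,
        )
        if target.exists():
            return target
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
        pass
    # Second attempt: unoconv
    try:
        subprocess.run(
            ["unoconv", "-f", "pdf", "-o", str(target), str(src)],
            capture_output=True, text=True, timeout=180, check=True,
        )
        return target if target.exists() else None
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
        return None
```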
_process_word_document(self, file_path)
Purpose: Process a Word document by converting it to PDF and parsing the result with llmsherpa.
Parameters:
file_path: path of the Word document
Returns: dict with "text_chunks" and "table_chunks", or None on failure
_process_pdf_document(self, file_path)
Purpose: Parse a PDF with llmsherpa's LayoutPDFReader, accumulating paragraphs into chunks of at least ~4000 characters and capturing tables separately.
Parameters:
file_path: path of the PDF
Returns: dict with "text_chunks" and "table_chunks", or None on failure
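The PDF path reduces to reading the document through the llmsherpa service and walking its chunks. A condensed sketch of that loop, mirroring the class's logic (the service URL matches the class default; adjust for your deployment):

```python
from uuid import uuid4
import llmsherpa.readers.layout_reader as lr
from llmsherpa.readers import LayoutPDFReader

API_URL = "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"

def chunk_pdf(pdf_path, min_chunk_len=4000):
    doc = LayoutPDFReader(API_URL).read_pdf(str(pdf_path))
    text_chunks, table_chunks, buffer = [], [], ""
    for d in doc.chunks():
        if isinstance(d, lr.Paragraph):
            text = d.to_text().replace("- ", "").replace("\n", "")
            buffer = text if not buffer else buffer + "\n" + text
            if len(buffer) > min_chunk_len:  # flush once the buffer is large enough
                text_chunks.append([d.parent_text(), buffer, str(uuid4())])
                buffer = ""
        elif isinstance(d, lr.Table):
            table_chunks.append([d.parent_text(), d.to_html(), d.to_text(), str(uuid4())])
    if buffer:  # flush any remaining accumulated text
        text_chunks.append(["", buffer, str(uuid4())])
    return {"text_chunks": text_chunks, "table_chunks": table_chunks}
```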
_process_powerpoint(self, file_path)
Purpose: Extract slide text and tables with python-pptx, falling back to PDF conversion plus llmsherpa when the presentation cannot be opened or processed.
Parameters:
file_path: path of the PowerPoint file
Returns: dict with "text_chunks" and "table_chunks", or None on failure
_process_powerpoint_via_pdf(self, file_path)
Purpose: Fallback that converts the PowerPoint to PDF and processes it with llmsherpa.
Parameters:
file_path: path of the PowerPoint file
Returns: dict with "text_chunks" and "table_chunks", or None on failure
_table_to_markdown(self, table_data)
Purpose: Convert a 2D array (first row treated as the header) into a Markdown table.
Parameters:
table_data: list of rows, each a list of cell values
Returns: the Markdown table as a string
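For reference, a small worked example of the row-list-to-Markdown conversion these helpers perform (standalone re-implementation of the same logic, with illustrative data):

```python
table_data = [
    ["Name", "Role"],
    ["Alice", "Author"],
    ["Bob", "Reviewer"],
]

def table_to_markdown(table_data):
    # First row is the header, second line is the separator, remaining rows are data
    md = "| " + " | ".join(map(str, table_data[0])) + " |\n"
    md += "| " + " | ".join("---" for _ in table_data[0]) + " |\n"
    for row in table_data[1:]:
        md += "| " + " | ".join(map(str, row)) + " |\n"
    return md

print(table_to_markdown(table_data))
# | Name | Role |
# | --- | --- |
# | Alice | Author |
# | Bob | Reviewer |
```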
_process_excel(self, file_path)
Purpose: Read every sheet of an Excel workbook with pandas and convert each to a Markdown table, falling back to xlrd for older or problematic files.
Parameters:
file_path: path of the Excel file
Returns: dict with "table_chunks", or None on failure
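The pandas path is a straightforward per-sheet conversion. A minimal sketch of the same approach (assumes pandas with the tabulate package installed for to_markdown, and an engine such as openpyxl for .xlsx files):

```python
from uuid import uuid4
import pandas as pd

def excel_to_table_chunks(path):
    xls = pd.ExcelFile(path)
    chunks = []
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)
        markdown = df.to_markdown(index=False)
        # [sheet name, markdown table, plain-text placeholder, chunk UID]
        chunks.append([sheet_name, markdown, "", str(uuid4())])
    return {"table_chunks": chunks}
```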
_process_excel_with_xlrd(self, file_path)
Purpose: Fallback Excel processing with xlrd; the file is copied to a temporary .xls name so xlrd reads it by content rather than extension, and date cells are converted to ISO strings.
Parameters:
file_path: path of the Excel file
Returns: dict with "table_chunks", or None on failure
_list_to_markdown(self, data)
Purpose: Convert a 2D list to a Markdown table, padding or trimming rows to match the header width.
Parameters:
data: list of rows, each a list of cell values
Returns: the Markdown table as a string
_process_excel_via_conversion(self, file_path)
Purpose: Last-resort Excel processing: convert the workbook to CSV with headless LibreOffice and turn each CSV into a Markdown table.
Parameters:
file_path: path of the Excel file
Returns: dict with "table_chunks" (a minimal error representation if conversion also fails)
_create_minimal_excel_representation(self, file_path)
Purpose: Produce a single error table chunk for Excel files that cannot be processed by any method.
Parameters:
file_path: path of the Excel file
Returns: dict with a one-entry "table_chunks" list
process_document(self, file_path)
Purpose: Validate a file, dispatch it to the appropriate type-specific processor, and attach file metadata to the result.
Parameters:
file_path: path of the document to process
Returns: dict with the extracted chunks plus "file_path", "file_type" and "file_name", or None if the file is invalid or unsupported
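The dictionaries returned by process_document share a common shape that downstream code relies on (Excel results contain only "table_chunks"). An illustrative, non-literal example of a result for a PDF, with placeholder paths and UIDs:

```python
result = {
    "text_chunks": [
        # [parent/section text, accumulated chunk text, chunk UID]
        ["1 Introduction", "This report describes ...", "7f3c-example-uuid"],
    ],
    "table_chunks": [
        # [parent/section text, table as HTML or Markdown, table as plain text, chunk UID]
        ["2 Results", "<table>...</table>", "col1 col2 ...", "a91b-example-uuid"],
    ],
    "file_path": "/data/input/report.pdf",
    "file_type": "pdf",
    "file_name": "report.pdf",
}

# Downstream consumers typically iterate both lists:
for parent, text, uid in result.get("text_chunks", []):
    print(uid, parent, len(text))
```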
process_documents(self, file_paths, max_workers=4)
Purpose: Process multiple documents concurrently with a ThreadPoolExecutor.
Parameters:
file_paths: iterable of document paths
max_workers: maximum number of worker threads (default 4)
Returns: dict mapping each successfully processed path to its result dict
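The parallel driver is the standard submit/as_completed pattern. A self-contained sketch of the same approach (process_fn is a hypothetical callable standing in for process_document):

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def process_all(process_fn, file_paths, max_workers=4):
    """Run process_fn over file_paths in a thread pool, collecting non-None results."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(process_fn, f): f for f in file_paths}
        for future in concurrent.futures.as_completed(future_to_file):
            f = future_to_file[future]
            try:
                result = future.result()
                if result:
                    results[f] = result
            except Exception as exc:
                print(f"Error processing {f}: {exc}")
    return results
```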
sanitize_text(self, text)
Purpose: Sanitize text by encoding to UTF-8 with error replacement and decoding back, replacing characters that might otherwise cause encoding errors.
Parameters:
text: the string to sanitize
Returns: the sanitized string
summarize_text(self, text, api_key, max_tokens_summary=8192)
Purpose: Summarize the input text with the gpt-4o-mini model so that the summary stays under max_tokens_summary tokens.
Parameters:
text: the text to summarize
api_key: OpenAI API key
max_tokens_summary: upper bound for the summary length in tokens (default 8192)
Returns: the summary string
count_tokens(self, text)
Purpose: Count tokens in a string using the tiktoken cl100k_base encoding.
Parameters:
text: the string to tokenize
Returns: the token count as an integer
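Token counting uses tiktoken's cl100k_base encoding, and the shrink-then-summarize guard in the embedding code follows directly from it. A minimal sketch (the shrink helper is an illustrative standalone version of the loop used in generate_embeddings):

```python
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(text):
    return len(enc.encode(text))

def shrink_to_limit(text, max_tokens=110_000, step=1000):
    """Drop characters from the end until the text fits the token budget."""
    while count_tokens(text) > max_tokens:
        text = text[:-step]
    return text

print(count_tokens("Process different document types for RAG context extraction"))
```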
generate_embeddings(self, session, api_key)
Purpose: Fetch up to 100 Text_chunk and 100 Table_chunk nodes without embeddings from Neo4j, create OpenAI text-embedding-3-small embeddings (shrinking or summarizing oversized content first), and write the vectors back to the nodes.
Parameters:
session: active Neo4j session
api_key: OpenAI API key
Returns: tuple (number of text chunks fetched, number of table chunks fetched)
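The embedding call itself is a single OpenAI API request per chunk. A sketch in the same module-level client style the source uses (openai Python package v1+; the key and input text are placeholders):

```python
import openai

openai.api_key = "sk-..."  # placeholder; supply a real key

def embed(text, model="text-embedding-3-small"):
    response = openai.embeddings.create(model=model, input=text)
    return response.data[0].embedding

vector = embed("Process different document types for RAG context extraction")
print(len(vector))  # text-embedding-3-small returns 1536-dimensional vectors
```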
get_all_bibtex(self, session)
Purpose: Retrieve the BibTeX entry of every Publication node that has one.
Parameters:
session: active Neo4j session
Returns: dict mapping document UIDs to their BibTeX entries
get_all_document_paths(self, session)
Purpose: Build the full path from the Rootfolder node to every Document node.
Parameters:
session: active Neo4j session
Returns: dict mapping document UIDs to slash-separated path strings
batch_generate_embeddings(self, session, api_key, chroma_collection, batch_size=100)
Purpose: Repeatedly embed batches of chunks into the Chroma collection until no unembedded chunks remain, using the BibTeX lookup as chunk metadata.
Parameters:
session: active Neo4j session
api_key: OpenAI API key (used by the Neo4j embedding path; the Chroma path relies on the collection's embedding function)
chroma_collection: target Chroma collection
batch_size: nominal batch size (the underlying queries fetch 100 chunks at a time)
Returns: tuple (total text chunks embedded, total table chunks embedded)
embed_in_chroma(self, chroma_collection, session)
Purpose: Fetch up to 100 unembedded text and table chunks from Neo4j, add them to the Chroma collection with BibTeX metadata, and mark the corresponding nodes as embedded.
Parameters:
chroma_collection: target Chroma collection
session: active Neo4j session
Returns: tuple (number of text chunks fetched, number of table chunks fetched)
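On the Chroma side, the class only calls collection.add with parallel lists of documents, metadata and ids, letting the collection's embedding function compute the vectors. A minimal end-to-end sketch with chromadb (client path, collection name, and documents are illustrative; the default embedding function is used here, whereas a production setup would typically configure an OpenAI embedding function):

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_store")  # illustrative storage path
collection = client.get_or_create_collection(name="rag_chunks")

collection.add(
    documents=["First text chunk ...", "Second text chunk ..."],
    metadatas=[{"bibtex": "@article{a1, ...}"}, {"bibtex": "@article{a2, ...}"}],
    ids=["uid-1", "uid-2"],
)

hits = collection.query(query_texts=["document processing"], n_results=2)
print(hits["ids"])
```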
Required Imports
import os
import subprocess
import tempfile
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from uuid import uuid4
import pandas as pd
import pptx
import tiktoken
import llmsherpa
from llmsherpa.readers import LayoutPDFReader
The methods additionally import xlrd, shutil, openai and time at call time, and expect a module-level logger, a run_query(session, query) helper, and a ChatOpenAI client (e.g. from LangChain) to be available in the enclosing module.
Usage Example
# Example usage:
# processor = DocumentProcessor(input_dir="./input_docs", output_dir="./processed_chunks")
# result = processor.process_document("./input_docs/report.pdf")
# results = processor.process_documents(["a.docx", "slides.pptx", "data.xlsx"], max_workers=4)
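A fuller sketch of how the class might be driven end to end (directory paths are illustrative; the Neo4j/Chroma steps additionally require a session, an API key, and a collection set up elsewhere in the module):

```python
from pathlib import Path

processor = DocumentProcessor(
    input_dir="./input_docs",
    output_dir="./processed_chunks",
)

# Collect files and process them in parallel; unsupported or invalid files are skipped
files = [p for p in Path("./input_docs").rglob("*") if p.is_file()]
results = processor.process_documents(files, max_workers=4)

for path, result in results.items():
    n_text = len(result.get("text_chunks", []))
    n_tables = len(result.get("table_chunks", []))
    print(f"{result['file_name']}: {n_text} text chunks, {n_tables} table chunks")

# Embedding generation (requires a Neo4j session, an OpenAI key and a Chroma collection):
# processor.batch_generate_embeddings(session, api_key, chroma_collection)
```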
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DocumentProcessor_v4 (99.0% similar)
- class DocumentProcessor_v7 (81.2% similar)
- class DocChatRAG (65.3% similar)
- class DocumentProcessor (62.9% similar)
- class DocumentProcessor_v3 (62.4% similar)