class DocumentAnalyzer
Analyze PDF documents using OCR and LLM
/tf/active/vicechatdev/mailsearch/document_analyzer.py
51 - 560
moderate
Purpose
Analyze PDF documents using OCR and LLM
Source Code
class DocumentAnalyzer:
    """Analyze PDF documents using OCR and LLM."""

    def __init__(self, output_dir: str = "./output"):
        """Set up the output directory, the OpenAI client, and the EasyOCR reader.

        Args:
            output_dir: Directory where result files are written (created if absent).

        Raises:
            RuntimeError: If the OpenAI or OCR libraries are unavailable.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # The LLM backend is mandatory -- fail fast if the library is missing.
        if not OPENAI_AVAILABLE:
            raise RuntimeError("OpenAI library is required")
        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
        logger.info(f"Initialized OpenAI client with model: {MODEL_NAME}")

        # The OCR backend is likewise mandatory; reader covers English plus
        # simplified Chinese, CPU-only.
        if not OCR_AVAILABLE:
            raise RuntimeError("OCR libraries are required")
        logger.info("Initializing EasyOCR reader...")
        self.ocr_reader = easyocr.Reader(['en', 'ch_sim'], gpu=False)
        logger.info("EasyOCR reader initialized")
def extract_text_from_pdf(self, pdf_path: str, max_pages: int = 20) -> str:
    """Run OCR over a PDF and return the recognized text.

    Pages are rendered at high DPI and scanned with EasyOCR using relaxed
    confidence thresholds so faint, handwritten signatures are not dropped.
    All detections are then mined for likely signature names, which are
    appended under a "=== DETECTED SIGNATURES ===" section.

    Args:
        pdf_path: Path to the PDF file.
        max_pages: Maximum number of pages to render and scan.

    Returns:
        The extracted text, or an empty string if extraction fails.
    """
    logger.info(f"Extracting text from: {pdf_path}")
    try:
        # Render pages to images; 400 DPI improves signature recognition.
        images = convert_from_path(
            pdf_path,
            dpi=400,
            first_page=1,
            last_page=max_pages,
        )
        logger.info(f"Converted {len(images)} pages to images")

        page_sections = []
        all_detections = []  # detections with positions, for signature mining
        for page_index, page_image in enumerate(images):
            logger.debug(f"Processing page {page_index+1} with OCR")
            # EasyOCR expects a numpy array, not a PIL image.
            ocr_results = self.ocr_reader.readtext(
                np.array(page_image),
                paragraph=False,     # individual text elements
                text_threshold=0.6,  # relaxed for handwritten text
                low_text=0.3,        # even lower for very faint text
            )

            kept_texts = []
            page_detections = []
            for bbox, text, confidence in ocr_results:
                page_detections.append({
                    'text': text,
                    'confidence': confidence,
                    'bbox': bbox,
                    'page': page_index + 1,
                })
                # Keep even low-confidence text so signatures survive.
                if confidence > 0.2:
                    kept_texts.append(text)

            page_content = "\n".join(kept_texts)
            if page_content.strip():
                page_sections.append(f"=== Page {page_index+1} ===\n{page_content}")
                all_detections.extend(page_detections)
            logger.debug(f"Page {page_index+1}: Extracted {len(kept_texts)} text elements")

        full_text = "\n\n".join(page_sections)
        logger.info(f"Total text extracted: {len(full_text)} characters")

        # Append any OCR-detected signatures as a dedicated section.
        signatures = self._extract_signatures_from_detections(all_detections)
        if signatures:
            logger.info(f"Found {len(signatures)} potential signatures")
            full_text += "\n\n=== DETECTED SIGNATURES ===\n" + "\n".join(signatures)
        return full_text
    except Exception as e:
        # Best-effort: callers treat an empty string as "nothing extracted".
        logger.error(f"Error extracting text from {pdf_path}: {e}")
        return ""
def _extract_signatures_from_detections(self, detections: List[Dict]) -> List[str]:
"""
Extract signature information from OCR detections
Looks for text near 'DocuSigned by' or signature-related keywords
Args:
detections: List of OCR detections with bounding boxes
Returns:
List of potential signature names
"""
signatures = []
signature_keywords = [
'docusigned by', 'signed by', 'signature', 'signer',
'cto', 'coo', 'ceo', 'director', 'manager', 'approver',
'vicebio', 'client'
]
# Find detections that might be signature-related
for i, detection in enumerate(detections):
text_lower = detection['text'].lower()
# Check if this detection contains signature keywords
is_signature_context = any(keyword in text_lower for keyword in signature_keywords)
if is_signature_context:
# Look at nearby detections (within same page and close proximity)
page = detection['page']
bbox = detection['bbox']
# Calculate rough vertical position
y_pos = (bbox[0][1] + bbox[2][1]) / 2
# Look for nearby text that could be names
nearby_texts = []
for j, other in enumerate(detections):
if other['page'] == page and abs(j - i) < 10: # Within 10 detections
other_y = (other['bbox'][0][1] + other['bbox'][2][1]) / 2
# Within reasonable vertical distance
if abs(other_y - y_pos) < 150:
# Check if it looks like a name (mixed case, reasonable length)
other_text = other['text'].strip()
if (len(other_text) > 3 and
any(c.isupper() for c in other_text) and
any(c.islower() for c in other_text)):
nearby_texts.append(other_text)
# Add found signatures
for name in nearby_texts:
if name not in signatures and len(name) > 3:
signatures.append(f"Signature detected near '{detection['text']}': {name}")
return signatures
def analyze_document_with_llm(
self,
text: str,
email_date: str,
email_subject: str
) -> Dict[str, Any]:
"""
Analyze extracted OCR text with an LLM and return structured metadata.

Builds a single prompt embedding the email context plus the first
20,000 characters of the OCR text, requests strict JSON from the
model, strips any markdown code fences from the reply, and parses it.

Args:
text: Extracted OCR text
email_date: Date email was received
email_subject: Email subject line

Returns:
Dict with keys document_title, document_summary, signatories,
signature_dates, document_type, key_information. On any failure,
a placeholder dict carrying the error text in document_summary.
"""
logger.info("Analyzing document with LLM")
# Prepare prompt for LLM.
# NOTE(review): the "# Increased limit..." text in the DOCUMENT TEXT
# section below sits INSIDE the f-string, so it is sent to the model
# verbatim rather than acting as a Python comment -- confirm intended.
prompt = f"""You are analyzing a PDF document that was received via email. Extract the following information:
EMAIL CONTEXT:
- Email Subject: {email_subject}
- Email Received Date: {email_date}
DOCUMENT TEXT (from OCR):
{text[:20000]} # Increased limit to capture more signature info
TASK: Extract and provide the following information in JSON format:
1. document_title: The title or name of the document (look for headers, titles at top of document)
2. document_summary: A concise 2-3 sentence summary of what this document is about
3. signatories: List of people who signed the document (look for signature blocks, "Signed by", names near signature lines, CTO/COO/CEO titles)
4. signature_dates: Dates associated with signatures if available
5. document_type: Type of document (e.g., Contract, Protocol, Report, Agreement, Certificate, etc.)
6. key_information: Any other important information (IDs, reference numbers, organizations, etc.)
CRITICAL FOR SIGNATURE EXTRACTION:
- Look in MULTIPLE places for signatory names:
1. "DocuSigned by:" followed by name
2. Role/Title lines like "CTO ViceBio", "COO ViceBio", "批准人/Approver"
3. Names appearing after roles in signature tables
4. Email addresses in signature blocks (name often before @domain.com)
5. Certificate sections listing "Signed by" with names and emails
6. Any section marked "=== DETECTED SIGNATURES ===" contains OCR-detected handwritten signatures
- For Chinese documents: Look for 角色/Role, 签名/Signature, 起草人/Author, 审核人/Reviewer, 批准人/Approver
- Match names with their roles when possible (e.g., "Jean Smal - CTO ViceBio")
- If you see a role but no name in the signature table, check the certificate/completion sections for the actual signer
- DocuSign completion certificates often have the format: "Completed by [Name] ([email]) on [date]"
IMPORTANT:
- If you find "CTO ViceBio" or "COO ViceBio" as roles, look for corresponding names in certificate sections
- The actual signer names are often in DocuSign certificate pages, not just the signature table
- Include both the structured signature table names AND certificate-listed signers
Return ONLY valid JSON in this exact format:
{{
"document_title": "title here",
"document_summary": "summary here",
"signatories": ["name1", "name2"],
"signature_dates": ["date1", "date2"],
"document_type": "type here",
"key_information": ["info1", "info2"]
}}"""
try:
response = self.openai_client.chat.completions.create(
model=MODEL_NAME,
messages=[
{
"role": "system",
"content": "You are a document analysis expert. Extract information accurately and return only valid JSON."
},
{
"role": "user",
"content": prompt
}
],
temperature=0.1,  # low temperature for near-deterministic extraction
max_tokens=1500
)
result_text = response.choices[0].message.content.strip()
# Parse JSON response.
# Remove markdown code blocks if present: splitting on ``` isolates the
# fenced body, then a leading "json" language tag is stripped.
if result_text.startswith("```"):
result_text = result_text.split("```")[1]
if result_text.startswith("json"):
result_text = result_text[4:]
result_text = result_text.strip()
result = json.loads(result_text)
logger.info("Successfully analyzed document with LLM")
return result
except Exception as e:
# Fall back to a placeholder result so callers always get the schema.
logger.error(f"Error analyzing with LLM: {e}")
return {
"document_title": "Error extracting",
"document_summary": f"Error: {str(e)}",
"signatories": [],
"signature_dates": [],
"document_type": "Unknown",
"key_information": []
}
def process_document(
    self,
    pdf_path: str,
    email_date: str,
    email_subject: str,
    sender: str
) -> Dict[str, Any]:
    """Extract text from one PDF, enrich it with any DocuSign certificate,
    and analyze it with the LLM.

    Args:
        pdf_path: Path to the PDF file.
        email_date: Date the email was received.
        email_subject: Email subject line.
        sender: Email sender.

    Returns:
        Dict with email metadata, processing status, and (on success) the
        LLM analysis fields merged in. 'success' is False and 'error' is
        set when extraction or analysis fails.
    """
    logger.info(f"Processing document: {pdf_path}")
    result = {
        "pdf_path": pdf_path,
        "filename": os.path.basename(pdf_path),
        "email_date": email_date,
        "email_subject": email_subject,
        "sender": sender,
        "processing_date": datetime.now().isoformat(),
        "success": False,
        "error": None
    }
    try:
        if not os.path.exists(pdf_path):
            result["error"] = "File not found"
            return result

        # Extract text with OCR.
        text = self.extract_text_from_pdf(pdf_path)

        # DocuSign Summary/Certificate files often carry the actual signer
        # names; check the known locations and append their text.
        # (Fix: removed a dead `import re` and an unused `filename` local
        # that were never used after being introduced.)
        pdf_dir = os.path.dirname(pdf_path)
        summary_locations = [
            os.path.join(pdf_dir, "Summary.pdf"),             # same directory
            os.path.join(pdf_dir, "summary", "Summary.pdf"),  # summary subdirectory
        ]
        summary_found = False
        for summary_path in summary_locations:
            if os.path.exists(summary_path) and summary_path != pdf_path:
                logger.info(f"Found DocuSign Summary at: {summary_path}")
                try:
                    summary_text = self.extract_text_from_pdf(summary_path, max_pages=5)
                    if summary_text:
                        text += "\n\n=== DOCUSIGN CERTIFICATE ===\n" + summary_text
                        logger.info("Added DocuSign certificate information to text")
                        summary_found = True
                        break
                except Exception as e:
                    # Certificate enrichment is best-effort; keep going.
                    logger.warning(f"Could not process Summary at {summary_path}: {e}")
        if not summary_found:
            logger.debug("No Summary.pdf found for additional signature extraction")

        # Too little text means OCR effectively failed; bail out early.
        if not text or len(text.strip()) < 50:
            result["error"] = "Insufficient text extracted from PDF"
            return result
        result["extracted_text_length"] = len(text)

        # Analyze with LLM and merge the structured fields into the result.
        analysis = self.analyze_document_with_llm(text, email_date, email_subject)
        result.update(analysis)
        result["success"] = True
        logger.info(f"Successfully processed: {pdf_path}")
    except Exception as e:
        logger.error(f"Error processing document {pdf_path}: {e}")
        result["error"] = str(e)
    return result
def load_download_register(self, register_path: str) -> List[Dict[str, str]]:
    """Read the download register CSV into a list of row dicts.

    Args:
        register_path: Path to download_register.csv.

    Returns:
        One dict per CSV row; an empty (or partially filled) list if
        reading fails.
    """
    logger.info(f"Loading download register from: {register_path}")
    documents: List[Dict[str, str]] = []
    try:
        with open(register_path, 'r', encoding='utf-8') as f:
            # extend() keeps rows read before any mid-file failure.
            documents.extend(csv.DictReader(f))
        logger.info(f"Loaded {len(documents)} documents from register")
    except Exception as e:
        logger.error(f"Error loading register: {e}")
    return documents
def process_documents_from_register(
    self,
    register_path: str,
    limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Process every document listed in the download register.

    DocuSign "Summary" certificate files are filtered out up front; they
    are consumed as companions of the documents they certify instead.

    Args:
        register_path: Path to download_register.csv.
        limit: Optional cap on the number of documents processed.

    Returns:
        One analysis result dict per processed document.
    """
    documents = self.load_download_register(register_path)

    # Drop Summary files -- DocuSign certificates, not documents to analyze.
    original_count = len(documents)
    documents = [doc for doc in documents if not doc.get('filename', '').startswith('Summary')]
    filtered_count = original_count - len(documents)
    if filtered_count > 0:
        logger.info(f"Filtered out {filtered_count} Summary certificate files")
    logger.info(f"Processing {len(documents)} actual documents")

    if limit:
        documents = documents[:limit]
        logger.info(f"Processing limited to {limit} documents")

    results = []
    for index, doc in enumerate(documents, 1):
        logger.info(f"\n{'='*80}")
        logger.info(f"Processing document {index}/{len(documents)}")
        logger.info(f"{'='*80}")
        outcome = self.process_document(
            pdf_path=doc['filepath'],
            email_date=doc['received_date'],
            email_subject=doc['subject'],
            sender=doc['sender']
        )
        results.append(outcome)

        # Per-document console summary for the operator.
        if outcome['success']:
            print(f"\n✓ Successfully processed: {outcome['filename']}")
            print(f" Title: {outcome.get('document_title', 'N/A')}")
            print(f" Type: {outcome.get('document_type', 'N/A')}")
            print(f" Signatories: {', '.join(outcome.get('signatories', []))}")
        else:
            print(f"\n✗ Failed to process: {outcome['filename']}")
            print(f" Error: {outcome.get('error', 'Unknown')}")
    return results
def save_results(self, results: List[Dict[str, Any]], output_name: str = "analysis_results"):
    """Write analysis results to timestamped JSON and CSV files.

    The JSON file carries the full result dicts; the CSV is a flattened
    view with list fields joined into single strings.

    Args:
        results: Analysis result dicts from process_document().
        output_name: Base name for the output files.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Full-fidelity JSON dump.
    json_path = self.output_dir / f"{timestamp}_{output_name}.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    logger.info(f"Saved full results to: {json_path}")

    # Flattened CSV view.
    # NOTE(review): when results is empty the CSV is never written, yet the
    # summary below still prints its path -- confirm this is acceptable.
    csv_path = self.output_dir / f"{timestamp}_{output_name}.csv"
    if results:
        fieldnames = [
            'filename', 'email_date', 'email_subject', 'sender',
            'document_title', 'document_type', 'document_summary',
            'signatories', 'signature_dates', 'key_information',
            'success', 'error', 'processing_date'
        ]
        with open(csv_path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for item in results:
                writer.writerow({
                    'filename': item.get('filename', ''),
                    'email_date': item.get('email_date', ''),
                    'email_subject': item.get('email_subject', ''),
                    'sender': item.get('sender', ''),
                    'document_title': item.get('document_title', ''),
                    'document_type': item.get('document_type', ''),
                    'document_summary': item.get('document_summary', ''),
                    'signatories': ', '.join(item.get('signatories', [])),
                    'signature_dates': ', '.join(item.get('signature_dates', [])),
                    'key_information': '; '.join(item.get('key_information', [])),
                    'success': item.get('success', False),
                    'error': item.get('error', ''),
                    'processing_date': item.get('processing_date', '')
                })
        logger.info(f"Saved CSV results to: {csv_path}")

    print(f"\n{'='*80}")
    print(f"Results saved:")
    print(f" JSON: {json_path}")
    print(f" CSV: {csv_path}")
    print(f"{'='*80}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Base-class metadata captured by the documentation extractor. The constructor's actual parameter is output_dir (str, default "./output"): the directory where result files are written.
Return Value
Returns a DocumentAnalyzer instance.
Class Interface
Methods
__init__(self, output_dir)
Purpose: Initialize the document analyzer
Parameters:
output_dir: Type: str
Returns: None
extract_text_from_pdf(self, pdf_path, max_pages) -> str
Purpose: Extract text from PDF using OCR with enhanced signature detection Args: pdf_path: Path to PDF file max_pages: Maximum number of pages to process Returns: Extracted text content
Parameters:
pdf_path: Type: strmax_pages: Type: int
Returns: Returns str
_extract_signatures_from_detections(self, detections) -> List[str]
Purpose: Extract signature information from OCR detections Looks for text near 'DocuSigned by' or signature-related keywords Args: detections: List of OCR detections with bounding boxes Returns: List of potential signature names
Parameters:
detections: Type: List[Dict]
Returns: Returns List[str]
analyze_document_with_llm(self, text, email_date, email_subject) -> Dict[str, Any]
Purpose: Analyze extracted text using LLM to extract structured information Args: text: Extracted OCR text email_date: Date email was received email_subject: Email subject line Returns: Dictionary with analyzed information
Parameters:
text: Type: stremail_date: Type: stremail_subject: Type: str
Returns: Returns Dict[str, Any]
process_document(self, pdf_path, email_date, email_subject, sender) -> Dict[str, Any]
Purpose: Process a single document: extract text and analyze Also attempts to find and process associated DocuSign certificate Args: pdf_path: Path to PDF file email_date: Date email was received email_subject: Email subject sender: Email sender Returns: Complete analysis result
Parameters:
pdf_path: Type: stremail_date: Type: stremail_subject: Type: strsender: Type: str
Returns: Returns Dict[str, Any]
load_download_register(self, register_path) -> List[Dict[str, str]]
Purpose: Load the download register CSV Args: register_path: Path to download_register.csv Returns: List of document records
Parameters:
register_path: Type: str
Returns: Returns List[Dict[str, str]]
process_documents_from_register(self, register_path, limit) -> List[Dict[str, Any]]
Purpose: Process documents listed in the download register Automatically skips Summary files (DocuSign certificates) Args: register_path: Path to download_register.csv limit: Optional limit on number of documents to process Returns: List of analysis results
Parameters:
register_path: Type: strlimit: Type: Optional[int]
Returns: Returns List[Dict[str, Any]]
save_results(self, results, output_name)
Purpose: Save analysis results to CSV and JSON Args: results: List of analysis results output_name: Base name for output files
Parameters:
results: Type: List[Dict[str, Any]]output_name: Type: str
Returns: None
Required Imports
import os
import sys
import csv
import json
from pathlib import Path
Usage Example
# Example usage:
# analyzer = DocumentAnalyzer(output_dir="./output")
# results = analyzer.process_documents_from_register("download_register.csv", limit=5)
# analyzer.save_results(results)
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function main_v11 65.4% similar
-
class DocumentProcessor_v3 63.6% similar
-
class DocumentProcessor_v1 62.8% similar
-
class DocumentProcessor_v2 62.6% similar
-
class TestDocumentProcessor 60.4% similar