🔍 Code Extractor

class DocumentAnalyzer

Maturity: 25

Analyze PDF documents using OCR and LLM

File:
/tf/active/vicechatdev/mailsearch/document_analyzer.py
Lines:
51 - 560
Complexity:
moderate

Purpose

Analyze PDF documents using OCR and LLM

Source Code

class DocumentAnalyzer:
    """Analyze PDF documents using OCR and LLM"""
    
    def __init__(self, output_dir: str = "./output"):
        """Initialize the document analyzer"""
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        # Initialize OpenAI client
        if not OPENAI_AVAILABLE:
            raise RuntimeError("OpenAI library is required")
        
        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
        logger.info(f"Initialized OpenAI client with model: {MODEL_NAME}")
        
        # Initialize EasyOCR reader (supports multiple languages including Chinese)
        if OCR_AVAILABLE:
            logger.info("Initializing EasyOCR reader...")
            self.ocr_reader = easyocr.Reader(['en', 'ch_sim'], gpu=False)
            logger.info("EasyOCR reader initialized")
        else:
            raise RuntimeError("OCR libraries are required")
    
    def extract_text_from_pdf(self, pdf_path: str, max_pages: int = 20) -> str:
        """
        Extract text from PDF using OCR with enhanced signature detection
        
        Args:
            pdf_path: Path to PDF file
            max_pages: Maximum number of pages to process
            
        Returns:
            Extracted text content
        """
        logger.info(f"Extracting text from: {pdf_path}")
        
        try:
            # Convert PDF to images at higher DPI for better signature capture
            images = convert_from_path(
                pdf_path, 
                dpi=400,  # Increased DPI for better signature recognition
                first_page=1,
                last_page=max_pages
            )
            logger.info(f"Converted {len(images)} pages to images")
            
            all_text = []
            all_detections = []  # Store all detections with positions for signature analysis
            
            for i, image in enumerate(images):
                logger.debug(f"Processing page {i+1} with OCR")
                
                # Convert PIL Image to numpy array for EasyOCR
                image_np = np.array(image)
                
                # Use EasyOCR with lower confidence threshold to catch handwritten signatures
                result = self.ocr_reader.readtext(
                    image_np,
                    paragraph=False,  # Get individual text elements
                    text_threshold=0.6,  # Lower threshold for handwritten text
                    low_text=0.3  # Even lower for very faint text
                )
                
                # Extract text from OCR results
                page_text = []
                page_detections = []
                
                for detection in result:
                    bbox = detection[0]  # Bounding box coordinates
                    text = detection[1]  # Detected text
                    confidence = detection[2]  # Confidence score
                    
                    # Store detection with metadata
                    page_detections.append({
                        'text': text,
                        'confidence': confidence,
                        'bbox': bbox,
                        'page': i + 1
                    })
                    
                    # Include text with lower confidence for signatures
                    if confidence > 0.2:  # Lower threshold
                        page_text.append(text)
                
                page_content = "\n".join(page_text)
                
                if page_content.strip():
                    all_text.append(f"=== Page {i+1} ===\n{page_content}")
                
                all_detections.extend(page_detections)
                logger.debug(f"Page {i+1}: Extracted {len(page_text)} text elements")
            
            full_text = "\n\n".join(all_text)
            logger.info(f"Total text extracted: {len(full_text)} characters")
            
            # Try to extract signatures from detections
            signatures = self._extract_signatures_from_detections(all_detections)
            if signatures:
                logger.info(f"Found {len(signatures)} potential signatures")
                signature_section = "\n\n=== DETECTED SIGNATURES ===\n" + "\n".join(signatures)
                full_text += signature_section
            
            return full_text
            
        except Exception as e:
            logger.error(f"Error extracting text from {pdf_path}: {e}")
            return ""
    
    def _extract_signatures_from_detections(self, detections: List[Dict]) -> List[str]:
        """
        Extract signature information from OCR detections
        Looks for text near 'DocuSigned by' or signature-related keywords
        
        Args:
            detections: List of OCR detections with bounding boxes
            
        Returns:
            List of potential signature names
        """
        signatures = []
        signature_keywords = [
            'docusigned by', 'signed by', 'signature', 'signer',
            'cto', 'coo', 'ceo', 'director', 'manager', 'approver',
            'vicebio', 'client'
        ]
        
        # Find detections that might be signature-related
        for i, detection in enumerate(detections):
            text_lower = detection['text'].lower()
            
            # Check if this detection contains signature keywords
            is_signature_context = any(keyword in text_lower for keyword in signature_keywords)
            
            if is_signature_context:
                # Look at nearby detections (within same page and close proximity)
                page = detection['page']
                bbox = detection['bbox']
                
                # Calculate rough vertical position
                y_pos = (bbox[0][1] + bbox[2][1]) / 2
                
                # Look for nearby text that could be names
                nearby_texts = []
                for j, other in enumerate(detections):
                    if other['page'] == page and abs(j - i) < 10:  # Within 10 detections
                        other_y = (other['bbox'][0][1] + other['bbox'][2][1]) / 2
                        
                        # Within reasonable vertical distance
                        if abs(other_y - y_pos) < 150:
                            # Check if it looks like a name (mixed case, reasonable length)
                            other_text = other['text'].strip()
                            if (len(other_text) > 3 and 
                                any(c.isupper() for c in other_text) and
                                any(c.islower() for c in other_text)):
                                nearby_texts.append(other_text)
                
                # Add found signatures
                for name in nearby_texts:
                    if name not in signatures and len(name) > 3:
                        signatures.append(f"Signature detected near '{detection['text']}': {name}")
        
        return signatures
    
    def analyze_document_with_llm(
        self, 
        text: str, 
        email_date: str,
        email_subject: str
    ) -> Dict[str, Any]:
        """
        Analyze extracted text using LLM to extract structured information
        
        Args:
            text: Extracted OCR text
            email_date: Date email was received
            email_subject: Email subject line
            
        Returns:
            Dictionary with analyzed information
        """
        logger.info("Analyzing document with LLM")
        
        # Prepare prompt for LLM
        prompt = f"""You are analyzing a PDF document that was received via email. Extract the following information:

EMAIL CONTEXT:
- Email Subject: {email_subject}
- Email Received Date: {email_date}

DOCUMENT TEXT (from OCR):
{text[:20000]}

TASK: Extract and provide the following information in JSON format:
1. document_title: The title or name of the document (look for headers, titles at top of document)
2. document_summary: A concise 2-3 sentence summary of what this document is about
3. signatories: List of people who signed the document (look for signature blocks, "Signed by", names near signature lines, CTO/COO/CEO titles)
4. signature_dates: Dates associated with signatures if available
5. document_type: Type of document (e.g., Contract, Protocol, Report, Agreement, Certificate, etc.)
6. key_information: Any other important information (IDs, reference numbers, organizations, etc.)

CRITICAL FOR SIGNATURE EXTRACTION:
- Look in MULTIPLE places for signatory names:
  1. "DocuSigned by:" followed by name
  2. Role/Title lines like "CTO ViceBio", "COO ViceBio", "批准人/Approver"
  3. Names appearing after roles in signature tables
  4. Email addresses in signature blocks (name often before @domain.com)
  5. Certificate sections listing "Signed by" with names and emails
  6. Any section marked "=== DETECTED SIGNATURES ===" contains OCR-detected handwritten signatures
  
- For Chinese documents: Look for 角色/Role, 签名/Signature, 起草人/Author, 审核人/Reviewer, 批准人/Approver
- Match names with their roles when possible (e.g., "Jean Smal - CTO ViceBio")
- If you see a role but no name in the signature table, check the certificate/completion sections for the actual signer
- DocuSign completion certificates often have the format: "Completed by [Name] ([email]) on [date]"

IMPORTANT: 
- If you find "CTO ViceBio" or "COO ViceBio" as roles, look for corresponding names in certificate sections
- The actual signer names are often in DocuSign certificate pages, not just the signature table
- Include both the structured signature table names AND certificate-listed signers

Return ONLY valid JSON in this exact format:
{{
  "document_title": "title here",
  "document_summary": "summary here",
  "signatories": ["name1", "name2"],
  "signature_dates": ["date1", "date2"],
  "document_type": "type here",
  "key_information": ["info1", "info2"]
}}"""

        try:
            response = self.openai_client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a document analysis expert. Extract information accurately and return only valid JSON."
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                temperature=0.1,
                max_tokens=1500
            )
            
            result_text = response.choices[0].message.content.strip()
            
            # Parse JSON response
            # Remove markdown code blocks if present
            if result_text.startswith("```"):
                result_text = result_text.split("```")[1]
                if result_text.startswith("json"):
                    result_text = result_text[4:]
                result_text = result_text.strip()
            
            result = json.loads(result_text)
            logger.info("Successfully analyzed document with LLM")
            
            return result
            
        except Exception as e:
            logger.error(f"Error analyzing with LLM: {e}")
            return {
                "document_title": "Error extracting",
                "document_summary": f"Error: {str(e)}",
                "signatories": [],
                "signature_dates": [],
                "document_type": "Unknown",
                "key_information": []
            }
    
    def process_document(
        self,
        pdf_path: str,
        email_date: str,
        email_subject: str,
        sender: str
    ) -> Dict[str, Any]:
        """
        Process a single document: extract text and analyze
        Also attempts to find and process associated DocuSign certificate
        
        Args:
            pdf_path: Path to PDF file
            email_date: Date email was received
            email_subject: Email subject
            sender: Email sender
            
        Returns:
            Complete analysis result
        """
        logger.info(f"Processing document: {pdf_path}")
        
        result = {
            "pdf_path": pdf_path,
            "filename": os.path.basename(pdf_path),
            "email_date": email_date,
            "email_subject": email_subject,
            "sender": sender,
            "processing_date": datetime.now().isoformat(),
            "success": False,
            "error": None
        }
        
        try:
            # Check if file exists
            if not os.path.exists(pdf_path):
                result["error"] = "File not found"
                return result
            
            # Extract text with OCR
            text = self.extract_text_from_pdf(pdf_path)
            
            # Check for DocuSign Summary/Certificate file in multiple locations
            # These often have the actual signer names
            pdf_dir = os.path.dirname(pdf_path)
            
            # Try multiple possible locations for Summary files
            summary_locations = [
                os.path.join(pdf_dir, "Summary.pdf"),  # Same directory
                os.path.join(pdf_dir, "summary", "Summary.pdf"),  # summary subdirectory
            ]
            
            summary_found = False
            for summary_path in summary_locations:
                if os.path.exists(summary_path) and summary_path != pdf_path:
                    logger.info(f"Found DocuSign Summary at: {summary_path}")
                    try:
                        summary_text = self.extract_text_from_pdf(summary_path, max_pages=5)
                        if summary_text:
                            text += "\n\n=== DOCUSIGN CERTIFICATE ===\n" + summary_text
                            logger.info("Added DocuSign certificate information to text")
                            summary_found = True
                            break
                    except Exception as e:
                        logger.warning(f"Could not process Summary at {summary_path}: {e}")
            
            if not summary_found:
                logger.debug("No Summary.pdf found for additional signature extraction")
            
            if not text or len(text.strip()) < 50:
                result["error"] = "Insufficient text extracted from PDF"
                return result
            
            result["extracted_text_length"] = len(text)
            
            # Analyze with LLM
            analysis = self.analyze_document_with_llm(text, email_date, email_subject)
            
            # Add analysis results to result
            result.update(analysis)
            result["success"] = True
            
            logger.info(f"Successfully processed: {pdf_path}")
            
        except Exception as e:
            logger.error(f"Error processing document {pdf_path}: {e}")
            result["error"] = str(e)
        
        return result
    
    def load_download_register(self, register_path: str) -> List[Dict[str, str]]:
        """
        Load the download register CSV
        
        Args:
            register_path: Path to download_register.csv
            
        Returns:
            List of document records
        """
        logger.info(f"Loading download register from: {register_path}")
        
        documents = []
        
        try:
            with open(register_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    documents.append(row)
            
            logger.info(f"Loaded {len(documents)} documents from register")
            
        except Exception as e:
            logger.error(f"Error loading register: {e}")
        
        return documents
    
    def process_documents_from_register(
        self,
        register_path: str,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Process documents listed in the download register
        Automatically skips Summary files (DocuSign certificates)
        
        Args:
            register_path: Path to download_register.csv
            limit: Optional limit on number of documents to process
            
        Returns:
            List of analysis results
        """
        # Load register
        documents = self.load_download_register(register_path)
        
        # Filter out Summary files - these are DocuSign certificates, not documents to analyze
        original_count = len(documents)
        documents = [doc for doc in documents if not doc.get('filename', '').startswith('Summary')]
        filtered_count = original_count - len(documents)
        
        if filtered_count > 0:
            logger.info(f"Filtered out {filtered_count} Summary certificate files")
            logger.info(f"Processing {len(documents)} actual documents")
        
        if limit:
            documents = documents[:limit]
            logger.info(f"Processing limited to {limit} documents")
        
        results = []
        
        for i, doc in enumerate(documents, 1):
            logger.info(f"\n{'='*80}")
            logger.info(f"Processing document {i}/{len(documents)}")
            logger.info(f"{'='*80}")
            
            result = self.process_document(
                pdf_path=doc['filepath'],
                email_date=doc['received_date'],
                email_subject=doc['subject'],
                sender=doc['sender']
            )
            
            results.append(result)
            
            # Print summary
            if result['success']:
                print(f"\n✓ Successfully processed: {result['filename']}")
                print(f"  Title: {result.get('document_title', 'N/A')}")
                print(f"  Type: {result.get('document_type', 'N/A')}")
                print(f"  Signatories: {', '.join(result.get('signatories', []))}")
            else:
                print(f"\n✗ Failed to process: {result['filename']}")
                print(f"  Error: {result.get('error', 'Unknown')}")
        
        return results
    
    def save_results(self, results: List[Dict[str, Any]], output_name: str = "analysis_results"):
        """
        Save analysis results to CSV and JSON
        
        Args:
            results: List of analysis results
            output_name: Base name for output files
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # Save as JSON (full details)
        json_path = self.output_dir / f"{timestamp}_{output_name}.json"
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved full results to: {json_path}")
        
        # Save as CSV (flattened)
        csv_path = self.output_dir / f"{timestamp}_{output_name}.csv"
        
        if results:
            fieldnames = [
                'filename', 'email_date', 'email_subject', 'sender',
                'document_title', 'document_type', 'document_summary',
                'signatories', 'signature_dates', 'key_information',
                'success', 'error', 'processing_date'
            ]
            
            with open(csv_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                
                for result in results:
                    row = {
                        'filename': result.get('filename', ''),
                        'email_date': result.get('email_date', ''),
                        'email_subject': result.get('email_subject', ''),
                        'sender': result.get('sender', ''),
                        'document_title': result.get('document_title', ''),
                        'document_type': result.get('document_type', ''),
                        'document_summary': result.get('document_summary', ''),
                        'signatories': ', '.join(result.get('signatories', [])),
                        'signature_dates': ', '.join(result.get('signature_dates', [])),
                        'key_information': '; '.join(result.get('key_information', [])),
                        'success': result.get('success', False),
                        'error': result.get('error', ''),
                        'processing_date': result.get('processing_date', '')
                    }
                    writer.writerow(row)
            
            logger.info(f"Saved CSV results to: {csv_path}")
        
        print(f"\n{'='*80}")
        print(f"Results saved:")
        print(f"  JSON: {json_path}")
        print(f"  CSV: {csv_path}")
        print(f"{'='*80}")

Parameters

Name Type Default Kind
output_dir str "./output" keyword

Parameter Details

output_dir: Directory where analysis results are written; created on initialization if it does not exist.

Return Value

Instantiation returns a DocumentAnalyzer instance; see individual methods for their return types.

Class Interface

Methods

__init__(self, output_dir)

Purpose: Initialize the document analyzer

Parameters:

  • output_dir: Type: str

Returns: None

extract_text_from_pdf(self, pdf_path, max_pages) -> str

Purpose: Extract text from a PDF using OCR, with enhanced signature detection.

Parameters:

  • pdf_path: Type: str
  • max_pages: Type: int

Returns: str
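
The OCR loop in this method consumes EasyOCR's `readtext()` output, where each detection is a `(bbox, text, confidence)` triple, and keeps text above a low confidence threshold. A minimal sketch of that filtering step, using hand-made stand-in detections rather than real OCR output:

```python
# Each detection mimics the EasyOCR readtext() triple: bounding box,
# detected text, confidence score. These samples are fabricated.
def collect_page_text(detections, min_confidence=0.2):
    """Keep detected text whose confidence exceeds the threshold."""
    kept = []
    for bbox, text, confidence in detections:
        if confidence > min_confidence:
            kept.append(text)
    return "\n".join(kept)

sample = [
    ([[0, 0], [100, 0], [100, 20], [0, 20]], "Quality Agreement", 0.98),
    ([[0, 30], [80, 30], [80, 50], [0, 50]], "DocuSigned by:", 0.91),
    ([[0, 60], [60, 60], [60, 80], [0, 80]], "~~~", 0.05),  # noise, dropped
]

print(collect_page_text(sample))
```

The deliberately low threshold (0.2) mirrors the source's choice of keeping faint handwritten-signature text that a stricter cutoff would discard.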

_extract_signatures_from_detections(self, detections) -> List[str]

Purpose: Extract signature information from OCR detections by looking for text near 'DocuSigned by' or other signature-related keywords.

Parameters:

  • detections: Type: List[Dict]

Returns: List[str]
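
The core of this method's proximity search is a name-likeness heuristic: a nearby detection counts as a candidate name only if it is longer than three characters and contains both upper- and lower-case letters. Restated in isolation:

```python
# Name-likeness check as used when scanning detections near signature
# keywords; the candidate strings below are illustrative only.
def looks_like_name(text: str) -> bool:
    text = text.strip()
    return (len(text) > 3
            and any(c.isupper() for c in text)
            and any(c.islower() for c in text))

candidates = ["Jean Smal", "CTO", "2024-01-15", "signature", "Li Wei"]
names = [c for c in candidates if looks_like_name(c)]
print(names)
```

Note the heuristic rejects all-caps role labels ("CTO"), all-lowercase keywords, and dates, which is why it can separate signer names from the surrounding signature-block boilerplate.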

analyze_document_with_llm(self, text, email_date, email_subject) -> Dict[str, Any]

Purpose: Analyze extracted OCR text with an LLM to produce structured document information.

Parameters:

  • text: Type: str
  • email_date: Type: str
  • email_subject: Type: str

Returns: Dict[str, Any]
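
Before parsing, this method strips the markdown code fence that LLMs often wrap around JSON output. The post-processing step can be sketched on its own, with a fabricated sample response standing in for a real API result:

```python
import json

def parse_llm_json(result_text: str) -> dict:
    """Strip an optional markdown fence (and 'json' language tag), then parse."""
    result_text = result_text.strip()
    if result_text.startswith("```"):
        # split("```") yields ['', '<fence body>', ''] for a fenced response
        result_text = result_text.split("```")[1]
        if result_text.startswith("json"):
            result_text = result_text[4:]
        result_text = result_text.strip()
    return json.loads(result_text)

fenced = '```json\n{"document_type": "Agreement", "signatories": ["Jean Smal"]}\n```'
print(parse_llm_json(fenced)["document_type"])
```

Unfenced responses pass straight through to `json.loads`, so the same helper handles both shapes of model output.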

process_document(self, pdf_path, email_date, email_subject, sender) -> Dict[str, Any]

Purpose: Process a single document: extract its text, analyze it, and attempt to find and process an associated DocuSign certificate.

Parameters:

  • pdf_path: Type: str
  • email_date: Type: str
  • email_subject: Type: str
  • sender: Type: str

Returns: Dict[str, Any]

load_download_register(self, register_path) -> List[Dict[str, str]]

Purpose: Load the download register CSV into a list of document records.

Parameters:

  • register_path: Type: str

Returns: List[Dict[str, str]]
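
The loader is a plain `csv.DictReader` pass; the column names used below (`filepath`, `received_date`, `subject`, `sender`) match the keys that `process_documents_from_register` later reads, but the CSV content itself is a made-up sample:

```python
import csv
import io

# io.StringIO stands in for the on-disk download_register.csv
register_csv = (
    "filepath,received_date,subject,sender\n"
    "/data/doc1.pdf,2024-01-15,Signed agreement,alice@example.com\n"
    "/data/Summary.pdf,2024-01-15,Signed agreement,alice@example.com\n"
)

documents = list(csv.DictReader(io.StringIO(register_csv)))
print(len(documents), documents[0]["subject"])
```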

process_documents_from_register(self, register_path, limit) -> List[Dict[str, Any]]

Purpose: Process documents listed in the download register, automatically skipping Summary files (DocuSign certificates).

Parameters:

  • register_path: Type: str
  • limit: Type: Optional[int]

Returns: List[Dict[str, Any]]
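
The Summary-file skip is a simple filename-prefix filter over the register records, shown here on stand-in dictionaries:

```python
# Records mimic rows loaded from the download register (fabricated).
documents = [
    {"filename": "Agreement_v2.pdf"},
    {"filename": "Summary.pdf"},
    {"filename": "Summary_3.pdf"},
]

# DocuSign certificates are named "Summary*"; they feed signature extraction
# in process_document but are not analyzed as standalone documents.
kept = [d for d in documents if not d.get("filename", "").startswith("Summary")]
print([d["filename"] for d in kept])
```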

save_results(self, results, output_name)

Purpose: Save analysis results to timestamped CSV and JSON files in the output directory.

Parameters:

  • results: Type: List[Dict[str, Any]]
  • output_name: Type: str

Returns: None
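
For the CSV output, list-valued fields are flattened into delimited strings so each analysis result fits on one row. A sketch of that step, with a fabricated result dict:

```python
# Example analysis result; the field names match those written by save_results.
result = {
    "filename": "Agreement_v2.pdf",
    "signatories": ["Jean Smal", "Li Wei"],
    "signature_dates": ["2024-01-15"],
    "key_information": ["Ref: QA-123", "ViceBio"],
}

# Lists are joined with ", " (names, dates) or "; " (key information),
# mirroring the delimiters used in the source.
row = {
    "filename": result.get("filename", ""),
    "signatories": ", ".join(result.get("signatories", [])),
    "signature_dates": ", ".join(result.get("signature_dates", [])),
    "key_information": "; ".join(result.get("key_information", [])),
}
print(row["signatories"])
```

The JSON file keeps the nested lists intact (`json.dump` with `ensure_ascii=False`), so the CSV is the lossy, spreadsheet-friendly view.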

Required Imports

import os
import sys
import csv
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import easyocr
from openai import OpenAI
from pdf2image import convert_from_path

Usage Example

# Example usage:
# analyzer = DocumentAnalyzer(output_dir="./output")
# results = analyzer.process_documents_from_register("download_register.csv", limit=5)
# analyzer.save_results(results)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function main_v11 65.4% similar

    Command-line interface function that orchestrates PDF document analysis using OCR and LLM processing, with configurable input/output paths and processing limits.

    From: /tf/active/vicechatdev/mailsearch/document_analyzer.py
  • class DocumentProcessor_v3 63.6% similar

    A comprehensive PDF document processor that handles text extraction, OCR (Optical Character Recognition), layout analysis, table detection, and metadata extraction from PDF files.

    From: /tf/active/vicechatdev/invoice_extraction/core/document_processor.py
  • class DocumentProcessor_v1 62.8% similar

    A document processing class that extracts text from PDF and Word documents using llmsherpa as the primary method with fallback support for PyPDF2, pdfplumber, and python-docx.

    From: /tf/active/vicechatdev/contract_validity_analyzer/utils/document_processor_new.py
  • class DocumentProcessor_v2 62.6% similar

    A document processing class that extracts text from PDF and Word documents using llmsherpa as the primary method with fallback support for PyPDF2, pdfplumber, and python-docx.

    From: /tf/active/vicechatdev/contract_validity_analyzer/utils/document_processor_old.py
  • class TestDocumentProcessor 60.4% similar

    A test subclass of DocumentProcessor that simulates llmsherpa PDF processing failures and triggers OCR fallback mechanisms for testing purposes.

    From: /tf/active/vicechatdev/contract_validity_analyzer/test_ocr_fallback.py