🔍 Code Extractor

class DocumentDownloader

Maturity: 46

A client class for downloading documents (primarily PDFs) from various sources, managing download caching, respecting rate limits per domain, and processing documents using llmsherpa for content extraction.

File: /tf/active/vicechatdev/QA_updater/data_access/document_downloader.py
Lines: 14 - 312
Complexity: complex

Purpose

DocumentDownloader provides a comprehensive solution for fetching and processing academic and research documents from multiple sources (arXiv, PubMed, Semantic Scholar, patent databases, clinical trials). It handles rate limiting for different domains, caches downloaded files to avoid redundant downloads, and uses llmsherpa's LayoutPDFReader to extract structured content including text chunks, metadata, and bounding boxes. The class is designed for research workflows that need to download and process large numbers of documents while respecting API rate limits and efficiently managing local storage.

Source Code

class DocumentDownloader:
    """Client for downloading documents from various sources and processing them with llmsherpa."""
    
    def __init__(self, config: ConfigParser, download_dir: str = None, cache_ttl: int = 86400):
        """
        Initialize document downloader.
        
        Args:
            config: Configuration object containing path settings.
            download_dir: Directory to store downloaded documents (default: system temp directory)
            cache_ttl: How long to cache downloaded files in seconds (default: 24 hours)
        """
        self.logger = logging.getLogger(__name__)
        self.config = config
        self.download_dir = download_dir or tempfile.gettempdir()
        self.cache_ttl = cache_ttl
        
        # Create download directory if it doesn't exist
        os.makedirs(self.download_dir, exist_ok=True)
        
        # Initialize llmsherpa PDF reader
        llmsherpa_api_url = "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
        self.pdf_reader = llmsherpa.readers.LayoutPDFReader(llmsherpa_api_url)
        
        # Track rate limits per domain
        self.domain_last_request = {}
        self.domain_rate_limits = {
            "arxiv.org": 3.0,            # 1 request per 3 seconds
            "pubmed.ncbi.nlm.nih.gov": 0.33,  # 3 requests per second
            "api.semanticscholar.org": 0.33,  # 3 requests per second
            "clinicaltrials.gov": 0.5,   # 2 requests per second
            "patents.google.com": 1.0,   # 1 request per second
            "worldwide.espacenet.com": 1.0,  # 1 request per second
            "uspto.gov": 0.5,           # 2 requests per second
            "default": 1.0              # Default: 1 request per second
        }

        self.logger.info("DocumentDownloader initialized.")

    def _get_cache_path(self, url: str) -> Path:
        """Generate a unique cache path for a URL."""
        # Create a hash of the URL to use as filename
        url_hash = hashlib.md5(url.encode()).hexdigest()
        
        # Get file extension from URL or default to .pdf
        path = urlparse(url).path
        ext = os.path.splitext(path)[1].lower() or ".pdf"
        
        # Ensure extension starts with period
        if not ext.startswith('.'):
            ext = '.' + ext
            
        return Path(self.download_dir) / f"{url_hash}{ext}"

    def _respect_rate_limit(self, url: str):
        """Respect rate limits for different domains."""
        domain = urlparse(url).netloc
        
        # Get rate limit for this domain
        rate_limit = self.domain_rate_limits.get(domain, self.domain_rate_limits["default"])
        
        # Check if we need to wait
        current_time = time.time()
        if domain in self.domain_last_request:
            time_since_last = current_time - self.domain_last_request[domain]
            if time_since_last < rate_limit:
                wait_time = rate_limit - time_since_last
                time.sleep(wait_time)
                
        # Update last request time
        self.domain_last_request[domain] = time.time()

    def download_document(self, url: str, headers: Dict[str, str] = None) -> Tuple[Optional[str], Optional[str]]:
        """
        Download document from URL, respecting rate limits and using cache.
        
        Args:
            url: URL to download
            headers: Additional headers for the request
            
        Returns:
            Tuple of (local_path, error_message)
        """
        cache_path = self._get_cache_path(url)
        
        # Check if we have a valid cached version
        if cache_path.exists():
            cache_age = time.time() - os.path.getmtime(cache_path)
            if cache_age < self.cache_ttl:
                self.logger.info(f"Using cached version of {url} at {cache_path}")
                return str(cache_path), None
                
        # Respect rate limits
        self._respect_rate_limit(url)
        
        # Prepare headers
        request_headers = {
            "User-Agent": "Mozilla/5.0 (compatible; Research/1.0; +https://example.org/research-bot)"
        }
        
        if headers:
            request_headers.update(headers)
            
        # Attempt download
        try:
            self.logger.info(f"Downloading {url} to {cache_path}")
            response = requests.get(url, headers=request_headers, stream=True, timeout=30)
            response.raise_for_status()
            
            # Check content type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'pdf' not in content_type and 'application/octet-stream' not in content_type:
                self.logger.warning(f"Warning: {url} content type is {content_type}, which may not be a PDF")
            
            # Save file
            with open(cache_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    
            return str(cache_path), None
                
        except requests.exceptions.RequestException as e:
            error_msg = f"Error downloading {url}: {str(e)}"
            self.logger.error(error_msg)
            return None, error_msg

    def process_pdf(self, path: str) -> Dict[str, Any]:
        """
        Process a PDF file using llmsherpa.
        
        Args:
            path: Path to PDF file
            
        Returns:
            Dictionary with extracted content
        """
        try:
            # Process PDF with llmsherpa
            document = self.pdf_reader.read_pdf(path)
            
            # Extract chunks
            chunks = []
            for i, chunk in enumerate(document.chunks()):
                chunks.append({
                    "chunk_id": i,
                    "text": chunk.text,
                    "page_num": chunk.page_num,
                    "bbox": chunk.bbox
                })
                
            # Get metadata
            metadata = {
                "title": document.metadata.get("title", ""),
                "author": document.metadata.get("author", ""),
                "creation_date": document.metadata.get("creationDate", ""),
                "page_count": document.metadata.get("pageCount", 0)
            }
            
            return {
                "path": path,
                "metadata": metadata,
                "chunks": chunks,
                "error": None
            }
            
        except Exception as e:
            error_msg = f"Error processing PDF {path}: {str(e)}"
            self.logger.error(error_msg)
            return {
                "path": path,
                "metadata": {},
                "chunks": [],
                "error": error_msg
            }

    def download_and_process(self, url: str, headers: Dict[str, str] = None) -> Dict[str, Any]:
        """
        Download and process a document from URL.
        
        Args:
            url: URL to download
            headers: Additional headers for the request
            
        Returns:
            Dictionary with processed content
        """
        # Download document
        local_path, error = self.download_document(url, headers)
        
        if error:
            return {
                "url": url,
                "local_path": None,
                "metadata": {},
                "chunks": [],
                "error": error
            }
            
        # Process PDF
        result = self.process_pdf(local_path)
        result["url"] = url
        result["local_path"] = local_path
        
        return result

    def process_literature_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Process literature results by downloading and extracting content.
        
        Args:
            results: List of literature results from API
            
        Returns:
            List of results with added content
        """
        processed_results = []
        
        for result in results:
            # Skip if no URL available
            url = None
            
            # Check different URL fields depending on source
            if result.get('source') == 'arxiv':
                url = result.get('pdf_url')
            elif result.get('source') == 'semantic_scholar':
                url = result.get('url')
            elif result.get('source') == 'pubmed':
                # PubMed results often need to construct a URL to the full text
                if result.get('doi'):
                    url = f"https://doi.org/{result['doi']}"
            elif 'url' in result:
                url = result['url']
                
            if not url:
                result['content'] = None
                result['content_error'] = "No URL available for download"
                processed_results.append(result)
                continue
                
            # Download and process
            content_result = self.download_and_process(url)
            
            # Add content to result
            if content_result.get('error'):
                result['content'] = None
                result['content_error'] = content_result['error']
            else:
                result['content'] = {
                    'local_path': content_result['local_path'],
                    'metadata': content_result['metadata'],
                    'chunks': content_result['chunks']
                }
                
            processed_results.append(result)
            
        return processed_results

    def process_patent_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process patent results by downloading and extracting content."""
        # Similar implementation as process_literature_results, with patent-specific URL handling
        processed_results = []
        
        for result in results:
            url = result.get('url')
            if not url:
                result['content'] = None
                result['content_error'] = "No URL available for download"
                processed_results.append(result)
                continue
            
            # Handle different patent sources with custom headers if needed
            headers = {}
            if 'uspto.gov' in url:
                headers = {
                    "User-Agent": "ResearchProject/1.0 (academic.research@example.edu)"
                }
                
            content_result = self.download_and_process(url, headers)
            
            # Add content to result
            if content_result.get('error'):
                result['content'] = None
                result['content_error'] = content_result['error']
            else:
                result['content'] = {
                    'local_path': content_result['local_path'],
                    'metadata': content_result['metadata'],
                    'chunks': content_result['chunks']
                }
                
            processed_results.append(result)
            
        return processed_results
    
    def process_clinical_trial_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process clinical trial results."""
        # Clinical trials typically don't have PDFs to download, but sometimes have protocols
        # This would require customized handling per source
        return results

Parameters

  • config (ConfigParser, required)
  • download_dir (str, default: None, falls back to the system temp directory)
  • cache_ttl (int, default: 86400)

Parameter Details

config: ConfigParser object containing path settings and configuration. Used for application-wide configuration management.

download_dir: Optional string specifying the directory path where downloaded documents will be stored. If not provided, defaults to the system's temporary directory (from tempfile.gettempdir()). The directory will be created if it doesn't exist.

cache_ttl: Optional integer specifying the cache time-to-live in seconds. Determines how long downloaded files are considered valid before re-downloading. Defaults to 86400 seconds (24 hours).

Return Value

Instantiation returns a DocumentDownloader object. Key method return values:

  • download_document() returns a tuple of (local_path: Optional[str], error_message: Optional[str]).
  • process_pdf() returns a dictionary with keys 'path', 'metadata', 'chunks', and 'error'.
  • download_and_process() returns a dictionary with keys 'url', 'local_path', 'metadata', 'chunks', and 'error'.
  • process_literature_results() and process_patent_results() return lists of dictionaries with added 'content' and 'content_error' fields.

Class Interface

Methods

__init__(self, config: ConfigParser, download_dir: str = None, cache_ttl: int = 86400)

Purpose: Initialize the DocumentDownloader with configuration, download directory, and cache settings. Sets up logging, creates download directory, initializes llmsherpa PDF reader, and configures rate limits.

Parameters:

  • config: ConfigParser object containing path settings
  • download_dir: Optional directory path for storing downloads (defaults to system temp)
  • cache_ttl: Cache time-to-live in seconds (defaults to 86400/24 hours)

Returns: None (constructor)

_get_cache_path(self, url: str) -> Path

Purpose: Generate a unique cache file path for a given URL using MD5 hashing and preserving file extension.

Parameters:

  • url: URL string to generate cache path for

Returns: Path object representing the cache file location
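The scheme is simple enough to restate standalone, which is useful for predicting where a given URL will land on disk. The sketch below mirrors the helper shown in the source (MD5 of the full URL as the filename, extension taken from the URL path with a ".pdf" fallback); the function name and download directory are illustrative, not part of the class API:

```python
import hashlib
import os
from pathlib import Path
from urllib.parse import urlparse

def cache_path_for(url: str, download_dir: str = "./downloads") -> Path:
    # MD5 of the full URL becomes the filename; extension comes from the URL path.
    url_hash = hashlib.md5(url.encode()).hexdigest()
    ext = os.path.splitext(urlparse(url).path)[1].lower() or ".pdf"
    return Path(download_dir) / f"{url_hash}{ext}"

# The same URL always maps to the same file; different URLs get distinct names.
print(cache_path_for("https://arxiv.org/pdf/2301.00001.pdf"))
```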

_respect_rate_limit(self, url: str)

Purpose: Enforce rate limiting for the domain of the given URL by sleeping if necessary. Updates last request time for the domain.

Parameters:

  • url: URL string whose domain will be rate-limited

Returns: None (side effect: may sleep to respect rate limits)
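The waiting behavior reduces to a small pattern, restated here outside the class for illustration (the dictionary and function names are placeholders; the real method keys off urlparse(url).netloc and the instance's domain_rate_limits table):

```python
import time

last_request = {}  # domain -> timestamp of the most recent request
min_interval = {"arxiv.org": 3.0, "default": 1.0}  # seconds between requests

def wait_for(domain: str) -> None:
    limit = min_interval.get(domain, min_interval["default"])
    if domain in last_request:
        elapsed = time.time() - last_request[domain]
        if elapsed < limit:
            time.sleep(limit - elapsed)  # block until the interval has passed
    last_request[domain] = time.time()
```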

download_document(self, url: str, headers: Dict[str, str] = None) -> Tuple[Optional[str], Optional[str]]

Purpose: Download a document from URL with caching and rate limiting. Returns cached version if available and fresh, otherwise downloads new copy.

Parameters:

  • url: URL string of the document to download
  • headers: Optional dictionary of additional HTTP headers for the request

Returns: Tuple of (local_path, error_message). If successful, local_path is a string and error_message is None. If failed, local_path is None and error_message contains the error description.
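A typical call unpacks the (path, error) tuple, assuming a downloader instance constructed as in the Usage Example below; the URL and extra header are illustrative:

```python
path, error = downloader.download_document(
    "https://arxiv.org/pdf/2301.00001.pdf",
    headers={"Accept": "application/pdf"},
)
if error:
    print(f"Download failed: {error}")
else:
    print(f"PDF available at {path}")  # cached copy is reused for cache_ttl seconds
```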

process_pdf(self, path: str) -> Dict[str, Any]

Purpose: Process a PDF file using llmsherpa to extract text chunks, metadata, and structural information.

Parameters:

  • path: String path to the PDF file to process

Returns: Dictionary with keys: 'path' (input path), 'metadata' (dict with title, author, creation_date, page_count), 'chunks' (list of dicts with chunk_id, text, page_num, bbox), 'error' (None if successful, error message if failed)
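A sketch of consuming the returned dictionary; it requires the llmsherpa parsing service configured in __init__ to be reachable, and the PDF path here is a hypothetical local file:

```python
result = downloader.process_pdf("/tmp/example.pdf")  # hypothetical local file
if result["error"]:
    print(result["error"])
else:
    print(result["metadata"].get("title", "<no title>"))
    for chunk in result["chunks"][:3]:
        print(chunk["chunk_id"], chunk["page_num"], chunk["text"][:80])
```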

download_and_process(self, url: str, headers: Dict[str, str] = None) -> Dict[str, Any]

Purpose: Combined method to download a document from URL and process it with llmsherpa in one call.

Parameters:

  • url: URL string of the document to download and process
  • headers: Optional dictionary of additional HTTP headers for the request

Returns: Dictionary with keys: 'url' (original URL), 'local_path' (downloaded file path or None), 'metadata' (extracted metadata dict), 'chunks' (list of extracted chunks), 'error' (None if successful, error message if failed)

process_literature_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]

Purpose: Process a list of literature search results by downloading and extracting content from each. Handles different sources (arXiv, Semantic Scholar, PubMed) with source-specific URL extraction.

Parameters:

  • results: List of dictionaries containing literature search results with fields like 'source', 'pdf_url', 'url', 'doi'

Returns: List of dictionaries with original fields plus 'content' (dict with local_path, metadata, chunks if successful, or None if failed) and 'content_error' (error message if download/processing failed)

process_patent_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]

Purpose: Process a list of patent search results by downloading and extracting content. Handles patent-specific sources with custom headers for USPTO.

Parameters:

  • results: List of dictionaries containing patent search results with 'url' field

Returns: List of dictionaries with original fields plus 'content' (dict with local_path, metadata, chunks if successful, or None if failed) and 'content_error' (error message if download/processing failed)
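A minimal batch-processing sketch; the 'patent_id' field and URLs are placeholders, while the behavior for missing URLs and the USPTO-specific User-Agent follow the source shown above:

```python
patent_hits = [
    {"patent_id": "US1234567", "url": "https://patents.google.com/patent/US1234567/en"},
    {"patent_id": "EP7654321"},  # no URL -> content_error is set instead of content
]

for hit in downloader.process_patent_results(patent_hits):
    if hit.get("content"):
        print(hit["patent_id"], "->", len(hit["content"]["chunks"]), "chunks")
    else:
        print(hit["patent_id"], "->", hit["content_error"])
```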

process_clinical_trial_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]

Purpose: Process clinical trial results. Currently a placeholder that returns results unchanged, as clinical trials typically don't have PDFs to download.

Parameters:

  • results: List of dictionaries containing clinical trial search results

Returns: The input results list unchanged (no processing performed)

Attributes

  • logger (logging.Logger, instance): Logger instance for the class, used to log download operations, errors, and cache hits.
  • config (ConfigParser, instance): Configuration object containing path settings and other configuration parameters.
  • download_dir (str, instance): Directory path where downloaded documents are stored and cached.
  • cache_ttl (int, instance): Cache time-to-live in seconds; determines how long cached files are considered valid.
  • pdf_reader (llmsherpa.readers.LayoutPDFReader, instance): llmsherpa PDF reader configured to parse documents with layout information.
  • domain_last_request (Dict[str, float], instance): Maps domain names to the timestamp of the last request to that domain, used for rate limiting.
  • domain_rate_limits (Dict[str, float], instance): Maps domain names to the minimum number of seconds between requests: arxiv.org (3.0s), pubmed.ncbi.nlm.nih.gov (0.33s), api.semanticscholar.org (0.33s), clinicaltrials.gov (0.5s), patents.google.com (1.0s), worldwide.espacenet.com (1.0s), uspto.gov (0.5s), default (1.0s).
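Both rate-limit attributes are ordinary dictionaries, so they can be inspected at runtime, for example to see which domains have been contacted and at what interval (illustrative snippet, assuming a constructed downloader instance):

```python
for domain, ts in downloader.domain_last_request.items():
    limit = downloader.domain_rate_limits.get(
        domain, downloader.domain_rate_limits["default"]
    )
    print(f"{domain}: last request at {ts:.0f}, min interval {limit}s")
```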

Dependencies

  • os
  • time
  • requests
  • tempfile
  • typing
  • urllib.parse
  • logging
  • pathlib
  • hashlib
  • llmsherpa
  • configparser

Required Imports

import os
import time
import requests
import tempfile
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import urlparse
import logging
from pathlib import Path
import hashlib
import llmsherpa.readers
from configparser import ConfigParser

Usage Example

from configparser import ConfigParser
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)

# Create config
config = ConfigParser()

# Initialize downloader
downloader = DocumentDownloader(
    config=config,
    download_dir='./downloads',
    cache_ttl=86400
)

# Download and process a single document
result = downloader.download_and_process(
    url='https://arxiv.org/pdf/2301.00001.pdf'
)

if result['error']:
    print(f"Error: {result['error']}")
else:
    print(f"Downloaded to: {result['local_path']}")
    print(f"Title: {result['metadata']['title']}")
    print(f"Number of chunks: {len(result['chunks'])}")

# Process literature results from API
literature_results = [
    {'source': 'arxiv', 'pdf_url': 'https://arxiv.org/pdf/2301.00001.pdf'},
    {'source': 'semantic_scholar', 'url': 'https://example.com/paper.pdf'}
]

processed = downloader.process_literature_results(literature_results)
for item in processed:
    if item.get('content'):
        print(f"Processed: {item['content']['metadata']['title']}")
    else:
        print(f"Error: {item.get('content_error')}")

Best Practices

  • Always ensure the llmsherpa service is running before instantiating the class, as it initializes the PDF reader in __init__
  • The class maintains state for rate limiting (domain_last_request), so reuse the same instance for multiple downloads to benefit from proper rate limiting
  • Cache files are stored with MD5 hashes of URLs as filenames; ensure download_dir has sufficient space for your use case
  • The cache_ttl parameter controls how long files are cached; adjust based on whether you need fresh downloads or can tolerate stale data
  • Rate limits are hardcoded per domain; modify the domain_rate_limits dictionary if you have different API agreements (see the sketch after this list)
  • Methods like process_literature_results() and process_patent_results() modify the input dictionaries by adding 'content' and 'content_error' fields
  • Error handling returns error messages rather than raising exceptions, allowing batch processing to continue even if individual downloads fail
  • The class does not check robots.txt; it only enforces per-domain rate limits, so verify that target sites permit automated downloads
  • For large batch processing, consider the cumulative effect of rate limits on processing time
  • The User-Agent header can be customized per request via the headers parameter to identify your application properly
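Because the per-domain limits and the cache lifetime are plain instance attributes, they can be adjusted after construction without subclassing. A short sketch, with illustrative values and assuming a ConfigParser instance named config:

```python
downloader = DocumentDownloader(config=config, download_dir="./downloads")

# Slow down arXiv requests further and add a limit for a new domain (illustrative values).
downloader.domain_rate_limits["arxiv.org"] = 5.0
downloader.domain_rate_limits["example-publisher.com"] = 2.0

# Treat cached files older than one hour as stale and re-download them.
downloader.cache_ttl = 3600
```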

Similar Components

Components with related functionality, ranked by AI-powered semantic similarity:

  • class DocumentProcessor_v1 64.9% similar

    A document processing class that extracts text from PDF and Word documents using llmsherpa as the primary method with fallback support for PyPDF2, pdfplumber, and python-docx.

    From: /tf/active/vicechatdev/contract_validity_analyzer/utils/document_processor_new.py
  • class DocumentProcessor_v2 64.6% similar

    A document processing class that extracts text from PDF and Word documents using llmsherpa as the primary method with fallback support for PyPDF2, pdfplumber, and python-docx.

    From: /tf/active/vicechatdev/contract_validity_analyzer/utils/document_processor_old.py
  • class DocumentProcessor_v4 57.0% similar

    Handles document processing and text extraction using llmsherpa (same approach as offline_docstore_multi_vice.py).

    From: /tf/active/vicechatdev/docchat/document_processor.py
  • class TestDocumentProcessor 55.9% similar

    A test subclass of DocumentProcessor that simulates llmsherpa PDF processing failures and triggers OCR fallback mechanisms for testing purposes.

    From: /tf/active/vicechatdev/contract_validity_analyzer/test_ocr_fallback.py
  • class RegulatoryExtractor 55.7% similar

    A class for extracting structured metadata from regulatory guideline PDF documents using LLM-based analysis and storing the results in an Excel tracking spreadsheet.

    From: /tf/active/vicechatdev/reg_extractor.py