class DocumentDownloader
A client class for downloading documents (primarily PDFs) from various sources, managing download caching, respecting rate limits per domain, and processing documents using llmsherpa for content extraction.
File: /tf/active/vicechatdev/QA_updater/data_access/document_downloader.py
Lines: 14 - 312
Complexity: complex
Purpose
DocumentDownloader provides a comprehensive solution for fetching and processing academic and research documents from multiple sources (arXiv, PubMed, Semantic Scholar, patent databases, clinical trials). It handles rate limiting for different domains, caches downloaded files to avoid redundant downloads, and uses llmsherpa's LayoutPDFReader to extract structured content including text chunks, metadata, and bounding boxes. The class is designed for research workflows that need to download and process large numbers of documents while respecting API rate limits and efficiently managing local storage.
Source Code
class DocumentDownloader:
    """Client for downloading documents from various sources and processing them with llmsherpa."""

    def __init__(self, config: ConfigParser, download_dir: str = None, cache_ttl: int = 86400):
        """
        Initialize document downloader.

        Args:
            config: Configuration object containing path settings.
            download_dir: Directory to store downloaded documents (default: system temp directory)
            cache_ttl: How long to cache downloaded files in seconds (default: 24 hours)
        """
        self.logger = logging.getLogger(__name__)
        self.config = config
        self.download_dir = download_dir or tempfile.gettempdir()
        self.cache_ttl = cache_ttl

        # Create download directory if it doesn't exist
        os.makedirs(self.download_dir, exist_ok=True)

        # Initialize llmsherpa PDF reader
        llmsherpa_api_url = "http://llmsherpa:5001/api/parseDocument?renderFormat=all&useNewIndentParser=yes"
        self.pdf_reader = llmsherpa.readers.LayoutPDFReader(llmsherpa_api_url)

        # Track rate limits per domain
        self.domain_last_request = {}
        self.domain_rate_limits = {
            "arxiv.org": 3.0,  # 1 request per 3 seconds
            "pubmed.ncbi.nlm.nih.gov": 0.33,  # 3 requests per second
            "api.semanticscholar.org": 0.33,  # 3 requests per second
            "clinicaltrials.gov": 0.5,  # 2 requests per second
            "patents.google.com": 1.0,  # 1 request per second
            "worldwide.espacenet.com": 1.0,  # 1 request per second
            "uspto.gov": 0.5,  # 2 requests per second
            "default": 1.0  # Default: 1 request per second
        }

        self.logger.info("DocumentDownloader initialized.")

    def _get_cache_path(self, url: str) -> Path:
        """Generate a unique cache path for a URL."""
        # Create a hash of the URL to use as filename
        url_hash = hashlib.md5(url.encode()).hexdigest()

        # Get file extension from URL or default to .pdf
        path = urlparse(url).path
        ext = os.path.splitext(path)[1].lower() or ".pdf"

        # Ensure extension starts with period
        if not ext.startswith('.'):
            ext = '.' + ext

        return Path(self.download_dir) / f"{url_hash}{ext}"

    def _respect_rate_limit(self, url: str):
        """Respect rate limits for different domains."""
        domain = urlparse(url).netloc

        # Get rate limit for this domain
        rate_limit = self.domain_rate_limits.get(domain, self.domain_rate_limits["default"])

        # Check if we need to wait
        current_time = time.time()
        if domain in self.domain_last_request:
            time_since_last = current_time - self.domain_last_request[domain]
            if time_since_last < rate_limit:
                wait_time = rate_limit - time_since_last
                time.sleep(wait_time)

        # Update last request time
        self.domain_last_request[domain] = time.time()

    def download_document(self, url: str, headers: Dict[str, str] = None) -> Tuple[Optional[str], Optional[str]]:
        """
        Download document from URL, respecting rate limits and using cache.

        Args:
            url: URL to download
            headers: Additional headers for the request

        Returns:
            Tuple of (local_path, error_message)
        """
        cache_path = self._get_cache_path(url)

        # Check if we have a valid cached version
        if cache_path.exists():
            cache_age = time.time() - os.path.getmtime(cache_path)
            if cache_age < self.cache_ttl:
                self.logger.info(f"Using cached version of {url} at {cache_path}")
                return str(cache_path), None

        # Respect rate limits
        self._respect_rate_limit(url)

        # Prepare headers
        request_headers = {
            "User-Agent": "Mozilla/5.0 (compatible; Research/1.0; +https://example.org/research-bot)"
        }
        if headers:
            request_headers.update(headers)

        # Attempt download
        try:
            self.logger.info(f"Downloading {url} to {cache_path}")
            response = requests.get(url, headers=request_headers, stream=True, timeout=30)
            response.raise_for_status()

            # Check content type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'pdf' not in content_type and 'application/octet-stream' not in content_type:
                self.logger.warning(f"Warning: {url} content type is {content_type}, which may not be a PDF")

            # Save file
            with open(cache_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            return str(cache_path), None
        except requests.exceptions.RequestException as e:
            error_msg = f"Error downloading {url}: {str(e)}"
            self.logger.error(error_msg)
            return None, error_msg

    def process_pdf(self, path: str) -> Dict[str, Any]:
        """
        Process a PDF file using llmsherpa.

        Args:
            path: Path to PDF file

        Returns:
            Dictionary with extracted content
        """
        try:
            # Process PDF with llmsherpa
            document = self.pdf_reader.read_pdf(path)

            # Extract chunks
            chunks = []
            for i, chunk in enumerate(document.chunks()):
                chunks.append({
                    "chunk_id": i,
                    "text": chunk.text,
                    "page_num": chunk.page_num,
                    "bbox": chunk.bbox
                })

            # Get metadata
            metadata = {
                "title": document.metadata.get("title", ""),
                "author": document.metadata.get("author", ""),
                "creation_date": document.metadata.get("creationDate", ""),
                "page_count": document.metadata.get("pageCount", 0)
            }

            return {
                "path": path,
                "metadata": metadata,
                "chunks": chunks,
                "error": None
            }
        except Exception as e:
            error_msg = f"Error processing PDF {path}: {str(e)}"
            self.logger.error(error_msg)
            return {
                "path": path,
                "metadata": {},
                "chunks": [],
                "error": error_msg
            }

    def download_and_process(self, url: str, headers: Dict[str, str] = None) -> Dict[str, Any]:
        """
        Download and process a document from URL.

        Args:
            url: URL to download
            headers: Additional headers for the request

        Returns:
            Dictionary with processed content
        """
        # Download document
        local_path, error = self.download_document(url, headers)
        if error:
            return {
                "url": url,
                "local_path": None,
                "metadata": {},
                "chunks": [],
                "error": error
            }

        # Process PDF
        result = self.process_pdf(local_path)
        result["url"] = url
        result["local_path"] = local_path
        return result

    def process_literature_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Process literature results by downloading and extracting content.

        Args:
            results: List of literature results from API

        Returns:
            List of results with added content
        """
        processed_results = []
        for result in results:
            # Skip if no URL available
            url = None

            # Check different URL fields depending on source
            if result.get('source') == 'arxiv':
                url = result.get('pdf_url')
            elif result.get('source') == 'semantic_scholar':
                url = result.get('url')
            elif result.get('source') == 'pubmed':
                # PubMed results often need to construct a URL to the full text
                if result.get('doi'):
                    url = f"https://doi.org/{result['doi']}"
            elif 'url' in result:
                url = result['url']

            if not url:
                result['content'] = None
                result['content_error'] = "No URL available for download"
                processed_results.append(result)
                continue

            # Download and process
            content_result = self.download_and_process(url)

            # Add content to result
            if content_result.get('error'):
                result['content'] = None
                result['content_error'] = content_result['error']
            else:
                result['content'] = {
                    'local_path': content_result['local_path'],
                    'metadata': content_result['metadata'],
                    'chunks': content_result['chunks']
                }

            processed_results.append(result)

        return processed_results

    def process_patent_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process patent results by downloading and extracting content."""
        # Similar implementation as process_literature_results, with patent-specific URL handling
        processed_results = []
        for result in results:
            url = result.get('url')
            if not url:
                result['content'] = None
                result['content_error'] = "No URL available for download"
                processed_results.append(result)
                continue

            # Handle different patent sources with custom headers if needed
            headers = {}
            if 'uspto.gov' in url:
                headers = {
                    "User-Agent": "ResearchProject/1.0 (academic.research@example.edu)"
                }

            content_result = self.download_and_process(url, headers)

            # Add content to result
            if content_result.get('error'):
                result['content'] = None
                result['content_error'] = content_result['error']
            else:
                result['content'] = {
                    'local_path': content_result['local_path'],
                    'metadata': content_result['metadata'],
                    'chunks': content_result['chunks']
                }

            processed_results.append(result)

        return processed_results

    def process_clinical_trial_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process clinical trial results."""
        # Clinical trials typically don't have PDFs to download, but sometimes have protocols
        # This would require customized handling per source
        return results
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
config: ConfigParser object containing path settings and configuration. Used for application-wide configuration management.
download_dir: Optional string specifying the directory path where downloaded documents will be stored. If not provided, defaults to the system's temporary directory (from tempfile.gettempdir()). The directory will be created if it doesn't exist.
cache_ttl: Optional integer specifying the cache time-to-live in seconds. Determines how long downloaded files are considered valid before re-downloading. Defaults to 86400 seconds (24 hours).
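For orientation, cache freshness is judged by comparing the cached file's modification time against cache_ttl. The standalone sketch below mirrors that check; it is not a method of the class, just the same logic in isolation:
import os
import time

def is_cache_fresh(path: str, cache_ttl: int = 86400) -> bool:
    """Return True if the cached file exists and is younger than cache_ttl seconds."""
    if not os.path.exists(path):
        return False
    age = time.time() - os.path.getmtime(path)
    return age < cache_ttl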
Return Value
Instantiation returns a DocumentDownloader object. Key method return values:
- download_document() returns a tuple of (local_path: Optional[str], error_message: Optional[str])
- process_pdf() returns a dictionary with keys 'path', 'metadata', 'chunks', and 'error'
- download_and_process() returns a dictionary with keys 'url', 'local_path', 'metadata', 'chunks', and 'error'
- process_literature_results() and process_patent_results() return lists of dictionaries with added 'content' and 'content_error' fields
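As a sketch, assuming an already-constructed instance named downloader (the URL is illustrative), the two single-document conventions are handled like this:
# Tuple convention: download_document()
local_path, error = downloader.download_document("https://arxiv.org/pdf/2301.00001.pdf")
if error:
    print(f"Download failed: {error}")

# Dictionary convention: download_and_process()
doc = downloader.download_and_process("https://arxiv.org/pdf/2301.00001.pdf")
if doc["error"] is None:
    print(f"{len(doc['chunks'])} chunks extracted from {doc['local_path']}")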
Class Interface
Methods
__init__(self, config: ConfigParser, download_dir: str = None, cache_ttl: int = 86400)
Purpose: Initialize the DocumentDownloader with configuration, download directory, and cache settings. Sets up logging, creates download directory, initializes llmsherpa PDF reader, and configures rate limits.
Parameters:
config: ConfigParser object containing path settings
download_dir: Optional directory path for storing downloads (defaults to system temp)
cache_ttl: Cache time-to-live in seconds (defaults to 86400/24 hours)
Returns: None (constructor)
_get_cache_path(self, url: str) -> Path
Purpose: Generate a unique cache file path for a given URL using MD5 hashing and preserving file extension.
Parameters:
url: URL string to generate cache path for
Returns: Path object representing the cache file location
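Example: the cache path can be reproduced outside the class; this sketch repeats the same hashing scheme and assumes a download_dir of /tmp:
import hashlib
import os
from pathlib import Path
from urllib.parse import urlparse

url = "https://arxiv.org/pdf/2301.00001.pdf"
url_hash = hashlib.md5(url.encode()).hexdigest()          # MD5 of the full URL
ext = os.path.splitext(urlparse(url).path)[1].lower() or ".pdf"  # keep the URL's extension, default .pdf
print(Path("/tmp") / f"{url_hash}{ext}")                  # e.g. /tmp/<md5>.pdf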
_respect_rate_limit(self, url: str)
Purpose: Enforce rate limiting for the domain of the given URL by sleeping if necessary. Updates last request time for the domain.
Parameters:
url: URL string whose domain will be rate-limited
Returns: None (side effect: may sleep to respect rate limits)
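Example: the waiting logic amounts to enforcing a per-domain minimum interval between requests. A standalone sketch of the same idea (not the method itself), with only two illustrative entries in the rate table:
import time
from urllib.parse import urlparse

last_request = {}
rate_limits = {"arxiv.org": 3.0, "default": 1.0}

def wait_for_domain(url: str) -> None:
    """Sleep just long enough to honour the domain's minimum request interval."""
    domain = urlparse(url).netloc
    min_interval = rate_limits.get(domain, rate_limits["default"])
    elapsed = time.time() - last_request.get(domain, 0.0)
    if elapsed < min_interval:
        time.sleep(min_interval - elapsed)
    last_request[domain] = time.time()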
download_document(self, url: str, headers: Dict[str, str] = None) -> Tuple[Optional[str], Optional[str]]
Purpose: Download a document from URL with caching and rate limiting. Returns cached version if available and fresh, otherwise downloads new copy.
Parameters:
url: URL string of the document to download
headers: Optional dictionary of additional HTTP headers for the request
Returns: Tuple of (local_path, error_message). If successful, local_path is a string and error_message is None. If failed, local_path is None and error_message contains the error description.
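Example (a sketch, assuming a DocumentDownloader instance named downloader; the URL and extra header are illustrative):
path, err = downloader.download_document(
    "https://arxiv.org/pdf/2301.00001.pdf",
    headers={"Accept": "application/pdf"},  # merged into the default User-Agent headers
)
if err is None:
    print(f"Saved to {path}")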
process_pdf(self, path: str) -> Dict[str, Any]
Purpose: Process a PDF file using llmsherpa to extract text chunks, metadata, and structural information.
Parameters:
path: String path to the PDF file to process
Returns: Dictionary with keys: 'path' (input path), 'metadata' (dict with title, author, creation_date, page_count), 'chunks' (list of dicts with chunk_id, text, page_num, bbox), 'error' (None if successful, error message if failed)
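Example: process_pdf() can also be called on a file that is already on disk (the path below is hypothetical); note that the llmsherpa service configured in __init__ must be reachable:
parsed = downloader.process_pdf("./downloads/paper.pdf")
if parsed["error"] is None:
    for chunk in parsed["chunks"][:3]:
        print(chunk["chunk_id"], chunk["page_num"], chunk["text"][:80])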
download_and_process(self, url: str, headers: Dict[str, str] = None) -> Dict[str, Any]
Purpose: Combined method to download a document from URL and process it with llmsherpa in one call.
Parameters:
url: URL string of the document to download and process
headers: Optional dictionary of additional HTTP headers for the request
Returns: Dictionary with keys: 'url' (original URL), 'local_path' (downloaded file path or None), 'metadata' (extracted metadata dict), 'chunks' (list of extracted chunks), 'error' (None if successful, error message if failed)
process_literature_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]
Purpose: Process a list of literature search results by downloading and extracting content from each. Handles different sources (arXiv, Semantic Scholar, PubMed) with source-specific URL extraction.
Parameters:
results: List of dictionaries containing literature search results with fields like 'source', 'pdf_url', 'url', 'doi'
Returns: List of dictionaries with original fields plus 'content' (dict with local_path, metadata, chunks if successful, or None if failed) and 'content_error' (error message if download/processing failed)
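Example (a sketch, assuming an instance named downloader): a PubMed record has its URL built from its DOI, while a record with no usable URL receives a content_error. The DOI shown is a placeholder:
results = [
    {"source": "pubmed", "doi": "10.1000/example.doi", "title": "Example paper"},
    {"source": "arxiv"},  # no pdf_url, so this one gets a content_error
]
for item in downloader.process_literature_results(results):
    if item.get("content"):
        print("chunks:", len(item["content"]["chunks"]))
    else:
        print("skipped:", item.get("content_error"))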
process_patent_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]
Purpose: Process a list of patent search results by downloading and extracting content. Handles patent-specific sources with custom headers for USPTO.
Parameters:
results: List of dictionaries containing patent search results with 'url' field
Returns: List of dictionaries with original fields plus 'content' (dict with local_path, metadata, chunks if successful, or None if failed) and 'content_error' (error message if download/processing failed)
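Example (a sketch, assuming an instance named downloader; URLs are placeholders): one USPTO record, which triggers the built-in ResearchProject User-Agent, and one Google Patents record:
patents = [
    {"url": "https://patents.google.com/patent/US0000000A1.pdf"},
    {"url": "https://uspto.gov/some/document.pdf"},  # sent with the USPTO-specific User-Agent
]
for item in downloader.process_patent_results(patents):
    status = "ok" if item.get("content") else item.get("content_error")
    print(item["url"], "->", status)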
process_clinical_trial_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]
Purpose: Process clinical trial results. Currently a placeholder that returns results unchanged, as clinical trials typically don't have PDFs to download.
Parameters:
results: List of dictionaries containing clinical trial search results
Returns: The input results list unchanged (no processing performed)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| logger | logging.Logger | Logger instance for the class, used to log download operations, errors, and cache hits | instance |
| config | ConfigParser | Configuration object containing path settings and other configuration parameters | instance |
| download_dir | str | Directory path where downloaded documents are stored and cached | instance |
| cache_ttl | int | Cache time-to-live in seconds; determines how long cached files are considered valid | instance |
| pdf_reader | llmsherpa.readers.LayoutPDFReader | llmsherpa PDF reader instance configured to parse documents with layout information | instance |
| domain_last_request | Dict[str, float] | Dictionary mapping domain names to timestamps of last request, used for rate limiting | instance |
| domain_rate_limits | Dict[str, float] | Dictionary mapping domain names to minimum seconds between requests. Includes specific limits for arxiv.org (3.0s), pubmed.ncbi.nlm.nih.gov (0.33s), api.semanticscholar.org (0.33s), clinicaltrials.gov (0.5s), patents.google.com (1.0s), worldwide.espacenet.com (1.0s), uspto.gov (0.5s), and default (1.0s) | instance |
Dependencies
os, time, requests, tempfile, typing, urllib.parse, logging, pathlib, hashlib, llmsherpa, configparser
Required Imports
import os
import time
import requests
import tempfile
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import urlparse
import logging
from pathlib import Path
import hashlib
import llmsherpa.readers
from configparser import ConfigParser
Usage Example
from configparser import ConfigParser
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)

# Create config
config = ConfigParser()

# Initialize downloader
downloader = DocumentDownloader(
    config=config,
    download_dir='./downloads',
    cache_ttl=86400
)

# Download and process a single document
result = downloader.download_and_process(
    url='https://arxiv.org/pdf/2301.00001.pdf'
)

if result['error']:
    print(f"Error: {result['error']}")
else:
    print(f"Downloaded to: {result['local_path']}")
    print(f"Title: {result['metadata']['title']}")
    print(f"Number of chunks: {len(result['chunks'])}")

# Process literature results from API
literature_results = [
    {'source': 'arxiv', 'pdf_url': 'https://arxiv.org/pdf/2301.00001.pdf'},
    {'source': 'semantic_scholar', 'url': 'https://example.com/paper.pdf'}
]
processed = downloader.process_literature_results(literature_results)

for item in processed:
    if item.get('content'):
        print(f"Processed: {item['content']['metadata']['title']}")
    else:
        print(f"Error: {item.get('content_error')}")
Best Practices
- Always ensure the llmsherpa service is running before instantiating the class, as it initializes the PDF reader in __init__
- The class maintains state for rate limiting (domain_last_request), so reuse the same instance for multiple downloads to benefit from proper rate limiting
- Cache files are stored with MD5 hashes of URLs as filenames; ensure download_dir has sufficient space for your use case
- The cache_ttl parameter controls how long files are cached; adjust based on whether you need fresh downloads or can tolerate stale data
- Rate limits are hardcoded per domain; modify the domain_rate_limits dictionary if you have different API agreements (see the sketch after this list)
- Methods like process_literature_results() and process_patent_results() modify the input dictionaries by adding 'content' and 'content_error' fields
- Error handling returns error messages rather than raising exceptions, allowing batch processing to continue even if individual downloads fail
- The class does not check robots.txt; it only applies per-domain rate limiting, so verify separately that target sites permit automated downloads
- For large batch processing, consider the cumulative effect of rate limits on processing time
- The User-Agent header can be customized per request via the headers parameter to identify your application properly
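As referenced above, a short sketch of both adjustments on an existing instance named downloader (the interval values and contact address are illustrative, not recommendations):
# Loosen or tighten per-domain intervals (minimum seconds between requests)
downloader.domain_rate_limits["arxiv.org"] = 5.0
downloader.domain_rate_limits["default"] = 2.0

# Identify your application explicitly for a single request
path, err = downloader.download_document(
    "https://arxiv.org/pdf/2301.00001.pdf",
    headers={"User-Agent": "MyProject/0.1 (contact@example.org)"},
)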
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DocumentProcessor_v1 (64.9% similar)
- class DocumentProcessor_v2 (64.6% similar)
- class DocumentProcessor_v4 (57.0% similar)
- class TestDocumentProcessor (55.9% similar)
- class RegulatoryExtractor (55.7% similar)