function test_with_real_documents
Tests a contract analyzer system by processing real documents from FileCloud, extracting contract information, and generating analysis reports with performance metrics.
File: /tf/active/vicechatdev/contract_validity_analyzer/test_real_documents.py
Lines: 37-164
Complexity: complex
Purpose
This function serves as an end-to-end integration test for a contract validity analysis pipeline. It connects to FileCloud to retrieve contract documents, processes them through a ContractAnalyzer that uses LLM-based analysis to extract key contract information (third parties, dates, validity status), and generates CSV/JSON output reports. The function prints detailed console output covering progress, a results summary, and performance metrics, and decides whether the pipeline is ready for production use based on a 70% success-rate threshold.
Source Code
def test_with_real_documents(max_documents=5):
    """
    Test the contract analyzer with real documents from FileCloud.

    Args:
        max_documents: Maximum number of documents to process for testing
    """
    print("=" * 60)
    print("Contract Validity Analyzer - Real Document Test")
    print("=" * 60)
    log_file = setup_logging()
    print(f"Detailed logs will be written to: {log_file}")
    try:
        # Load configuration
        print("\n1. Loading configuration...")
        config = Config()
        config_dict = config.config
        print(f"✓ Configuration loaded")
        print(f" - FileCloud server: {config_dict['filecloud']['server_url']}")
        print(f" - Base path: {config_dict['filecloud']['base_path']}")
        print(f" - Max documents for test: {max_documents}")
        # Initialize analyzer
        print("\n2. Initializing Contract Analyzer...")
        analyzer = ContractAnalyzer(config_dict)
        print("✓ Analyzer initialized")
        # Run analysis on real documents
        print(f"\n3. Analyzing up to {max_documents} real documents...")
        print(" This will:")
        print(" - Connect to FileCloud")
        print(" - Search for contract documents")
        print(" - Download and process documents")
        print(" - Extract text content")
        print(" - Analyze with LLM for contract validity")
        print(" - Generate CSV output")
        # Run the analysis
        results = analyzer.analyze_contracts(max_files=max_documents)
        # Display results summary
        print(f"\n4. Analysis Results Summary:")
        print(f" - Total documents processed: {len(results)}")
        successful_analyses = [r for r in results if r.get('analysis_result', {}).get('success', False)]
        failed_analyses = [r for r in results if not r.get('analysis_result', {}).get('success', False)]
        print(f" - Successful analyses: {len(successful_analyses)}")
        print(f" - Failed analyses: {len(failed_analyses)}")
        if successful_analyses:
            print(f"\n Sample successful analyses:")
            for i, result in enumerate(successful_analyses[:3]):
                analysis = result.get('analysis_result', {})
                print(f" {i+1}. {result.get('filename', 'Unknown')}")
                print(f" Third parties: {analysis.get('third_parties', 'N/A')}")
                print(f" Start date: {analysis.get('start_date', 'N/A')}")
                print(f" End date: {analysis.get('end_date', 'N/A')}")
                print(f" In effect: {analysis.get('in_effect', 'N/A')}")
        if failed_analyses:
            print(f"\n Sample failed analyses:")
            for i, result in enumerate(failed_analyses[:3]):
                error = result.get('analysis_result', {}).get('error', 'Unknown error')
                print(f" {i+1}. {result.get('filename', 'Unknown')}: {error}")
        # Show output files
        output_dir = Path(config_dict['output'].get('directory', './output'))
        csv_files = list(output_dir.glob("*.csv"))
        json_files = list(output_dir.glob("*.json"))
        print(f"\n5. Output Files Generated:")
        if csv_files:
            for csv_file in csv_files:
                file_size = csv_file.stat().st_size
                print(f" - CSV: {csv_file.name} ({file_size} bytes)")
        if json_files:
            for json_file in json_files:
                file_size = json_file.stat().st_size
                print(f" - JSON: {json_file.name} ({file_size} bytes)")
        # Performance metrics
        total_time = sum(r.get('processing_time', 0) for r in results)
        avg_time = total_time / len(results) if results else 0
        print(f"\n6. Performance Metrics:")
        print(f" - Total processing time: {total_time:.2f} seconds")
        print(f" - Average time per document: {avg_time:.2f} seconds")
        # Show latest CSV content preview if available
        if csv_files:
            latest_csv = max(csv_files, key=lambda f: f.stat().st_mtime)
            print(f"\n7. Sample CSV Output (from {latest_csv.name}):")
            try:
                with open(latest_csv, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                    for i, line in enumerate(lines[:6]):  # Show header + first 5 rows
                        print(f" {line.strip()}")
                        if i == 0:  # After header
                            print(" " + "-" * 50)
                    if len(lines) > 6:
                        print(f" ... and {len(lines) - 6} more rows")
            except Exception as e:
                print(f" Error reading CSV: {e}")
        print(f"\n" + "=" * 60)
        if len(results) > 0:
            success_rate = len(successful_analyses) / len(results) * 100
            print(f"🎉 Real document test completed successfully!")
            print(f" Success rate: {success_rate:.1f}% ({len(successful_analyses)}/{len(results)} documents)")
            if success_rate >= 70:
                print("✅ Pipeline is ready for full dataset processing")
                return True
            else:
                print("⚠️ Pipeline needs improvement before full dataset processing")
                return False
        else:
            print("❌ No documents were processed")
            return False
    except Exception as e:
        print(f"\n❌ Real document test failed: {e}")
        logging.exception("Real document test failed")
        return False
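For reference, the function only relies on a handful of keys in each entry returned by analyzer.analyze_contracts(). The sketch below shows that expected shape, inferred from the .get() calls in the source above; the concrete values are illustrative placeholders, not real analyzer output.

# One entry of `results`, as consumed by test_with_real_documents.
# Field names match the .get() lookups in the source; values are made up.
example_result = {
    "filename": "example_contract.pdf",           # hypothetical file name
    "processing_time": 4.2,                       # seconds, summed for the metrics
    "analysis_result": {
        "success": True,                          # drives the success/failure split
        "third_parties": "Example Counterparty",  # extracted contract fields
        "start_date": "2023-01-01",
        "end_date": "2025-12-31",
        "in_effect": True,
        # On failure: "success" is False and "error" carries the message.
    },
}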
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| max_documents | - | 5 | positional_or_keyword |
Parameter Details
max_documents: Maximum number of contract documents to retrieve and process from FileCloud during the test run. Defaults to 5 to keep test execution time reasonable. Should be a positive integer. Higher values provide more comprehensive testing but increase execution time and API costs.
Return Value
Returns a boolean value indicating overall test success. Returns True if documents were processed and the success rate is >= 70% (indicating pipeline readiness for production). Returns False if no documents were processed, the success rate is < 70%, or an exception occurred during execution. This return value can be used in automated testing pipelines to gate deployment.
Dependencies
pathlib, datetime, logging, json, os, sys
Required Imports
import os
import sys
import logging
import json
from pathlib import Path
from datetime import datetime
from config.config import Config
from core.analyzer import ContractAnalyzer
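Note that setup_logging() is called at the top of the function but does not appear among these imports, so it is expected to be defined in the same module (see Best Practices). A minimal sketch of such a helper, assuming it should write a timestamped log file and return its path, could look like the following; the directory name and log format are assumptions, not taken from the source.

import logging
from datetime import datetime
from pathlib import Path

def setup_logging(log_dir="./logs"):
    """Hypothetical helper: configure file logging and return the log file path."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    log_file = Path(log_dir) / f"real_document_test_{datetime.now():%Y%m%d_%H%M%S}.log"
    logging.basicConfig(
        filename=log_file,
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    return log_file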
Usage Example
# Basic usage with default 5 documents
from test_real_documents import test_with_real_documents

# Run test with default settings
success = test_with_real_documents()
if success:
    print("Pipeline is production-ready")
else:
    print("Pipeline needs improvement")

# Test with more documents for comprehensive validation
success = test_with_real_documents(max_documents=20)

# Use in automated testing
import sys
if __name__ == "__main__":
    result = test_with_real_documents(max_documents=10)
    sys.exit(0 if result else 1)
Best Practices
- Ensure FileCloud credentials are properly configured before running this test to avoid authentication failures
- Start with a small max_documents value (5-10) to validate the pipeline before processing larger batches
- Monitor the log file output for detailed error information when analyses fail
- Check that sufficient disk space is available in the output directory for generated CSV/JSON files
- Verify LLM API rate limits and quotas can handle the max_documents setting to avoid throttling
- The 70% success rate threshold is hardcoded; consider making it configurable for different quality requirements (see the sketch after this list)
- Review failed analyses in the output to identify patterns (e.g., specific document formats causing issues)
- Use this function in CI/CD pipelines with appropriate max_documents settings to validate deployments
- Ensure the setup_logging() function is defined in the same module before calling this function
- Consider running this test in a staging environment before production to validate FileCloud connectivity
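The sketch below illustrates the configurable-threshold suggestion from the list above: the success-rate check is factored into a small helper that takes the threshold as a parameter instead of hardcoding 70%. The helper name and signature are hypothetical; the calculation mirrors the one in the source.

def meets_quality_gate(results, min_success_rate=70.0):
    """Hypothetical helper: recompute the success rate over analyzer results
    and compare it against a configurable threshold."""
    if not results:
        return False
    successful = [
        r for r in results
        if r.get("analysis_result", {}).get("success", False)
    ]
    return (len(successful) / len(results)) * 100 >= min_success_rate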
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function test_end_date_extraction (70.8% similar)
- function test_single_document (68.3% similar)
- function main_v24 (63.3% similar)
- function main_v22 (63.2% similar)
- function test_full_analyzer (63.1% similar)