class ContractAnalyzer
Main class for analyzing contract validity from FileCloud documents.
File: /tf/active/vicechatdev/contract_validity_analyzer/core/analyzer.py
Lines: 28-567
Complexity: moderate
Purpose
Main class for analyzing contract validity from FileCloud documents.
Source Code
class ContractAnalyzer:
    """Main class for analyzing contract validity from FileCloud documents."""

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the contract validity analyzer.

        Args:
            config: Configuration dictionary
        """
        self.config = config

        # Initialize components
        self.filecloud_client = FileCloudClient(config['filecloud'])
        self.document_processor = DocumentProcessor(config.get('document_processing', {}))
        self.llm_client = LLMClient(config['llm'])

        # Analysis settings
        self.analysis_config = config.get('analysis', {})
        self.output_config = config['output']

        # Results storage
        self.results = []
        self.errors = []

        logger.info("Contract Validity Analyzer initialized")
    def analyze_contracts(self, max_concurrent: int = 3, dry_run: bool = False,
                          max_files: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Analyze all contracts in the configured FileCloud path.

        Args:
            max_concurrent: Maximum number of concurrent document processing threads
            dry_run: If True, only discover documents without processing them
            max_files: Maximum number of files to process (for testing)

        Returns:
            List of analysis results
        """
        with PerformanceLogger("full_contract_analysis") as perf:
            # Step 1: Connect to FileCloud and discover documents
            logger.info("Connecting to FileCloud...")
            if not self.filecloud_client.connect():
                raise RuntimeError("Failed to connect to FileCloud")

            logger.info("Searching for contract documents...")
            documents = self.filecloud_client.search_documents()
            if not documents:
                logger.warning("No documents found in the specified path")
                return []

            # Limit documents if max_files is specified
            if max_files:
                documents = documents[:max_files]

            perf.add_metric("documents_found", len(documents))
            logger.info(f"Found {len(documents)} documents to analyze")

            # If dry run, return a single record carrying the document count
            if dry_run:
                self.filecloud_client.disconnect()
                return [{'dry_run': True, 'document_count': len(documents)}]

            # Step 2: Process documents
            self.results = []
            self.errors = []

            # Use ThreadPoolExecutor for concurrent processing
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
                with ProgressLogger(len(documents), "Document Analysis") as progress:
                    # Submit all tasks
                    future_to_doc = {
                        executor.submit(self._analyze_single_document, doc): doc
                        for doc in documents
                    }

                    # Process completed tasks
                    for future in concurrent.futures.as_completed(future_to_doc):
                        doc = future_to_doc[future]
                        try:
                            result = future.result()
                            if result:
                                if result.get('error'):
                                    self.errors.append(result)
                                    progress.update(error=True)
                                else:
                                    self.results.append(result)
                                    progress.update()
                            else:
                                self.errors.append({
                                    'filename': doc.get('filename', 'unknown'),
                                    'filecloud_path': doc.get('path', ''),
                                    'file_size_bytes': doc.get('size', 0),
                                    'error': 'No result returned'
                                })
                                progress.update(error=True)
                        except Exception as e:
                            logger.error(f"Error processing document {doc.get('filename', 'unknown')}: {e}")
                            self.errors.append({
                                'filename': doc.get('filename', 'unknown'),
                                'filecloud_path': doc.get('path', ''),
                                'file_size_bytes': doc.get('size', 0),
                                'error': str(e)
                            })
                            progress.update(error=True)

            # Step 3: Generate reports
            self._save_results()
            perf.add_metric("successful_analyses", len(self.results))
            perf.add_metric("failed_analyses", len(self.errors))

            # Step 4: Cleanup
            self.filecloud_client.disconnect()

            logger.info(f"Analysis complete: {len(self.results)} successful, {len(self.errors)} failed")
            return self.results
    def _analyze_single_document(self, doc_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Analyze a single document.

        Args:
            doc_info: Document information from FileCloud

        Returns:
            Analysis result dictionary (contains an 'error' key on failure)
        """
        filename = doc_info.get('filename', 'unknown')
        file_path = doc_info.get('full_path', '')

        try:
            logger.debug(f"Analyzing document: {filename}")

            # Step 1: Download document
            temp_file_path = self.filecloud_client.download_to_temp_file(file_path)
            if not temp_file_path:
                return {
                    'filename': filename,
                    'filecloud_path': file_path,
                    'filecloud_url': self.generate_filecloud_url(file_path),
                    'file_size_bytes': doc_info.get('size', 0),
                    'error': 'Failed to download document from FileCloud'
                }

            try:
                # Step 2: Process document and extract text
                processing_result = self.document_processor.process_document(
                    temp_file_path, filename
                )
                if not processing_result.get('success'):
                    return {
                        'filename': filename,
                        'filecloud_path': file_path,
                        'filecloud_url': self.generate_filecloud_url(file_path),
                        'file_size_bytes': doc_info.get('size', 0),
                        'error': processing_result.get('error', 'Document processing failed')
                    }

                document_text = processing_result.get('text', '')
                if not document_text:
                    return {
                        'filename': filename,
                        'filecloud_path': file_path,
                        'filecloud_url': self.generate_filecloud_url(file_path),
                        'file_size_bytes': doc_info.get('size', 0),
                        'error': 'No text extracted from document'
                    }

                # Step 3: Analyze with LLM (first attempt)
                analysis_result = self.llm_client.analyze_contract(document_text, filename)

                # Step 3.5: Validate and fix consistency
                analysis_result = self._validate_and_fix_consistency(analysis_result)

                # Step 4: Check if an end date was found; if not, retry with OCR
                end_date = analysis_result.get('end_date')
                if not end_date or end_date in ['null', None, '']:
                    logger.info(f"No end date found for {filename}, attempting OCR retry...")

                    # Try OCR extraction
                    ocr_result = self.document_processor.process_document_with_ocr(
                        temp_file_path, filename
                    )
                    if ocr_result.get('success') and ocr_result.get('text'):
                        ocr_text = ocr_result.get('text', '')
                        # Only re-analyze if the OCR text is at least half as long
                        # as the originally extracted text
                        if ocr_text and len(ocr_text) > len(document_text) * 0.5:
                            logger.info(f"OCR extracted {len(ocr_text)} chars vs original {len(document_text)} chars for {filename}")

                            # Re-analyze with OCR text
                            ocr_analysis_result = self.llm_client.analyze_contract(ocr_text, filename)
                            ocr_end_date = ocr_analysis_result.get('end_date')
                            if ocr_end_date and ocr_end_date not in ['null', None, '']:
                                logger.info(f"OCR retry successful! Found end date: {ocr_end_date} for {filename}")
                                analysis_result = ocr_analysis_result
                                analysis_result['_ocr_retry'] = True
                                analysis_result['_original_text_length'] = len(document_text)
                                analysis_result['_ocr_text_length'] = len(ocr_text)
                            else:
                                logger.info(f"OCR retry did not find end date for {filename}")
                                analysis_result['_ocr_retry_attempted'] = True
                        else:
                            logger.info(f"OCR did not provide substantial additional text for {filename}")
                            analysis_result['_ocr_retry_attempted'] = True
                    else:
                        logger.warning(f"OCR processing failed for {filename}")
                        analysis_result['_ocr_retry_failed'] = True

                # Step 5: Add metadata
                analysis_result.update({
                    'filecloud_path': file_path,
                    'filecloud_url': self.generate_filecloud_url(file_path),
                    'file_size_bytes': doc_info.get('size', 0),
                    'analysis_timestamp': datetime.now().isoformat(),
                    'document_metadata': processing_result.get('metadata', {})
                })

                # Step 6: Validate and fix consistency again, in case the OCR
                # retry replaced the analysis result
                analysis_result = self._validate_and_fix_consistency(analysis_result)

                return analysis_result
            finally:
                # Clean up temporary file
                self.document_processor.cleanup_temp_file(temp_file_path)
        except Exception as e:
            logger.error(f"Error analyzing document {filename}: {e}")
            return {
                'filename': filename,
                'filecloud_path': file_path,
                'filecloud_url': self.generate_filecloud_url(file_path),
                'file_size_bytes': doc_info.get('size', 0),
                'error': str(e)
            }
    def _save_results(self):
        """Save analysis results to Excel and JSON files."""
        try:
            timestamp = datetime.now().strftime(self.output_config.get('timestamp_format', '%Y%m%d_%H%M%S'))

            # Create output directory if it doesn't exist
            output_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'output')
            os.makedirs(output_dir, exist_ok=True)

            # Save Excel report
            excel_filename = self.output_config.get('excel_filename', 'contract_validity_analysis.xlsx')
            excel_path = os.path.join(output_dir, f"{timestamp}_{excel_filename}")
            self._save_excel_report(excel_path)

            # Save detailed JSON report
            json_path = os.path.join(output_dir, f"{timestamp}_detailed_analysis.json")
            self._save_json_report(json_path)

            # Save error reports if there are errors
            if self.errors:
                error_path = os.path.join(output_dir, f"{timestamp}_analysis_errors.json")
                with open(error_path, 'w') as f:
                    json.dump(self.errors, f, indent=2)
                logger.info(f"Error report saved to: {error_path}")

                # Save failed documents Excel report
                failed_docs_path = os.path.join(output_dir, f"{timestamp}_failed_documents.xlsx")
                self._save_failed_documents_report(failed_docs_path)

            logger.info(f"Results saved to: {excel_path}")
            logger.info(f"Detailed report saved to: {json_path}")
        except Exception as e:
            logger.error(f"Error saving results: {e}")
    def _save_excel_report(self, excel_path: str):
        """Save results to Excel file following reg_extractor pattern."""
        # Prepare data for DataFrame
        data_rows = []
        for result in self.results:
            # Format the row for Excel
            row = {
                'filename': result.get('filename', ''),
                'contract_type': result.get('contract_type', 'Unknown'),
                'third_parties': '; '.join(result.get('third_parties', [])),
                'third_party_emails': '; '.join(result.get('third_party_emails', [])),
                'third_party_tax_ids': '; '.join(result.get('third_party_tax_ids', [])),
                'start_date': result.get('start_date', ''),
                'end_date': result.get('end_date', ''),
                'is_in_effect': result.get('is_in_effect', False),
                'confidence': result.get('confidence', 0.0),
                'analysis_notes': result.get('analysis_notes', ''),
                'filecloud_path': result.get('filecloud_path', ''),
                'filecloud_url': result.get('filecloud_url', ''),
                'file_size_mb': round(result.get('file_size_bytes', 0) / (1024 * 1024), 2),
                'analysis_timestamp': result.get('analysis_timestamp', '')
            }
            data_rows.append(row)

        # Create DataFrame
        df = pd.DataFrame(data_rows)

        # Write to Excel
        try:
            df.to_excel(excel_path, index=False)
            logger.info(f"Successfully saved {len(data_rows)} contract analyses to Excel")
        except Exception as e:
            logger.error(f"Error writing to Excel file: {str(e)}")
            # Fallback to CSV if Excel fails
            csv_path = excel_path.replace('.xlsx', '.csv')
            df.to_csv(csv_path, index=False)
            logger.info(f"Saved to CSV as fallback: {csv_path}")
            # Re-raise so the caller's error handler logs the Excel failure
            raise
    def _save_json_report(self, json_path: str):
        """Save detailed results to JSON file."""
        report = {
            'analysis_summary': {
                'total_documents': len(self.results) + len(self.errors),
                'successful_analyses': len(self.results),
                'failed_analyses': len(self.errors),
                'documents_in_effect': len([r for r in self.results if r.get('is_in_effect', False)]),
                'documents_expired': len([r for r in self.results if not r.get('is_in_effect', False)]),
                'generation_timestamp': datetime.now().isoformat()
            },
            'llm_usage_stats': self.llm_client.get_usage_stats(),
            'analysis_results': self.results,
            'processing_errors': self.errors
        }

        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics of the analysis."""
        if not self.results:
            return {}

        in_effect_count = len([r for r in self.results if r.get('is_in_effect', False)])
        expired_count = len(self.results) - in_effect_count
        avg_confidence = sum(r.get('confidence', 0.0) for r in self.results) / len(self.results)

        return {
            'total_analyzed': len(self.results),
            'in_effect': in_effect_count,
            'expired': expired_count,
            'failed_analyses': len(self.errors),
            'average_confidence': round(avg_confidence, 3),
            'in_effect_percentage': round((in_effect_count / len(self.results)) * 100, 1)
        }
    def generate_filecloud_url(self, filepath: str) -> str:
        """
        Generate a FileCloud URL for the given file path, using the same logic
        as reg_extractor.

        Args:
            filepath: Path to the file

        Returns:
            FileCloud URL for the file
        """
        # File basename for display; spaces become '+' in the filter part
        filename = os.path.basename(filepath)
        encoded_filename = filename.replace(' ', '+')

        # Directory path without the filename, ensured to end with '/'
        directory_path = os.path.dirname(filepath)
        if directory_path and not directory_path.endswith('/'):
            directory_path += '/'

        # Spaces become '%20' in the path part (after #expl-tabl.)
        encoded_path = directory_path.replace(' ', '%20')

        # Construct the full URL
        return f"https://filecloud.vicebio.com/ui/core/index.html?filter={encoded_filename}#expl-tabl.{encoded_path}"
    def _save_failed_documents_report(self, failed_docs_path: str):
        """Save failed documents to Excel file with filename and FileCloud URL."""
        # Prepare data for failed documents
        failed_data = []
        for error in self.errors:
            filename = error.get('filename', 'Unknown')
            filecloud_path = error.get('filecloud_path', '')

            # Generate FileCloud URL if a path is available
            filecloud_url = self.generate_filecloud_url(filecloud_path) if filecloud_path else ''

            # Get error reason
            error_reason = error.get('error', error.get('analysis_notes', 'Unknown error'))

            failed_data.append({
                'filename': filename,
                'filecloud_path': filecloud_path,
                'filecloud_url': filecloud_url,
                'error_reason': error_reason,
                'file_size_bytes': error.get('file_size_bytes', 0),
                'file_size_mb': round(error.get('file_size_bytes', 0) / (1024 * 1024), 2),
                'analysis_timestamp': error.get('analysis_timestamp', '')
            })

        # Create DataFrame
        df = pd.DataFrame(failed_data)

        # Write to Excel, falling back to CSV on failure
        try:
            df.to_excel(failed_docs_path, index=False)
            logger.info(f"Successfully saved {len(failed_data)} failed documents to Excel: {failed_docs_path}")
        except Exception as e:
            logger.error(f"Error writing failed documents to Excel file: {str(e)}")
            csv_path = failed_docs_path.replace('.xlsx', '.csv')
            df.to_csv(csv_path, index=False)
            logger.info(f"Saved failed documents to CSV as fallback: {csv_path}")
    def _validate_and_fix_consistency(self, analysis_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate and fix inconsistencies between the is_in_effect flag and the
        extracted dates/analysis notes.

        Args:
            analysis_result: The analysis result from the LLM

        Returns:
            Corrected analysis result
        """
        try:
            # Extract key fields
            start_date = analysis_result.get('start_date')
            end_date = analysis_result.get('end_date')
            is_in_effect = analysis_result.get('is_in_effect', False)
            analysis_notes = analysis_result.get('analysis_notes', '')
            filename = analysis_result.get('filename', 'unknown')

            # Reference date for validity checks
            today = date.today()

            # Parse dates if they exist
            start_parsed = None
            end_parsed = None

            if start_date and start_date not in ['null', None, '']:
                try:
                    start_parsed = datetime.strptime(str(start_date), '%Y-%m-%d').date()
                except ValueError:
                    logger.warning(f"Could not parse start_date '{start_date}' for {filename}")

            if end_date and end_date not in ['null', None, '']:
                try:
                    end_parsed = datetime.strptime(str(end_date), '%Y-%m-%d').date()
                except ValueError:
                    logger.warning(f"Could not parse end_date '{end_date}' for {filename}")

            # Determine what is_in_effect should be based on the dates
            calculated_in_effect = None
            if start_parsed and end_parsed:
                # Both dates available - check if today is between them
                calculated_in_effect = start_parsed <= today <= end_parsed
                reason = f"start_date ({start_date}) <= today ({today.isoformat()}) <= end_date ({end_date})"
            elif start_parsed and not end_parsed:
                # Only start date - check if started, then scan notes for indefinite/perpetual terms
                if start_parsed <= today:
                    if any(word in analysis_notes.lower() for word in ['indefinite', 'perpetual', 'ongoing', 'until terminated']):
                        calculated_in_effect = True
                        reason = f"started ({start_date}) and appears indefinite/perpetual"
                    else:
                        calculated_in_effect = None  # Unclear without an end date
                        reason = f"started ({start_date}) but no clear end date"
                else:
                    calculated_in_effect = False
                    reason = f"start_date ({start_date}) is in the future"
            elif not start_parsed and end_parsed:
                # Only end date - assume started unless otherwise indicated
                calculated_in_effect = today <= end_parsed
                reason = f"assuming started, end_date ({end_date}) {'has not' if calculated_in_effect else 'has'} passed"
            else:
                # No clear dates - analyze notes
                if any(word in analysis_notes.lower() for word in ['in effect', 'currently valid', 'active', 'valid until']):
                    calculated_in_effect = True
                    reason = "analysis notes indicate contract is active"
                elif any(word in analysis_notes.lower() for word in ['expired', 'not in effect', 'terminated']):
                    calculated_in_effect = False
                    reason = "analysis notes indicate contract is inactive"
                else:
                    calculated_in_effect = None
                    reason = "unclear from available information"

            # Check for inconsistency and fix if needed
            if calculated_in_effect is not None and calculated_in_effect != is_in_effect:
                logger.warning(f"Inconsistency detected for {filename}:")
                logger.warning(f"  LLM set is_in_effect={is_in_effect}")
                logger.warning(f"  Calculated should be {calculated_in_effect} ({reason})")
                logger.warning(f"  Analysis notes: {analysis_notes[:100]}...")

                # Fix the inconsistency
                analysis_result['is_in_effect'] = calculated_in_effect
                analysis_result['_consistency_fix'] = {
                    'original_value': is_in_effect,
                    'corrected_value': calculated_in_effect,
                    'reason': reason
                }
                logger.info(f"Corrected is_in_effect from {is_in_effect} to {calculated_in_effect} for {filename}")

            # Ensure the third-party contact fields exist and are lists
            if not isinstance(analysis_result.get('third_party_emails'), list):
                analysis_result['third_party_emails'] = []
            if not isinstance(analysis_result.get('third_party_tax_ids'), list):
                analysis_result['third_party_tax_ids'] = []

            return analysis_result
        except Exception as e:
            logger.error(f"Error in consistency validation: {e}")
            return analysis_result
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config | Dict[str, Any] | - | positional |
Parameter Details
config: Configuration dictionary. The 'filecloud', 'llm', and 'output' sections are required (indexed directly in __init__); 'document_processing' and 'analysis' are optional and default to empty dicts.
Return Value
Instantiation returns a ContractAnalyzer instance; the main entry point, analyze_contracts(), returns List[Dict[str, Any]].
Class Interface
Methods
__init__(self, config)
Purpose: Initialize the contract validity analyzer.
Parameters:
config: Type: Dict[str, Any]. Configuration dictionary.
Returns: None
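A minimal construction sketch. Only the top-level config keys are confirmed by __init__; the nested values below are illustrative placeholders, not a documented schema:
# Hypothetical config; nested keys under 'filecloud' and 'llm' are assumptions
config = {
    'filecloud': {'url': 'https://filecloud.vicebio.com', 'username': '...', 'password': '...'},
    'llm': {'model': '...', 'api_key': '...'},
    'output': {'excel_filename': 'contract_validity_analysis.xlsx',
               'timestamp_format': '%Y%m%d_%H%M%S'},
    'document_processing': {},  # optional, defaults to {}
    'analysis': {},             # optional, defaults to {}
}
analyzer = ContractAnalyzer(config)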
analyze_contracts(self, max_concurrent, dry_run, max_files) -> List[Dict[str, Any]]
Purpose: Analyze all contracts in the configured FileCloud path.
Parameters:
max_concurrent: Type: int. Maximum number of concurrent document processing threads.
dry_run: Type: bool. If True, only discover documents without processing them.
max_files: Type: Optional[int]. Maximum number of files to process (for testing).
Returns: List[Dict[str, Any]] (one analysis result per successfully processed document)
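A usage sketch, assuming the analyzer constructed above; a dry run first reports the document count without downloading anything:
# Discover documents only (no downloads, no LLM calls)
preview = analyzer.analyze_contracts(dry_run=True)
# -> [{'dry_run': True, 'document_count': <n>}]

# Full run, capped at 10 documents with 3 worker threads
results = analyzer.analyze_contracts(max_concurrent=3, max_files=10)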
_analyze_single_document(self, doc_info) -> Optional[Dict[str, Any]]
Purpose: Analyze a single document.
Parameters:
doc_info: Type: Dict[str, Any]. Document information from FileCloud.
Returns: Optional[Dict[str, Any]] (analysis result dictionary; contains an 'error' key on failure)
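The expected doc_info shape, inferred from the fields this method and analyze_contracts read (values are illustrative):
# Hypothetical FileCloud search hit
doc_info = {
    'filename': 'Acme Master Services Agreement.pdf',
    'full_path': '/contracts/Acme Master Services Agreement.pdf',
    'path': '/contracts',   # used for error records in analyze_contracts
    'size': 284114,         # bytes
}
result = analyzer._analyze_single_document(doc_info)
# On failure, the result carries an 'error' key instead of analysis fields.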
_save_results(self)
Purpose: Save analysis results to Excel and JSON files, plus error reports when any analyses fail.
Returns: None
_save_excel_report(self, excel_path)
Purpose: Save results to Excel file following reg_extractor pattern.
Parameters:
excel_path: Type: str
Returns: None
_save_json_report(self, json_path)
Purpose: Save detailed results to JSON file.
Parameters:
json_path: Type: str
Returns: None
get_summary_stats(self) -> Dict[str, Any]
Purpose: Get summary statistics of the analysis.
Returns: Dict[str, Any]
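Illustrative return shape; the keys match the method's return statement, the numbers are placeholders:
stats = analyzer.get_summary_stats()
# {'total_analyzed': 40, 'in_effect': 28, 'expired': 12,
#  'failed_analyses': 3, 'average_confidence': 0.873,
#  'in_effect_percentage': 70.0}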
generate_filecloud_url(self, filepath) -> str
Purpose: Generate a FileCloud URL for the given file path, using the same logic as reg_extractor.
Parameters:
filepath: Type: str. Path to the file.
Returns: str (FileCloud URL for the file)
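A worked example following the method's own substitutions: spaces become '+' in the filename filter and '%20' in the directory path.
url = analyzer.generate_filecloud_url('/legal docs/Acme Agreement 2024.pdf')
# -> 'https://filecloud.vicebio.com/ui/core/index.html'
#    '?filter=Acme+Agreement+2024.pdf#expl-tabl./legal%20docs/'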
_save_failed_documents_report(self, failed_docs_path)
Purpose: Save failed documents to Excel file with filename and FileCloud URL.
Parameters:
failed_docs_path: Type: str
Returns: None
_validate_and_fix_consistency(self, analysis_result) -> Dict[str, Any]
Purpose: Validate and fix inconsistencies between the is_in_effect flag and the extracted dates/notes.
Parameters:
analysis_result: Type: Dict[str, Any]. The analysis result from the LLM.
Returns: Dict[str, Any] (corrected analysis result)
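A sketch of the correction behavior for a contract whose dates contradict the LLM's flag (a clearly expired term marked as in effect):
fixed = analyzer._validate_and_fix_consistency({
    'filename': 'expired_nda.pdf',
    'start_date': '2020-01-01',
    'end_date': '2022-12-31',
    'is_in_effect': True,        # inconsistent with the dates
    'analysis_notes': 'Two-year NDA.',
})
# fixed['is_in_effect'] is now False, and fixed['_consistency_fix'] records
# the original value, the corrected value, and the reason string.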
Required Imports
import logging
import os
import sys
import csv
import json
import concurrent.futures
from datetime import datetime, date
from typing import Any, Dict, List, Optional
import pandas as pd
# Plus project-internal dependencies (import paths not shown in this snippet):
# FileCloudClient, DocumentProcessor, LLMClient, PerformanceLogger, ProgressLogger, logger
Usage Example
# Example usage (a minimal sketch; see the config example under __init__ above):
# analyzer = ContractAnalyzer(config)
# results = analyzer.analyze_contracts()
# print(analyzer.get_summary_stats())
Similar Components
AI-powered semantic similarity - components with related functionality:
- function main_v5 (71.5% similar)
- function main_v6 (65.1% similar)
- function main (65.1% similar)
- function main_v21 (62.3% similar)
- function test_full_analyzer (56.8% similar)