🔍 Code Extractor

class ContractAnalyzer

Maturity: 26

Main class for analyzing contract validity from FileCloud documents.

File:
/tf/active/vicechatdev/contract_validity_analyzer/core/analyzer.py
Lines:
28 - 567
Complexity:
moderate

Purpose

Main class for analyzing contract validity from FileCloud documents.

Source Code

class ContractAnalyzer:
    """Main class for analyzing contract validity from FileCloud documents."""
    
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the contract validity analyzer.
        
        Args:
            config: Configuration dictionary
        """
        self.config = config
        
        # Initialize components
        self.filecloud_client = FileCloudClient(config['filecloud'])
        self.document_processor = DocumentProcessor(config.get('document_processing', {}))
        self.llm_client = LLMClient(config['llm'])
        
        # Analysis settings
        self.analysis_config = config.get('analysis', {})
        self.output_config = config['output']
        
        # Results storage
        self.results = []
        self.errors = []
        
        logger.info("Contract Validity Analyzer initialized")
    
    def analyze_contracts(self, max_concurrent: int = 3, dry_run: bool = False, max_files: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Analyze all contracts in the configured FileCloud path.
        
        Args:
            max_concurrent: Maximum number of concurrent document processing threads
            dry_run: If True, only discover documents without processing them
            max_files: Maximum number of files to process (for testing)
            
        Returns:
            List of analysis results
        """
        with PerformanceLogger("full_contract_analysis") as perf:
            
            # Step 1: Connect to FileCloud and discover documents
            logger.info("Connecting to FileCloud...")
            if not self.filecloud_client.connect():
                raise Exception("Failed to connect to FileCloud")
            
            logger.info("Searching for contract documents...")
            documents = self.filecloud_client.search_documents()
            
            if not documents:
                logger.warning("No documents found in the specified path")
                return []
            
            # Limit documents if max_files is specified
            if max_files:
                documents = documents[:max_files]
            
            perf.add_metric("documents_found", len(documents))
            logger.info(f"Found {len(documents)} documents to analyze")
            
            # If dry run, return empty results with document count
            if dry_run:
                self.filecloud_client.disconnect()
                return [{'dry_run': True, 'document_count': len(documents)}]
            
            # Step 2: Process documents
            self.results = []
            self.errors = []
            
            # Use ThreadPoolExecutor for concurrent processing
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
                with ProgressLogger(len(documents), "Document Analysis") as progress:
                    
                    # Submit all tasks
                    future_to_doc = {
                        executor.submit(self._analyze_single_document, doc): doc 
                        for doc in documents
                    }
                    
                    # Process completed tasks
                    for future in concurrent.futures.as_completed(future_to_doc):
                        doc = future_to_doc[future]
                        try:
                            result = future.result()
                            if result:
                                if result.get('error'):
                                    self.errors.append(result)
                                    progress.update(error=True)
                                else:
                                    self.results.append(result)
                                    progress.update()
                            else:
                                self.errors.append({
                                    'filename': doc.get('filename', 'unknown'),
                                    'filecloud_path': doc.get('path', ''),
                                    'file_size_bytes': doc.get('size', 0),
                                    'error': 'No result returned'
                                })
                                progress.update(error=True)
                                
                        except Exception as e:
                            logger.error(f"Error processing document {doc.get('filename', 'unknown')}: {e}")
                            self.errors.append({
                                'filename': doc.get('filename', 'unknown'),
                                'filecloud_path': doc.get('path', ''),
                                'file_size_bytes': doc.get('size', 0),
                                'error': str(e)
                            })
                            progress.update(error=True)
            
            # Step 3: Generate reports
            self._save_results()
            
            perf.add_metric("successful_analyses", len(self.results))
            perf.add_metric("failed_analyses", len(self.errors))
            
            # Step 4: Cleanup
            self.filecloud_client.disconnect()
            
            logger.info(f"Analysis complete: {len(self.results)} successful, {len(self.errors)} failed")
            return self.results
    
    def _analyze_single_document(self, doc_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Analyze a single document.
        
        Args:
            doc_info: Document information from FileCloud
            
        Returns:
            Analysis result dictionary
        """
        filename = doc_info.get('filename', 'unknown')
        file_path = doc_info.get('full_path', '')
        
        try:
            logger.debug(f"Analyzing document: {filename}")
            
            # Step 1: Download document
            temp_file_path = self.filecloud_client.download_to_temp_file(file_path)
            if not temp_file_path:
                return {
                    'filename': filename,
                    'filecloud_path': file_path,
                    'filecloud_url': self.generate_filecloud_url(file_path),
                    'file_size_bytes': doc_info.get('size', 0),
                    'error': 'Failed to download document from FileCloud'
                }
            
            try:
                # Step 2: Process document and extract text
                processing_result = self.document_processor.process_document(
                    temp_file_path, filename
                )
                
                if not processing_result.get('success'):
                    return {
                        'filename': filename,
                        'filecloud_path': file_path,
                        'filecloud_url': self.generate_filecloud_url(file_path),
                        'file_size_bytes': doc_info.get('size', 0),
                        'error': processing_result.get('error', 'Document processing failed')
                    }

                document_text = processing_result.get('text', '')
                if not document_text:
                    return {
                        'filename': filename,
                        'filecloud_path': file_path,
                        'filecloud_url': self.generate_filecloud_url(file_path),
                        'file_size_bytes': doc_info.get('size', 0),
                        'error': 'No text extracted from document'
                    }

                # Step 3: Analyze with LLM (first attempt)
                analysis_result = self.llm_client.analyze_contract(document_text, filename)
                
                # Step 3.5: Validate and fix consistency
                analysis_result = self._validate_and_fix_consistency(analysis_result)
                
                # Step 4: Check if end date was found, if not retry with OCR
                end_date = analysis_result.get('end_date')
                if not end_date or end_date in ['null', None, '']:
                    logger.info(f"No end date found for {filename}, attempting OCR retry...")
                    
                    # Try OCR extraction
                    ocr_result = self.document_processor.process_document_with_ocr(
                        temp_file_path, filename
                    )
                    
                    if ocr_result.get('success') and ocr_result.get('text'):
                        ocr_text = ocr_result.get('text', '')
                        if ocr_text and len(ocr_text) > len(document_text) * 0.5:  # OCR found substantial additional text
                            logger.info(f"OCR extracted {len(ocr_text)} chars vs original {len(document_text)} chars for {filename}")
                            
                            # Re-analyze with OCR text
                            ocr_analysis_result = self.llm_client.analyze_contract(ocr_text, filename)
                            ocr_end_date = ocr_analysis_result.get('end_date')
                            
                            if ocr_end_date and ocr_end_date not in ['null', None, '']:
                                logger.info(f"OCR retry successful! Found end date: {ocr_end_date} for {filename}")
                                analysis_result = ocr_analysis_result
                                analysis_result['_ocr_retry'] = True
                                analysis_result['_original_text_length'] = len(document_text)
                                analysis_result['_ocr_text_length'] = len(ocr_text)
                            else:
                                logger.info(f"OCR retry did not find end date for {filename}")
                                analysis_result['_ocr_retry_attempted'] = True
                        else:
                            logger.info(f"OCR did not provide substantial additional text for {filename}")
                            analysis_result['_ocr_retry_attempted'] = True
                    else:
                        logger.warning(f"OCR processing failed for {filename}")
                        analysis_result['_ocr_retry_failed'] = True

                # Step 5: Add metadata
                analysis_result.update({
                    'filecloud_path': file_path,
                    'filecloud_url': self.generate_filecloud_url(file_path),
                    'file_size_bytes': doc_info.get('size', 0),
                    'analysis_timestamp': datetime.now().isoformat(),
                    'document_metadata': processing_result.get('metadata', {})
                })

                # Step 6: Validate and fix consistency
                analysis_result = self._validate_and_fix_consistency(analysis_result)
                
                return analysis_result
                
            finally:
                # Clean up temporary file
                self.document_processor.cleanup_temp_file(temp_file_path)
                
        except Exception as e:
            logger.error(f"Error analyzing document {filename}: {e}")
            return {
                'filename': filename,
                'filecloud_path': file_path,
                'filecloud_url': self.generate_filecloud_url(file_path),
                'file_size_bytes': doc_info.get('size', 0),
                'error': str(e)
            }
    
    def _save_results(self):
        """Save analysis results to Excel and JSON files."""
        try:
            timestamp = datetime.now().strftime(self.output_config.get('timestamp_format', '%Y%m%d_%H%M%S'))
            
            # Create output directory if it doesn't exist
            output_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'output')
            os.makedirs(output_dir, exist_ok=True)
            
            # Save Excel report
            excel_filename = self.output_config.get('excel_filename', 'contract_validity_analysis.xlsx')
            excel_path = os.path.join(output_dir, f"{timestamp}_{excel_filename}")
            
            self._save_excel_report(excel_path)
            
            # Save detailed JSON report
            json_path = os.path.join(output_dir, f"{timestamp}_detailed_analysis.json")
            self._save_json_report(json_path)
            
            # Save error report if there are errors
            if self.errors:
                error_path = os.path.join(output_dir, f"{timestamp}_analysis_errors.json")
                with open(error_path, 'w') as f:
                    json.dump(self.errors, f, indent=2)
                logger.info(f"Error report saved to: {error_path}")
                
                # Save failed documents Excel report
                failed_docs_path = os.path.join(output_dir, f"{timestamp}_failed_documents.xlsx")
                self._save_failed_documents_report(failed_docs_path)
            
            logger.info(f"Results saved to: {excel_path}")
            logger.info(f"Detailed report saved to: {json_path}")
            
        except Exception as e:
            logger.error(f"Error saving results: {e}")
    
    def _save_excel_report(self, excel_path: str):
        """Save results to Excel file following reg_extractor pattern."""
        
        # Prepare data for DataFrame
        data_rows = []
        
        for result in self.results:
            # Format the row for Excel
            row = {
                'filename': result.get('filename', ''),
                'contract_type': result.get('contract_type', 'Unknown'),
                'third_parties': '; '.join(result.get('third_parties', [])),
                'third_party_emails': '; '.join(result.get('third_party_emails', [])),
                'third_party_tax_ids': '; '.join(result.get('third_party_tax_ids', [])),
                'start_date': result.get('start_date', ''),
                'end_date': result.get('end_date', ''),
                'is_in_effect': result.get('is_in_effect', False),
                'confidence': result.get('confidence', 0.0),
                'analysis_notes': result.get('analysis_notes', ''),
                'filecloud_path': result.get('filecloud_path', ''),
                'filecloud_url': result.get('filecloud_url', ''),
                'file_size_mb': round(result.get('file_size_bytes', 0) / (1024 * 1024), 2),
                'analysis_timestamp': result.get('analysis_timestamp', '')
            }
            data_rows.append(row)
        
        # Create DataFrame
        df = pd.DataFrame(data_rows)
        
        # Write to Excel
        try:
            df.to_excel(excel_path, index=False)
            logger.info(f"Successfully saved {len(data_rows)} contract analyses to Excel")
        except Exception as e:
            logger.error(f"Error writing to Excel file: {str(e)}")
            # Fallback to CSV if Excel fails
            csv_path = excel_path.replace('.xlsx', '.csv')
            df.to_csv(csv_path, index=False)
            logger.info(f"Saved to CSV as fallback: {csv_path}")
            raise
    
    def _save_json_report(self, json_path: str):
        """Save detailed results to JSON file."""
        report = {
            'analysis_summary': {
                'total_documents': len(self.results) + len(self.errors),
                'successful_analyses': len(self.results),
                'failed_analyses': len(self.errors),
                'documents_in_effect': len([r for r in self.results if r.get('is_in_effect', False)]),
                'documents_expired': len([r for r in self.results if not r.get('is_in_effect', False)]),
                'generation_timestamp': datetime.now().isoformat()
            },
            'llm_usage_stats': self.llm_client.get_usage_stats(),
            'analysis_results': self.results,
            'processing_errors': self.errors
        }
        
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
    
    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics of the analysis."""
        if not self.results:
            return {}
        
        in_effect_count = len([r for r in self.results if r.get('is_in_effect', False)])
        expired_count = len(self.results) - in_effect_count
        
        avg_confidence = sum(r.get('confidence', 0.0) for r in self.results) / len(self.results)
        
        return {
            'total_analyzed': len(self.results),
            'in_effect': in_effect_count,
            'expired': expired_count,
            'failed_analyses': len(self.errors),
            'average_confidence': round(avg_confidence, 3),
            'in_effect_percentage': round((in_effect_count / len(self.results)) * 100, 1)
        }
    
    def generate_filecloud_url(self, filepath: str) -> str:
        """
        Generate a FileCloud URL for the given file path using the same logic 
        from reg_extractor.
        
        Args:
            filepath: Path to the file
            
        Returns:
            FileCloud URL for the file
        """
        # Create file basename for display
        filename = os.path.basename(filepath)
        
        # Escape spaces in filename with + for the first part
        encoded_filename = filename.replace(' ', '+')
        
        # Extract directory path without filename
        directory_path = os.path.dirname(filepath)
        # Ensure path ends with '/'
        if directory_path and not directory_path.endswith('/'):
            directory_path += '/'
        
        # Encode path for the second part (after #expl-tabl.)
        encoded_path = directory_path
        encoded_path = encoded_path.replace(' ', '%20')
        
        # Construct the full URL
        file_url = f"https://filecloud.vicebio.com/ui/core/index.html?filter={encoded_filename}#expl-tabl.{encoded_path}"
        
        return file_url

    def _save_failed_documents_report(self, failed_docs_path: str):
        """Save failed documents to Excel file with filename and FileCloud URL."""
        
        # Prepare data for failed documents
        failed_data = []
        
        for error in self.errors:
            # Get filename from error data
            filename = error.get('filename', 'Unknown')
            filecloud_path = error.get('filecloud_path', '')
            
            # Generate FileCloud URL
            filecloud_url = ''
            if filecloud_path:
                filecloud_url = self.generate_filecloud_url(filecloud_path)
            
            # Get error reason
            error_reason = error.get('error', error.get('analysis_notes', 'Unknown error'))
            
            failed_data.append({
                'filename': filename,
                'filecloud_path': filecloud_path,
                'filecloud_url': filecloud_url,
                'error_reason': error_reason,
                'file_size_bytes': error.get('file_size_bytes', 0),
                'file_size_mb': round(error.get('file_size_bytes', 0) / (1024 * 1024), 2),
                'analysis_timestamp': error.get('analysis_timestamp', '')
            })
        
        # Create DataFrame
        df = pd.DataFrame(failed_data)
        
        # Write to Excel
        try:
            df.to_excel(failed_docs_path, index=False)
            logger.info(f"Successfully saved {len(failed_data)} failed documents to Excel: {failed_docs_path}")
        except Exception as e:
            logger.error(f"Error writing failed documents to Excel file: {str(e)}")
            # Fallback to CSV if Excel fails
            csv_path = failed_docs_path.replace('.xlsx', '.csv')
            df.to_csv(csv_path, index=False)
            logger.info(f"Saved failed documents to CSV as fallback: {csv_path}")
    
    def _validate_and_fix_consistency(self, analysis_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate and fix inconsistencies between is_in_effect flag and analysis dates/notes.
        
        Args:
            analysis_result: The analysis result from LLM
            
        Returns:
            Corrected analysis result
        """
        try:
            # Extract key fields
            start_date = analysis_result.get('start_date')
            end_date = analysis_result.get('end_date')
            is_in_effect = analysis_result.get('is_in_effect', False)
            analysis_notes = analysis_result.get('analysis_notes', '')
            filename = analysis_result.get('filename', 'unknown')
            
            # Define the reference "today" consistently for all validity checks
            from datetime import datetime, date
            today = date(2025, 7, 2)  # July 2, 2025 (hardcoded analysis date)
            
            # Parse dates if they exist
            start_parsed = None
            end_parsed = None
            
            if start_date and start_date not in ['null', None, '']:
                try:
                    start_parsed = datetime.strptime(str(start_date), '%Y-%m-%d').date()
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse start_date '{start_date}' for {filename}")
            
            if end_date and end_date not in ['null', None, '']:
                try:
                    end_parsed = datetime.strptime(str(end_date), '%Y-%m-%d').date()
                except (ValueError, TypeError):
                    logger.warning(f"Could not parse end_date '{end_date}' for {filename}")
            
            # Determine what is_in_effect should be based on dates
            calculated_in_effect = None
            
            if start_parsed and end_parsed:
                # Both dates available - check if today is between them
                calculated_in_effect = start_parsed <= today <= end_parsed
                reason = f"start_date ({start_date}) <= today (2025-07-02) <= end_date ({end_date})"
            elif start_parsed and not end_parsed:
                # Only start date - check if started and analyze notes for indefinite/perpetual
                if start_parsed <= today:
                    if any(word in analysis_notes.lower() for word in ['indefinite', 'perpetual', 'ongoing', 'until terminated']):
                        calculated_in_effect = True
                        reason = f"started ({start_date}) and appears indefinite/perpetual"
                    else:
                        calculated_in_effect = None  # Unclear without end date
                        reason = f"started ({start_date}) but no clear end date"
                else:
                    calculated_in_effect = False
                    reason = f"start_date ({start_date}) is in the future"
            elif not start_parsed and end_parsed:
                # Only end date - assume started unless otherwise indicated
                calculated_in_effect = today <= end_parsed
                reason = f"assuming started, end_date ({end_date}) {'has not' if calculated_in_effect else 'has'} passed"
            else:
                # No clear dates - analyze notes
                if any(word in analysis_notes.lower() for word in ['in effect', 'currently valid', 'active', 'valid until']):
                    calculated_in_effect = True
                    reason = "analysis notes indicate contract is active"
                elif any(word in analysis_notes.lower() for word in ['expired', 'not in effect', 'terminated']):
                    calculated_in_effect = False
                    reason = "analysis notes indicate contract is inactive"
                else:
                    calculated_in_effect = None
                    reason = "unclear from available information"
            
            # Check for inconsistency and fix if needed
            if calculated_in_effect is not None and calculated_in_effect != is_in_effect:
                logger.warning(f"Inconsistency detected for {filename}:")
                logger.warning(f"  LLM set is_in_effect={is_in_effect}")
                logger.warning(f"  Calculated should be {calculated_in_effect} ({reason})")
                logger.warning(f"  Analysis notes: {analysis_notes[:100]}...")
                
                # Fix the inconsistency
                analysis_result['is_in_effect'] = calculated_in_effect
                analysis_result['_consistency_fix'] = {
                    'original_value': is_in_effect,
                    'corrected_value': calculated_in_effect,
                    'reason': reason
                }
                
                logger.info(f"Corrected is_in_effect from {is_in_effect} to {calculated_in_effect} for {filename}")
            
            # Ensure new fields have default values if missing
            if 'third_party_emails' not in analysis_result:
                analysis_result['third_party_emails'] = []
            if 'third_party_tax_ids' not in analysis_result:
                analysis_result['third_party_tax_ids'] = []
            
            # Ensure these are lists
            if not isinstance(analysis_result.get('third_party_emails'), list):
                analysis_result['third_party_emails'] = []
            if not isinstance(analysis_result.get('third_party_tax_ids'), list):
                analysis_result['third_party_tax_ids'] = []
            
            return analysis_result
            
        except Exception as e:
            logger.error(f"Error in consistency validation: {e}")
            return analysis_result

Parameters

Name Type Default Kind
config Dict[str, Any] - positional

Parameter Details

config: Configuration dictionary. Must contain 'filecloud', 'llm', and 'output' sections; 'document_processing' and 'analysis' are optional.

Return Value

Instantiating the class returns a ContractAnalyzer object

Class Interface

Methods

__init__(self, config)

Purpose: Initialize the contract validity analyzer.

Parameters:

  • config: Type: Dict[str, Any]

Returns: None

analyze_contracts(self, max_concurrent, dry_run, max_files) -> List[Dict[str, Any]]

Purpose: Analyze all contracts in the configured FileCloud path.

Parameters:

  • max_concurrent: Type: int. Maximum number of concurrent document processing threads (default 3)
  • dry_run: Type: bool. If True, only discover documents without processing them (default False)
  • max_files: Type: Optional[int]. Maximum number of files to process, for testing (default None)

Returns: List[Dict[str, Any]]
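The concurrent fan-out used by analyze_contracts (submit every document to a ThreadPoolExecutor, then consume futures with as_completed, routing error dictionaries separately) can be sketched in isolation. The analyze_one worker and the document dictionaries below are stand-ins, not the real _analyze_single_document:

```python
import concurrent.futures

def analyze_one(doc):
    # Stand-in for _analyze_single_document: return a result dict,
    # or a dict carrying an 'error' key on failure.
    if doc["filename"].endswith(".bad"):
        return {"filename": doc["filename"], "error": "unreadable"}
    return {"filename": doc["filename"], "is_in_effect": True}

def analyze_all(documents, max_concurrent=3):
    results, errors = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        # Submit all tasks first, keeping a future -> document mapping,
        # mirroring the future_to_doc dict in analyze_contracts.
        future_to_doc = {executor.submit(analyze_one, d): d for d in documents}
        # Consume results in completion order, not submission order.
        for future in concurrent.futures.as_completed(future_to_doc):
            result = future.result()
            (errors if result.get("error") else results).append(result)
    return results, errors

docs = [{"filename": "a.pdf"}, {"filename": "b.bad"}, {"filename": "c.pdf"}]
results, errors = analyze_all(docs)
```

Consuming with as_completed keeps slow documents from blocking the collection of faster ones, which matters when download and OCR times vary widely per file.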

_analyze_single_document(self, doc_info) -> Optional[Dict[str, Any]]

Purpose: Analyze a single document and return its analysis result dictionary.

Parameters:

  • doc_info: Type: Dict[str, Any]. Document information from FileCloud

Returns: Optional[Dict[str, Any]]

_save_results(self)

Purpose: Save analysis results to CSV and JSON files.

Returns: None

_save_excel_report(self, excel_path)

Purpose: Save results to Excel file following reg_extractor pattern.

Parameters:

  • excel_path: Type: str

Returns: None

_save_json_report(self, json_path)

Purpose: Save detailed results to JSON file.

Parameters:

  • json_path: Type: str

Returns: None

get_summary_stats(self) -> Dict[str, Any]

Purpose: Get summary statistics of the analysis.

Returns: Dict[str, Any]

generate_filecloud_url(self, filepath) -> str

Purpose: Generate a FileCloud URL for the given file path, using the same logic as reg_extractor.

Parameters:

  • filepath: Type: str. Path to the file

Returns: str
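The URL-building rules (spaces become '+' in the ?filter= filename, '%20' in the directory fragment after '#expl-tabl.', with a trailing '/') can be reproduced standalone; the sample path below is hypothetical:

```python
import os

def generate_filecloud_url(filepath: str) -> str:
    # Filename goes into the ?filter= query, with spaces encoded as '+'.
    filename = os.path.basename(filepath)
    encoded_filename = filename.replace(' ', '+')
    # Directory goes after '#expl-tabl.', with spaces as '%20' and a trailing '/'.
    directory_path = os.path.dirname(filepath)
    if directory_path and not directory_path.endswith('/'):
        directory_path += '/'
    encoded_path = directory_path.replace(' ', '%20')
    return (f"https://filecloud.vicebio.com/ui/core/index.html"
            f"?filter={encoded_filename}#expl-tabl.{encoded_path}")

url = generate_filecloud_url('/Contracts/2024 Vendors/Service Agreement.pdf')
# -> ...?filter=Service+Agreement.pdf#expl-tabl./Contracts/2024%20Vendors/
```

Note the asymmetry: '+' encoding in the query string versus '%20' in the fragment is deliberate in the source, so a generic urllib.parse.quote call would not reproduce it.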

_save_failed_documents_report(self, failed_docs_path)

Purpose: Save failed documents to Excel file with filename and FileCloud URL.

Parameters:

  • failed_docs_path: Type: str

Returns: None

_validate_and_fix_consistency(self, analysis_result) -> Dict[str, Any]

Purpose: Validate and fix inconsistencies between the is_in_effect flag and the analysis dates/notes.

Parameters:

  • analysis_result: Type: Dict[str, Any]. The analysis result from the LLM

Returns: Dict[str, Any]
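The core date-window rule of _validate_and_fix_consistency can be sketched as a pure function. This simplification keeps the source's hardcoded reference date and its None-for-unclear convention, but omits the analysis-notes keyword heuristics (for "indefinite", "expired", and similar):

```python
from datetime import date, datetime

def calculate_in_effect(start_date, end_date, today=date(2025, 7, 2)):
    # Parse ISO dates; treat missing or unparseable values as None,
    # as _validate_and_fix_consistency does.
    def parse(value):
        try:
            return datetime.strptime(str(value), '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None

    start, end = parse(start_date), parse(end_date)
    if start and end:
        return start <= today <= end             # both dates known
    if start and not end:
        return False if start > today else None  # future start, else unclear
    if end:
        return today <= end                      # assume the contract started
    return None                                  # no usable dates
```

When this calculated value disagrees with the LLM's is_in_effect flag, the source overrides the flag and records the correction under '_consistency_fix'.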

Required Imports

import logging
import os
import json
import concurrent.futures
from datetime import datetime, date
from typing import Any, Dict, List, Optional

import pandas as pd

# Plus project-local modules: FileCloudClient, DocumentProcessor, LLMClient,
# PerformanceLogger, ProgressLogger, and the module-level logger.

Usage Example

# Example usage:
# analyzer = ContractAnalyzer(config)
# results = analyzer.analyze_contracts(max_concurrent=3)
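A fuller sketch of the configuration the constructor expects, inferred from __init__: 'filecloud', 'llm', and 'output' are required keys, while 'document_processing' and 'analysis' are optional. The inner fields shown are illustrative assumptions, not a documented schema:

```python
# Hypothetical configuration sketch; only the top-level keys are taken
# from __init__, the inner fields are assumptions.
config = {
    'filecloud': {'url': 'https://filecloud.vicebio.com',
                  'username': 'user', 'password': 'secret'},
    'llm': {'model': 'some-model', 'api_key': 'sk-...'},
    'output': {'excel_filename': 'contract_validity_analysis.xlsx',
               'timestamp_format': '%Y%m%d_%H%M%S'},
    'document_processing': {},  # optional; defaults to {}
    'analysis': {},             # optional; defaults to {}
}

# analyzer = ContractAnalyzer(config)
# results = analyzer.analyze_contracts(max_concurrent=3, max_files=10)
# print(analyzer.get_summary_stats())
```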

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function main_v5 71.5% similar

    Main entry point function for the Contract Validity Analyzer application that orchestrates configuration loading, logging setup, FileCloud connection, and contract analysis execution.

    From: /tf/active/vicechatdev/contract_validity_analyzer/main.py
  • function main_v6 65.1% similar

    Main entry point function that orchestrates the contract validity analysis workflow by loading configuration, setting up logging, initializing the analyzer, running analysis, and reporting results.

    From: /tf/active/vicechatdev/contract_validity_analyzer/core/analyzer.py
  • function main 65.1% similar

    Main entry point function for a Legal Contract Data Extractor application that processes contracts from FileCloud, extracts data, and exports results to multiple formats (CSV, Excel, JSON).

    From: /tf/active/vicechatdev/contract_validity_analyzer/extractor.py
  • function main_v21 62.3% similar

    Orchestrates and executes a comprehensive test suite for a Contract Validity Analyzer system, running tests for configuration, FileCloud connection, document processing, LLM client, and full analyzer functionality.

    From: /tf/active/vicechatdev/contract_validity_analyzer/test_implementation.py
  • function test_full_analyzer 56.8% similar

    Tests the full contract analyzer pipeline by running a dry-run analysis on a limited number of files to verify the system is working correctly.

    From: /tf/active/vicechatdev/contract_validity_analyzer/test_implementation.py