šŸ” Code Extractor

class EInkLLMProcessor

Maturity: 26

Main processor class that handles the complete workflow

File: /tf/active/vicechatdev/e-ink-llm/processor.py
Lines: 61 - 455
Complexity: moderate

Purpose

Main processor class that handles the complete workflow

Source Code

class EInkLLMProcessor:
    """Main processor class that handles the complete workflow"""
    
    def __init__(self, api_key: Optional[str] = None, watch_folder: Optional[str] = None, 
                 conversation_id: Optional[str] = None, compact_mode: bool = True,
                 auto_detect_session: bool = True, enable_multi_page: bool = True,
                 max_pages: int = 50, enable_editing_workflow: bool = True,
                 enable_hybrid_mode: bool = True):
        # Initialize components
        self.input_processor = InputProcessor(enable_multi_page=enable_multi_page, max_pages=max_pages)
        self.llm_handler = LLMHandler(api_key)
        self.multi_page_handler = MultiPageLLMHandler(api_key) if enable_multi_page else None
        self.pdf_generator = PDFGenerator()
        self.session_manager = SessionManager()
        self.compact_formatter = CompactResponseFormatter()
        self.session_detector = SessionDetector()
        self.conversation_context = ConversationContextManager(self.session_manager)
        self.editing_workflow = EditingWorkflowHandler(self.llm_handler) if enable_editing_workflow else None
        
        # Initialize hybrid response handler if available and enabled
        self.hybrid_handler = None
        self.enable_hybrid_mode = enable_hybrid_mode and HYBRID_AVAILABLE
        if self.enable_hybrid_mode:
            self.hybrid_handler = HybridResponseHandler(api_key)
            print(f"šŸŽØ Hybrid mode enabled (text + graphics)")
        elif enable_hybrid_mode and not HYBRID_AVAILABLE:
            print(f"āš ļø Hybrid mode requested but dependencies not available")
        
        # Configuration
        self.compact_mode = compact_mode
        self.auto_detect_session = auto_detect_session
        self.enable_multi_page = enable_multi_page
        self.max_pages = max_pages
        self.enable_editing_workflow = enable_editing_workflow
        
        # Session management
        if conversation_id:
            self.conversation_id = conversation_id
        else:
            self.conversation_id = self.session_manager.create_conversation()
        
        # Set up watch folder
        self.watch_folder = Path(watch_folder) if watch_folder else Path.cwd() / "watch"
        self.watch_folder.mkdir(exist_ok=True)
        
        # Set up logging
        self.setup_logging()
        
        print(f"šŸŽÆ E-Ink LLM Processor initialized")
        print(f"šŸ†” Conversation ID: {self.conversation_id}")
        print(f"šŸ“ Watch folder: {self.watch_folder.absolute()}")
        print(f"šŸ¤– Models: {self.llm_handler.small_model} (preprocessing), {self.llm_handler.main_model} (main)")
        print(f"šŸ“ Compact mode: {'ON' if self.compact_mode else 'OFF'}")
        print(f"šŸŽØ Hybrid mode: {'ON' if self.enable_hybrid_mode else 'OFF'} (text + graphics)")
        print(f"šŸ” Auto-detect sessions: {'ON' if self.auto_detect_session else 'OFF'}")
        print(f"šŸ“Š Multi-page PDFs: {'ON' if self.enable_multi_page else 'OFF'} (max {self.max_pages} pages)")
        print(f"šŸ’¾ Session tracking: {self.session_manager.db_path}")
    
    def setup_logging(self):
        """Set up logging for the application"""
        log_file = self.watch_folder / "eink_llm.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    async def process_file(self, file_path: Path) -> Optional[Path]:
        """
        Process a single file and generate response PDF
        
        Args:
            file_path: Path to input file
            
        Returns:
            Path to generated response PDF or None if failed
        """
        start_time = time.time()
        
        try:
            # Auto-detect session information if enabled
            detected_session = None
            if self.auto_detect_session:
                detected_session = detect_session_from_file(str(file_path))
                if detected_session:
                    if detected_session.confidence >= 0.7:  # High confidence threshold
                        print(f"šŸ” Auto-detected session: {detected_session.conversation_id} "
                              f"(exchange #{detected_session.exchange_number}, "
                              f"confidence: {detected_session.confidence:.2f})")
                        
                        # Update conversation ID to detected one
                        self.conversation_id = detected_session.conversation_id
                        print(f"šŸ”„ Switched to detected conversation: {self.conversation_id}")
                    else:
                        print(f"šŸ” Detected session with low confidence ({detected_session.confidence:.2f}), "
                              f"continuing with current conversation")
            
            print(f"\n{'='*60}")
            print(f"šŸš€ PROCESSING: {file_path.name}")
            print(f"šŸ†” Conversation: {self.conversation_id}")
            if detected_session and detected_session.confidence >= 0.7:
                print(f"šŸŽÆ Auto-continuation from exchange #{detected_session.exchange_number}")
            print(f"{'='*60}")
            
            # Step 1: Extract image(s) from input
            print(f"šŸ“ø Step 1: Extracting image from {file_path.suffix} file...")
            extraction_result = self.input_processor.extract_image(str(file_path))
            
            # Handle both single-page and multi-page results
            if isinstance(extraction_result[0], list):
                # Multi-page PDF
                page_images, metadata = extraction_result
                is_multi_page = True
                print(f"āœ… Multi-page extraction successful")
                print(f"   • Total pages: {metadata.get('total_pages', len(page_images))}")
                print(f"   • Processed pages: {len(page_images)}")
                print(f"   • Content pages: {metadata.get('content_pages', 'Unknown')}")
                print(f"   • Total text length: {metadata.get('total_text_length', 0):,} chars")
            else:
                # Single page
                image_b64, metadata = extraction_result
                is_multi_page = False
                print(f"āœ… Image extracted successfully")
                print(f"   • Dimensions: {metadata.get('dimensions', 'Unknown')}")
                print(f"   • Source type: {metadata.get('source_type', 'Unknown')}")
            
            # Step 2: Get conversation context
            conversation_context = self.session_manager.get_conversation_context(self.conversation_id)
            if conversation_context:
                print(f"šŸ’¬ Using conversation context ({len(conversation_context)} chars)")
                metadata['conversation_context'] = conversation_context
                
                # Add detected session context if available
                if detected_session and detected_session.confidence >= 0.7:
                    metadata['continuation_note'] = (
                        f"This appears to be a follow-up to exchange #{detected_session.exchange_number} "
                        f"in conversation {detected_session.conversation_id}. "
                        f"Please provide a response that builds upon the previous conversation."
                    )
            
            # Step 3: Process with AI
            if is_multi_page and self.multi_page_handler:
                print(f"🧠 Step 3: Processing multi-page document with AI...")
                
                # Get pages from multi-page processor
                pages, _ = self.input_processor.multi_page_processor.extract_all_pages(file_path)
                
                # Analyze with multi-page handler
                multi_result = await self.multi_page_handler.analyze_multi_page_document(
                    pages, metadata, conversation_context
                )
                
                llm_response = multi_result.combined_response
                print(f"āœ… Multi-page AI processing complete")
                print(f"   • Pages analyzed: {multi_result.processing_stats['pages_processed']}")
                print(f"   • Analysis method: {', '.join(multi_result.processing_stats['analysis_methods'])}")
                print(f"   • Document type: {multi_result.document_summary.document_type}")
                print(f"   • Response length: {len(llm_response):,} characters")
                
                # Use first page image for PDF generation
                image_b64 = page_images[0] if page_images else ""
                
            else:
                print(f"🧠 Step 3: Processing with AI...")
                
                # Get enhanced context using conversation context manager
                enhanced_prompt = self.conversation_context.enhance_prompt_with_context(
                    base_prompt="",  # Will be handled by LLM handler
                    conversation_id=self.conversation_id,
                    session_manager=self.session_manager
                )
                
                # Add enhanced context to metadata
                if enhanced_prompt:
                    metadata['enhanced_context'] = enhanced_prompt
                    print(f"   • Enhanced with conversation context")
                
                llm_response = await self.llm_handler.analyze_and_respond(image_b64, metadata)
                print(f"āœ… AI processing complete ({len(llm_response):,} characters)")
            
            # Step 3.5: Process editing workflow if enabled
            editing_workflow_result = None
            if self.editing_workflow:
                print(f"āœļø  Step 3.5: Processing editing workflow...")
                editing_workflow_result = await self.editing_workflow.process_document_for_editing(
                    file_path, image_b64, llm_response
                )
                if editing_workflow_result:
                    print(f"   • Detected {editing_workflow_result.annotations_detected} annotations")
                    print(f"   • Confidence: {editing_workflow_result.confidence_score:.2f}")
                    if editing_workflow_result.rewritten_content:
                        print(f"   • Generated rewritten content ({len(editing_workflow_result.rewritten_content):,} chars)")
                        # Add editing workflow results to metadata for PDF generation
                        metadata['editing_workflow'] = {
                            'annotations_detected': editing_workflow_result.annotations_detected,
                            'confidence_score': editing_workflow_result.confidence_score,
                            'recommendations': editing_workflow_result.recommendations,
                            'rewritten_content': editing_workflow_result.rewritten_content
                        }
                else:
                    print(f"   • No annotations detected or workflow failed")
            
            # Step 4: Apply compact formatting if enabled
            final_response = llm_response
            if self.compact_mode:
                print(f"šŸŽÆ Step 4: Applying compact formatting...")
                compact_response = self.compact_formatter.parse_llm_response_to_compact(llm_response)
                if compact_response:
                    final_response = compact_response
                    print(f"   • Compressed: {len(llm_response)} → {len(compact_response)} chars ({len(compact_response)/len(llm_response)*100:.0f}%)")
                else:
                    print(f"   • Compact formatting failed, using original")
            
            # Step 5: Generate output PDF with session-aware filename
            print(f"šŸ“„ Step 5: Generating response PDF...")
            
            # Get current exchange number
            conversation = self.session_manager.get_conversation(self.conversation_id)
            next_exchange_num = (conversation.total_exchanges + 1) if conversation else 1
            
            output_filename = self.session_manager.generate_session_filename(
                self.conversation_id, next_exchange_num, file_path.name
            )
            output_path = file_path.parent / output_filename
            
            # Enable hybrid mode in metadata for this processing
            if self.enable_hybrid_mode:
                metadata['enable_hybrid_mode'] = True
            
            # Use hybrid response handler if available and response contains graphics
            if (self.enable_hybrid_mode and self.hybrid_handler and 
                '[GRAPHIC:' in final_response):
                
                print(f"   šŸŽØ Using hybrid mode (text + graphics)")
                generated_pdf = await self.hybrid_handler.process_hybrid_response(
                    llm_response=final_response,
                    metadata=metadata,
                    output_path=str(output_path),
                    conversation_id=self.conversation_id,
                    exchange_number=next_exchange_num
                )
            else:
                # Use standard PDF generation
                if self.enable_hybrid_mode and '[GRAPHIC:' in final_response:
                    print(f"   āš ļø Graphics detected but hybrid handler not available, using standard mode")
                
                self.pdf_generator.create_response_pdf(
                    llm_response=final_response,
                    original_image_b64=image_b64,
                    metadata=metadata,
                    output_path=str(output_path),
                    conversation_id=self.conversation_id,
                    exchange_number=next_exchange_num
                )
            
            # Step 6: Record exchange in session
            processing_time = time.time() - start_time
            usage_stats = self.llm_handler.get_usage_summary()
            
            exchange_id = self.session_manager.add_exchange(
                conversation_id=self.conversation_id,
                input_file=str(file_path),
                input_type=file_path.suffix,
                response_text=final_response,
                processing_time=processing_time,
                tokens_used=usage_stats['total_tokens_used'],
                metadata={
                    'dimensions': metadata.get('dimensions'),
                    'source_type': metadata.get('source_type'),
                    'compact_mode': self.compact_mode,
                    'original_response_length': len(llm_response),
                    'final_response_length': len(final_response)
                }
            )
            
            # Log success
            print(f"\nšŸŽ‰ SUCCESS! Processing completed in {processing_time:.1f} seconds")
            print(f"šŸ“„ Response saved: {output_path.name}")
            print(f"šŸ†” Exchange ID: {exchange_id}")
            
            # Log usage statistics
            print(f"šŸ“Š Usage: {usage_stats['total_tokens_used']} tokens, ~${usage_stats['total_cost_estimate']:.3f}")
            
            self.logger.info(f"Successfully processed {file_path.name} -> {output_path.name} "
                           f"({processing_time:.1f}s, {usage_stats['total_tokens_used']} tokens, {exchange_id})")
            
            return output_path
            
        except Exception as e:
            error_msg = f"Error processing {file_path.name}: {str(e)}"
            print(f"\nāŒ ERROR: {error_msg}")
            self.logger.error(error_msg)
            
            # Generate error PDF with session-aware filename
            try:
                conversation = self.session_manager.get_conversation(self.conversation_id)
                next_exchange_num = (conversation.total_exchanges + 1) if conversation else 1
                
                error_filename = self.session_manager.generate_session_filename(
                    self.conversation_id, next_exchange_num, file_path.name, is_error=True
                )
                error_output_path = file_path.parent / error_filename
                
                self.pdf_generator.generate_error_pdf(
                    error_message=str(e),
                    original_file=str(file_path),
                    output_path=str(error_output_path),
                    conversation_id=self.conversation_id,
                    exchange_number=next_exchange_num
                )
                print(f"šŸ“„ Error report saved: {error_output_path.name}")
                
                # Record error exchange
                processing_time = time.time() - start_time
                self.session_manager.add_exchange(
                    conversation_id=self.conversation_id,
                    input_file=str(file_path),
                    input_type=file_path.suffix,
                    response_text=f"ERROR: {str(e)}",
                    processing_time=processing_time,
                    tokens_used=0,
                    metadata={'error': True, 'error_message': str(e)}
                )
                
                return error_output_path
            except Exception as pdf_error:
                print(f"āŒ Failed to generate error PDF: {pdf_error}")
                return None
    
    async def process_existing_files(self):
        """Process any existing files in the watch folder"""
        print(f"šŸ” Checking for existing files in {self.watch_folder}...")
        
        existing_files = [
            f for f in self.watch_folder.iterdir() 
            if f.is_file() and InputProcessor.is_supported_file(f) and not f.name.startswith(('RESPONSE_', 'ERROR_'))
        ]
        
        if existing_files:
            print(f"šŸ“ Found {len(existing_files)} existing file(s) to process")
            for file_path in existing_files:
                await self.process_file(file_path)
        else:
            print(f"šŸ“ No existing files found")
    
    async def start_watching(self, process_existing: bool = True):
        """
        Start watching the folder for new files
        
        Args:
            process_existing: Whether to process existing files on startup
        """
        print(f"\nšŸŽÆ Starting E-Ink LLM File Processor")
        print(f"šŸ“ Watching folder: {self.watch_folder.absolute()}")
        print(f"šŸ“ Supported formats: PDF, JPG, JPEG, PNG, GIF, BMP, TIFF, WEBP")
        print(f"šŸ’” Place files in the watch folder to process them automatically")
        print(f"šŸ“„ Responses will be saved with conversation tracking")
        print(f"\n{'='*60}")
        
        # Process existing files if requested
        if process_existing:
            await self.process_existing_files()
        
        # Set up file system watcher
        event_handler = EInkFileHandler(self)
        observer = Observer()
        observer.schedule(event_handler, str(self.watch_folder), recursive=False)
        
        # Start watching
        observer.start()
        print(f"šŸ‘ļø  File watcher started. Monitoring for new files...")
        print(f"šŸ’¾ Logs are saved to: {self.watch_folder / 'eink_llm.log'}")
        print(f"šŸ›‘ Press Ctrl+C to stop")
        
        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print(f"\nšŸ›‘ Stopping file watcher...")
            observer.stop()
            
            # Print final usage summary
            usage_stats = self.llm_handler.get_usage_summary()
            print(f"\nšŸ“Š FINAL USAGE SUMMARY:")
            print(f"   • Preprocessing calls: {usage_stats['preprocessing_calls']}")
            print(f"   • Main processing calls: {usage_stats['main_processing_calls']}")
            print(f"   • Total tokens used: {usage_stats['total_tokens_used']:,}")
            print(f"   • Estimated cost: ${usage_stats['total_cost_estimate']:.3f}")
            
        observer.join()
        print(f"āœ… File watcher stopped")

Parameters

Name Type Default Kind
api_key Optional[str] None optional
watch_folder Optional[str] None optional
conversation_id Optional[str] None optional
compact_mode bool True optional
auto_detect_session bool True optional
enable_multi_page bool True optional
max_pages int 50 optional
enable_editing_workflow bool True optional
enable_hybrid_mode bool True optional

Parameter Details

api_key: Optional API key forwarded to the LLM, multi-page, and hybrid handlers.
watch_folder: Folder monitored for input files; defaults to ./watch under the current working directory and is created if missing.
conversation_id: Existing conversation to continue; a new conversation is created when omitted.
compact_mode: Apply compact formatting to LLM responses before PDF generation.
auto_detect_session: Attempt to detect session information from incoming files.
enable_multi_page: Enable multi-page PDF extraction and analysis.
max_pages: Maximum number of pages processed per document.
enable_editing_workflow: Enable annotation detection and the editing workflow.
enable_hybrid_mode: Enable hybrid text + graphics responses when the optional dependencies are available.

Return Value

Instantiation returns an EInkLLMProcessor instance.
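
A minimal construction sketch based on the signature above; the folder name and page limit are illustrative, and how a None api_key is resolved is defined by LLMHandler, not by this class:

# Illustrative construction; every argument shown has a default.
processor = EInkLLMProcessor(
    api_key=None,                 # forwarded to LLMHandler; its fallback behavior lives elsewhere
    watch_folder="./eink_watch",  # illustrative folder; created if missing
    compact_mode=True,            # compress responses before PDF generation
    max_pages=20,                 # illustrative cap on pages per document
    enable_hybrid_mode=False,     # skip text + graphics output
)
print(processor.conversation_id)  # a new conversation is created when none is supplied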

Class Interface

Methods

__init__(self, api_key, watch_folder, conversation_id, compact_mode, auto_detect_session, enable_multi_page, max_pages, enable_editing_workflow, enable_hybrid_mode)

Purpose: Constructor; initializes the processing components, configuration, session management, watch folder, and logging

Parameters:

  • api_key: Type: Optional[str]
  • watch_folder: Type: Optional[str]
  • conversation_id: Type: Optional[str]
  • compact_mode: Type: bool
  • auto_detect_session: Type: bool
  • enable_multi_page: Type: bool
  • max_pages: Type: int
  • enable_editing_workflow: Type: bool
  • enable_hybrid_mode: Type: bool

Returns: None

setup_logging(self)

Purpose: Set up logging for the application

Returns: None

process_file(self, file_path) (async)

Purpose: Process a single input file and generate a response PDF

Parameters:

  • file_path: Type: Path

Returns: Path to the generated response PDF, or None if processing failed

process_existing_files(self) (async)

Purpose: Process any existing files in the watch folder

Returns: None

start_watching(self, process_existing) (async)

Purpose: Start watching the folder for new files, optionally processing existing ones first

Parameters:

  • process_existing: Type: bool
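
A minimal sketch of driving a single file through process_file directly; it is a coroutine, so it must be awaited, and the file name here is purely illustrative:

import asyncio
from pathlib import Path

async def run_once():
    processor = EInkLLMProcessor(watch_folder="./watch")
    # Returns the Path of the generated response PDF, or None if both
    # processing and error-PDF generation failed.
    output_pdf = await processor.process_file(Path("./watch/note.pdf"))
    if output_pdf:
        print(f"Response written to {output_pdf.name}")

asyncio.run(run_once())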

Required Imports

import os
import asyncio
import logging
import time
from pathlib import Path
from typing import Optional
from watchdog.observers import Observer

Usage Example

# Example usage:
# processor = EInkLLMProcessor(api_key=None, watch_folder="./watch")
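
For continuous operation, a minimal sketch of driving the watcher entry point with asyncio (the folder name is illustrative); it processes any files already in the folder, then blocks until interrupted with Ctrl+C:

import asyncio

processor = EInkLLMProcessor(watch_folder="./watch")
# Handles existing files first, then monitors the folder for new ones.
asyncio.run(processor.start_watching(process_existing=True))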

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class InvoiceProcessor 65.9% similar

    Main orchestrator class that coordinates the complete invoice processing pipeline from PDF extraction through validation to Excel generation.

    From: /tf/active/vicechatdev/invoice_extraction/main.py
  • class RemarkableEInkProcessor 63.3% similar

    Enhanced E-Ink LLM Processor that extends EInkLLMProcessor with reMarkable Cloud integration, enabling file processing from both local directories and reMarkable Cloud storage.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_processor.py
  • class OneDriveProcessor 56.7% similar

    OneDriveProcessor is a class that monitors a OneDrive folder for new files, processes them using an E-Ink LLM Assistant, and uploads the results back to OneDrive.

    From: /tf/active/vicechatdev/e-ink-llm/onedrive_client.py
  • class DataProcessor 56.4% similar

    Handles data loading, validation, and preprocessing

    From: /tf/active/vicechatdev/full_smartstat/data_processor.py
  • class DataProcessor_v1 56.1% similar

    Handles data loading, validation, and preprocessing

    From: /tf/active/vicechatdev/smartstat/data_processor.py