class MixedCloudProcessor

Maturity: 55

A cloud integration processor that monitors both OneDrive and reMarkable Cloud for input PDF files, processes them through an API, and manages file synchronization between cloud services.

File: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
Lines: 476 - 654
Complexity: complex

Purpose

MixedCloudProcessor runs up to three watchers concurrently: one OneDrive watcher, delegated to OneDriveProcessor, and two reMarkable Cloud watchers, one for a regular input folder and one for the 'gpt_out' folder. Each new PDF found in either reMarkable folder is copied to a temporary file and passed to OneDriveProcessor, so files from both clouds go through the same processing and output path. This lets documents created on a reMarkable e-ink device enter the same processing workflow as files dropped into OneDrive.

Source Code

class MixedCloudProcessor:
    """Mixed cloud processor that watches both OneDrive and reMarkable Cloud for input files"""
    
    def __init__(self, onedrive_config: Dict, remarkable_session, api_key: str):
        self.onedrive_config = onedrive_config
        self.remarkable_session = remarkable_session
        self.api_key = api_key
        
        # Setup logging
        self.logger = logging.getLogger('MixedCloudProcessor')
        self.logger.setLevel(logging.INFO)
        
        if not ONEDRIVE_AVAILABLE:
            raise ImportError("OneDrive integration not available. Install with: pip install msal requests")
        
        # Initialize OneDrive processor
        self.onedrive_processor = OneDriveProcessor(onedrive_config, api_key)
        
        # Initialize reMarkable watchers - both input folder and gpt_out
        self.remarkable_input_watcher = RemarkableCloudWatcher(remarkable_session, self.logger)
        self.remarkable_gptout_watcher = RemarkableCloudWatcher(remarkable_session, self.logger)
        
        # Configuration
        self.poll_interval = onedrive_config.get('poll_interval', 60)
        self.remarkable_poll_interval = onedrive_config.get('remarkable_poll_interval', 60)
    
    async def start_watching(self):
        """Start watching both OneDrive and reMarkable Cloud for input files"""
        self.logger.info("🚀 Starting Mixed Cloud Processor")
        self.logger.info("   📁 OneDrive: Input and Output")
        self.logger.info("   🌐 reMarkable Cloud: Input (regular folder + gpt_out folder)")
        
        # Initialize reMarkable watchers
        input_init_success = await self._initialize_remarkable_input_watcher()
        gptout_init_success = await self._initialize_remarkable_gptout_watcher()
        
        if not input_init_success and not gptout_init_success:
            self.logger.warning("⚠️ Both reMarkable watchers failed to initialize, continuing with OneDrive only")
        
        # Start all watchers concurrently
        tasks = [
            self._onedrive_watcher(),
        ]
        
        if input_init_success:
            tasks.append(self._remarkable_input_watcher())
        
        if gptout_init_success:
            tasks.append(self._remarkable_gptout_watcher())
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _initialize_remarkable_input_watcher(self):
        """Initialize the reMarkable input folder watcher"""
        try:
            # Set target folder for input watcher (regular input folder)
            target_folder = self.onedrive_config.get('remarkable_input_folder', '/E-Ink LLM Input')
            self.logger.info(f"🔍 Initializing reMarkable input watcher for folder: {target_folder}")
            
            # Find the target folder
            all_nodes = await self.remarkable_input_watcher._discover_all_nodes()
            
            for uuid, node in all_nodes.items():
                if (node.get('node_type') == 'folder' and 
                    node.get('name', '').strip() == target_folder.strip('/').split('/')[-1]):
                    self.remarkable_input_watcher.gpt_out_folder_uuid = uuid
                    self.logger.info(f"✅ Found reMarkable input folder: {target_folder} ({uuid})")
                    return True
            
            self.logger.warning(f"⚠️ reMarkable input folder '{target_folder}' not found")
            return False
            
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable input watcher: {e}")
            return False
    
    async def _initialize_remarkable_gptout_watcher(self):
        """Initialize the reMarkable gpt_out folder watcher"""
        try:
            success = await self.remarkable_gptout_watcher.initialize()
            if success:
                self.logger.info("✅ reMarkable gpt_out watcher initialized")
            else:
                self.logger.warning("⚠️ reMarkable gpt_out folder not found")
            return success
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable gpt_out watcher: {e}")
            return False
    
    async def _onedrive_watcher(self):
        """Run OneDrive watcher"""
        try:
            self.logger.info("📁 Starting OneDrive watcher...")
            await self.onedrive_processor.start_watching()
        except Exception as e:
            self.logger.error(f"❌ OneDrive watcher error: {e}")
    
    async def _remarkable_input_watcher(self):
        """Run reMarkable Cloud input folder watcher"""
        try:
            self.logger.info("🌐 Starting reMarkable input folder watcher...")
            
            while True:
                try:
                    # Check for new files in reMarkable input folder
                    new_files = await self.remarkable_input_watcher.check_for_new_files()
                    
                    # Process each new file through the OneDrive processor
                    for pdf_file in new_files:
                        self.logger.info(f"📄 Processing reMarkable input file: {pdf_file.name}")
                        
                        try:
                            # Process the file using OneDrive processor's file processing logic
                            await self._process_remarkable_file(pdf_file, source="input_folder")
                        except Exception as e:
                            self.logger.error(f"❌ Error processing {pdf_file.name}: {e}")
                
                except Exception as e:
                    self.logger.error(f"❌ reMarkable input watcher loop error: {e}")
                
                # Wait before next check
                await asyncio.sleep(self.remarkable_poll_interval)
                
        except Exception as e:
            self.logger.error(f"❌ reMarkable input watcher error: {e}")
    
    async def _remarkable_gptout_watcher(self):
        """Run reMarkable Cloud gpt_out folder watcher"""
        try:
            self.logger.info("🌐 Starting reMarkable gpt_out folder watcher...")
            
            while True:
                try:
                    # Check for new files in reMarkable gpt_out folder
                    new_files = await self.remarkable_gptout_watcher.check_for_new_files()
                    
                    # Process each new file through the OneDrive processor
                    for pdf_file in new_files:
                        self.logger.info(f"📄 Processing reMarkable gpt_out file: {pdf_file.name}")
                        
                        try:
                            # Process the file using OneDrive processor's file processing logic
                            await self._process_remarkable_file(pdf_file, source="gpt_out")
                        except Exception as e:
                            self.logger.error(f"❌ Error processing {pdf_file.name}: {e}")
                
                except Exception as e:
                    self.logger.error(f"❌ reMarkable gpt_out watcher loop error: {e}")
                
                # Wait before next check
                await asyncio.sleep(self.remarkable_poll_interval)
                
        except Exception as e:
            self.logger.error(f"❌ reMarkable gpt_out watcher error: {e}")
    
    async def _process_remarkable_file(self, pdf_file: Path, source: str = "unknown"):
        """Process a PDF file from reMarkable Cloud using the OneDrive processor"""
        try:
            # Use OneDrive processor to handle the file processing and upload
            # This ensures consistent processing between OneDrive and reMarkable sources
            
            # Create a temporary copy in OneDrive processor's expected location
            temp_input_file = pdf_file.parent / f"remarkable_{source}_{pdf_file.name}"
            shutil.copy2(pdf_file, temp_input_file)
            
            # Process through OneDrive processor
            success = await self.onedrive_processor._process_file_from_path(str(temp_input_file))
            
            if success:
                self.logger.info(f"✅ Successfully processed reMarkable file from {source}: {pdf_file.name}")
            else:
                self.logger.error(f"❌ Failed to process reMarkable file from {source}: {pdf_file.name}")
            
            # Clean up temporary file
            if temp_input_file.exists():
                temp_input_file.unlink()
        
        except Exception as e:
            self.logger.error(f"❌ Error processing reMarkable file {pdf_file.name} from {source}: {e}")

Parameters

Name Type Default Kind
bases - -

Parameter Details

onedrive_config: Dictionary containing OneDrive configuration settings including authentication details, folder paths, poll intervals, and reMarkable-specific settings. Expected keys: 'poll_interval' (seconds between OneDrive checks), 'remarkable_poll_interval' (seconds between reMarkable checks), 'remarkable_input_folder' (path to reMarkable input folder, defaults to '/E-Ink LLM Input'). Must contain all necessary OneDrive authentication parameters required by OneDriveProcessor.

remarkable_session: An authenticated reMarkable Cloud session object (typically from RemarkableAuth) that provides access to the reMarkable Cloud API. Must be pre-authenticated and ready for API calls.

api_key: API key string used for processing files through the backend service. Passed to OneDriveProcessor for file processing operations.
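
The three optional keys and their defaults can be illustrated with a small sketch. resolve_settings is a hypothetical helper, not part of the module; it only mirrors the dict.get() lookups visible in the source above.

```python
from typing import Any, Dict


def resolve_settings(onedrive_config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the optional settings with the defaults used in the source."""
    return {
        # Seconds between OneDrive checks
        "poll_interval": onedrive_config.get("poll_interval", 60),
        # Seconds between reMarkable Cloud checks
        "remarkable_poll_interval": onedrive_config.get("remarkable_poll_interval", 60),
        # Folder searched for on reMarkable Cloud
        "remarkable_input_folder": onedrive_config.get(
            "remarkable_input_folder", "/E-Ink LLM Input"
        ),
    }
```

Note that the reMarkable settings are read from onedrive_config as well; there is no separate reMarkable configuration dictionary.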

Return Value

The constructor returns a MixedCloudProcessor instance. start_watching() returns None and is expected to run indefinitely; it awaits asyncio.gather() over the watcher coroutines and discards the gathered results. The two _initialize_* methods return a boolean success indicator. The watcher methods and _process_remarkable_file() return None.

Class Interface

Methods

__init__(self, onedrive_config: Dict, remarkable_session, api_key: str)

Purpose: Initialize the MixedCloudProcessor with configuration for OneDrive and reMarkable Cloud integration

Parameters:

  • onedrive_config: Dictionary with OneDrive settings and poll intervals
  • remarkable_session: Authenticated reMarkable Cloud session object
  • api_key: API key for file processing service

Returns: None (constructor). Raises ImportError if the ONEDRIVE_AVAILABLE flag is false.

async start_watching(self)

Purpose: Start monitoring all configured cloud sources (OneDrive and reMarkable folders) for new files and process them concurrently

Returns: None - runs indefinitely as an async coroutine, gathering results from multiple watcher tasks
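
As a rough illustration of how the watcher list is assembled and run, the sketch below uses a stand-in coroutine (fake_watcher, hypothetical) in place of the real watchers, which loop forever. It shows that with return_exceptions=True a failing watcher comes back as a value in the result list instead of raising.

```python
import asyncio
from typing import List


async def fake_watcher(name: str, fail: bool = False) -> str:
    """Stand-in for a watcher coroutine; the real ones never return."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


async def run_watchers(input_ok: bool, gptout_ok: bool) -> List[object]:
    # The OneDrive watcher is always scheduled.
    tasks = [fake_watcher("onedrive")]
    # reMarkable watchers are added only if their initialization succeeded.
    if input_ok:
        tasks.append(fake_watcher("remarkable_input"))
    if gptout_ok:
        tasks.append(fake_watcher("remarkable_gptout", fail=True))
    # Exceptions are returned in order alongside normal results.
    return await asyncio.gather(*tasks, return_exceptions=True)
```

In the class itself the gather() result is discarded, so a caller that wants to see these exception objects would have to capture and inspect the returned list.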

async _initialize_remarkable_input_watcher(self) -> bool

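Purpose: Initialize the watcher for the reMarkable input folder by discovering all nodes and locating the target folder's UUID. Only the last path component of 'remarkable_input_folder' is compared against folder names, and the first match wins. The UUID is stored in the watcher's gpt_out_folder_uuid attribute, which the input watcher reuses for its own target folder.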

Returns: Boolean indicating whether initialization was successful (True if folder found, False otherwise)
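
The folder lookup can be sketched in isolation as follows. find_folder_uuid is a hypothetical standalone function and the sample nodes are invented; only the 'node_type' and 'name' keys and the comparison itself are taken from the source.

```python
from typing import Dict, Optional


def find_folder_uuid(all_nodes: Dict[str, dict], target_folder: str) -> Optional[str]:
    """Return the UUID of the first folder whose name matches the path's last component."""
    leaf = target_folder.strip("/").split("/")[-1]
    for uuid, node in all_nodes.items():
        if node.get("node_type") == "folder" and node.get("name", "").strip() == leaf:
            return uuid
    return None


# Invented sample data: a document and two folders sharing the same name.
sample_nodes = {
    "a1": {"node_type": "document", "name": "E-Ink LLM Input"},
    "b2": {"node_type": "folder", "name": " E-Ink LLM Input "},
    "c3": {"node_type": "folder", "name": "E-Ink LLM Input"},
}
```

Because parent folders are ignored, '/E-Ink LLM Input' and '/Archive/E-Ink LLM Input' resolve to the same node here; folders with duplicate names in different locations cannot be told apart by this check.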

async _initialize_remarkable_gptout_watcher(self) -> bool

Purpose: Initialize the watcher for the reMarkable 'gpt_out' folder

Returns: Boolean indicating whether initialization was successful

async _onedrive_watcher(self)

Purpose: Run the OneDrive file watcher loop, delegating to OneDriveProcessor's start_watching method

Returns: None - runs indefinitely until error or cancellation

async _remarkable_input_watcher(self)

Purpose: Run the reMarkable input folder watcher loop, checking for new files at regular intervals and processing them

Returns: None - runs indefinitely in a polling loop with remarkable_poll_interval delays

async _remarkable_gptout_watcher(self)

Purpose: Run the reMarkable 'gpt_out' folder watcher loop, checking for new files at regular intervals and processing them

Returns: None - runs indefinitely in a polling loop with remarkable_poll_interval delays
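
Both reMarkable watcher loops follow the same shape: check, process each file, sleep, repeat, with errors caught at the file level and at the cycle level. The sketch below is a generic version of that pattern under stated assumptions: poll_loop, max_cycles, and the demo callbacks are hypothetical and exist only so the loop can terminate in an example.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


async def poll_loop(
    check: Callable[[], Awaitable[List[str]]],
    handle: Callable[[str], Awaitable[None]],
    interval: float,
    max_cycles: Optional[int] = None,
) -> List[str]:
    """Poll `check`, pass each item to `handle`, and collect error messages."""
    errors: List[str] = []
    cycle = 0
    # max_cycles=None loops forever, like the real watchers.
    while max_cycles is None or cycle < max_cycles:
        cycle += 1
        try:
            for item in await check():
                try:
                    await handle(item)
                except Exception as exc:  # one bad file does not stop the batch
                    errors.append(f"{item}: {exc}")
        except Exception as exc:  # a failed check does not stop the loop
            errors.append(f"check: {exc}")
        await asyncio.sleep(interval)
    return errors


async def demo() -> Tuple[List[str], List[str]]:
    batches = [["a.pdf", "bad.pdf"], ["c.pdf"]]
    handled: List[str] = []

    async def check() -> List[str]:
        return batches.pop(0)  # raises IndexError once the batches run out

    async def handle(name: str) -> None:
        if name == "bad.pdf":
            raise ValueError("boom")
        handled.append(name)

    errors = await poll_loop(check, handle, interval=0, max_cycles=3)
    return handled, errors
```

The real methods log errors instead of collecting them, and the two loops differ only in which watcher they poll and which source label they pass along.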

async _process_remarkable_file(self, pdf_file: Path, source: str = 'unknown')

Purpose: Process a PDF file from reMarkable Cloud by creating a temporary copy and delegating to OneDriveProcessor for consistent handling

Parameters:

  • pdf_file: Path object pointing to the PDF file to process
  • source: String identifier for the source folder ('input_folder', 'gpt_out', or 'unknown')

Returns: None - logs success/failure and cleans up temporary files
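
In the source, the temporary copy is removed only when processing returns normally; if an exception is raised, the unlink step is skipped. The following sketch shows one possible variant that always cleans up by using try/finally. It is not the module's implementation: process_with_temp_copy and the synchronous process callback are hypothetical, and Path.unlink(missing_ok=True) requires Python 3.8 or later.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable, List, Tuple


def process_with_temp_copy(
    pdf_file: Path, source: str, process: Callable[[Path], bool]
) -> bool:
    """Copy the file next to the original, process the copy, always remove it."""
    temp_copy = pdf_file.parent / f"remarkable_{source}_{pdf_file.name}"
    shutil.copy2(pdf_file, temp_copy)
    try:
        return process(temp_copy)
    finally:
        # Runs on success and on failure alike.
        temp_copy.unlink(missing_ok=True)


def demo() -> Tuple[List[str], List[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        pdf = Path(tmp) / "note.pdf"
        pdf.write_bytes(b"%PDF-1.4")
        seen: List[str] = []

        def failing(path: Path) -> bool:
            seen.append(path.name)
            raise RuntimeError("boom")

        try:
            process_with_temp_copy(pdf, "gpt_out", failing)
        except RuntimeError:
            pass
        leftover = sorted(p.name for p in Path(tmp).iterdir())
        return seen, leftover
```

The naming scheme (remarkable_{source}_{name}) matches the source, so the processor sees which folder a file came from through its file name.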

Attributes

Name | Type | Description | Scope
onedrive_config | Dict | Configuration dictionary for OneDrive integration, including authentication and folder settings; also holds the reMarkable poll interval and input folder name | instance
remarkable_session | object | Authenticated reMarkable Cloud session for API access | instance
api_key | str | API key for the file processing service | instance
logger | logging.Logger | Logger named 'MixedCloudProcessor', set to INFO level | instance
onedrive_processor | OneDriveProcessor | Handles OneDrive file operations and processes files from all sources | instance
remarkable_input_watcher | RemarkableCloudWatcher | Watcher for the reMarkable input folder | instance
remarkable_gptout_watcher | RemarkableCloudWatcher | Watcher for the reMarkable 'gpt_out' folder | instance
poll_interval | int | Polling interval in seconds for OneDrive checks (default: 60); stored here but not read by any method of this class | instance
remarkable_poll_interval | int | Seconds to sleep between reMarkable Cloud checks (default: 60) | instance

Dependencies

  • asyncio
  • json
  • re
  • subprocess
  • tempfile
  • shutil
  • pathlib
  • typing
  • datetime
  • logging
  • onedrive_client
  • requests
  • cloudtest.auth
  • PyPDF2
  • PyPDF4
  • msal

Required Imports

import asyncio
import shutil
import logging
from pathlib import Path
from typing import Dict
from onedrive_client import OneDriveProcessor
from cloudtest.auth import RemarkableAuth

Conditional/Optional Imports

These imports are only needed under specific conditions:

from onedrive_client import OneDriveClient, OneDriveProcessor

Condition: OneDrive integration must be available (ONEDRIVE_AVAILABLE flag must be True)

Required (conditional)
import msal

Condition: Required for OneDrive authentication, checked via ONEDRIVE_AVAILABLE flag

Required (conditional)
import requests

Condition: Required for HTTP requests to cloud services

Required (conditional)
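
The source shows the ONEDRIVE_AVAILABLE check but not how the flag is set. A common way to set such a flag is a guarded import, sketched below as an assumption rather than the module's actual code; require_onedrive and can_construct are hypothetical helpers, and only the error message is copied from the constructor.

```python
try:
    import msal  # noqa: F401
    import requests  # noqa: F401
    ONEDRIVE_AVAILABLE = True
except ImportError:
    # Either package is missing, so OneDrive features are disabled.
    ONEDRIVE_AVAILABLE = False


def require_onedrive(available: bool) -> None:
    """Raise the same error the constructor raises when the flag is false."""
    if not available:
        raise ImportError(
            "OneDrive integration not available. Install with: pip install msal requests"
        )


def can_construct(available: bool) -> bool:
    """Caller-side guard: report whether construction would get past the check."""
    try:
        require_onedrive(available)
        return True
    except ImportError:
        return False
```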

Usage Example

import asyncio
from cloudtest.auth import RemarkableAuth
from mixed_cloud_processor import MixedCloudProcessor

# Configure OneDrive settings
onedrive_config = {
    'client_id': 'your-client-id',
    'client_secret': 'your-secret',
    'tenant_id': 'your-tenant',
    'poll_interval': 60,
    'remarkable_poll_interval': 60,
    'remarkable_input_folder': '/E-Ink LLM Input'
}

# Authenticate with reMarkable Cloud
rm_auth = RemarkableAuth()
rm_session = rm_auth.get_session()

# Create processor instance
processor = MixedCloudProcessor(
    onedrive_config=onedrive_config,
    remarkable_session=rm_session,
    api_key='your-api-key'
)

# Start watching (runs indefinitely)
async def main():
    await processor.start_watching()

asyncio.run(main())

Best Practices

  • Check the ONEDRIVE_AVAILABLE flag before instantiation; the constructor raises ImportError when it is false
  • Provide a properly authenticated reMarkable session before creating the processor
  • The start_watching() method runs indefinitely - use asyncio.run() or await it in an async context
  • Configure appropriate poll intervals to balance responsiveness and API rate limits (default 60 seconds)
  • Ensure target folders exist in both OneDrive and reMarkable Cloud before starting
  • The processor creates temporary files during processing - ensure sufficient disk space
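  • Do not rely on exceptions reaching the caller: each watcher catches and logs its own errors, and start_watching() calls asyncio.gather() with return_exceptions=True and discards the result, so monitor the logs to detect failures
  • The processor keeps running with OneDrive only if one or both reMarkable watchers fail to initialize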
  • Logging is configured automatically but can be adjusted via the logger attribute
  • Files are processed sequentially within each watcher to avoid race conditions
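  • The temporary copy is deleted after processing returns, whether it succeeded or not; if processing raises an exception the cleanup step is skipped, so a stale remarkable_* file can remain next to the original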
  • The processor uses the OneDriveProcessor for consistent file handling across sources
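
Because start_watching() runs indefinitely, a caller may want a way to stop it. One option is to run it as a task and cancel it, sketched here with a stand-in coroutine. run_for and stand_in_watcher are hypothetical. The sketch relies on asyncio.CancelledError deriving from BaseException (Python 3.8 and later), so the watchers' `except Exception` handlers do not swallow cancellation; whether OneDriveProcessor.start_watching() behaves the same way is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def stand_in_watcher(log: List[str]) -> None:
    """Mimics a watcher: loops forever and catches Exception like the source does."""
    try:
        while True:
            log.append("poll")
            await asyncio.sleep(0.01)
    except Exception:
        log.append("error")


async def run_for(factory: Callable[[], Awaitable[None]], seconds: float) -> str:
    """Run a never-ending coroutine for a limited time, then cancel it."""
    task = asyncio.ensure_future(factory())
    try:
        # wait_for cancels the task when the timeout expires.
        await asyncio.wait_for(task, timeout=seconds)
        return "finished"
    except asyncio.TimeoutError:
        return "stopped"


async def demo() -> Tuple[str, List[str]]:
    log: List[str] = []
    outcome = await run_for(lambda: stand_in_watcher(log), 0.05)
    return outcome, log
```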

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function create_mixed_processor 76.9% similar

    Factory function that instantiates and returns a MixedCloudProcessor object configured with OneDrive and reMarkable cloud integration settings.

    From: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
  • class RemarkableEInkProcessor 70.4% similar

    Enhanced E-Ink LLM Processor that extends EInkLLMProcessor with reMarkable Cloud integration, enabling file processing from both local directories and reMarkable Cloud storage.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_processor.py
  • class OneDriveProcessor 68.9% similar

    OneDriveProcessor is a class that monitors a OneDrive folder for new files, processes them using an E-Ink LLM Assistant, and uploads the results back to OneDrive.

    From: /tf/active/vicechatdev/e-ink-llm/onedrive_client.py
  • function main_v21 65.8% similar

    Asynchronous main function that runs a reMarkable tablet file watcher as a separate process, monitoring a specified folder for new documents, processing them, and uploading responses back to the reMarkable Cloud.

    From: /tf/active/vicechatdev/e-ink-llm/run_remarkable_bridge.py
  • class RemarkableCloudWatcher 65.5% similar

    Monitors the reMarkable Cloud 'gpt_out' folder for new documents, automatically downloads them, and converts .rm (reMarkable native) files to PDF format.

    From: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py