class MixedCloudProcessor
A cloud integration processor that monitors both OneDrive and reMarkable Cloud for input PDF files, processes them through an API, and manages file synchronization between cloud services.
/tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
476 - 654
complex
Purpose
MixedCloudProcessor orchestrates file watching and processing across multiple cloud platforms. It monitors OneDrive folders and two reMarkable Cloud folders (a regular input folder and a 'gpt_out' folder) for new PDF files, processes them through an API using the OneDriveProcessor, and handles the complete lifecycle of file ingestion, processing, and output management. This enables seamless integration between e-ink devices (reMarkable) and cloud storage (OneDrive) for document processing workflows.
Source Code
class MixedCloudProcessor:
    """Mixed cloud processor that watches both OneDrive and reMarkable Cloud for input files.

    One OneDrive watcher (delegated to ``OneDriveProcessor``) runs alongside up
    to two reMarkable Cloud polling loops: one for a configurable input folder
    and one for the ``gpt_out`` folder. Every PDF reported by a reMarkable
    watcher is handed to the OneDrive processor so that both sources share a
    single processing path.
    """

    def __init__(self, onedrive_config: Dict, remarkable_session, api_key: str):
        """Create the OneDrive processor and the two reMarkable watchers.

        Args:
            onedrive_config: Settings forwarded to ``OneDriveProcessor``. This
                class additionally reads the optional keys ``poll_interval``
                (default 60), ``remarkable_poll_interval`` (default 60) and
                ``remarkable_input_folder`` (default ``'/E-Ink LLM Input'``).
            remarkable_session: Session object passed unchanged to both
                ``RemarkableCloudWatcher`` instances.
            api_key: API key forwarded to ``OneDriveProcessor``.

        Raises:
            ImportError: If the module-level ``ONEDRIVE_AVAILABLE`` flag is
                false.
        """
        self.onedrive_config = onedrive_config
        self.remarkable_session = remarkable_session
        self.api_key = api_key

        # Setup logging
        self.logger = logging.getLogger('MixedCloudProcessor')
        self.logger.setLevel(logging.INFO)

        if not ONEDRIVE_AVAILABLE:
            raise ImportError("OneDrive integration not available. Install with: pip install msal requests")

        # Initialize OneDrive processor
        self.onedrive_processor = OneDriveProcessor(onedrive_config, api_key)

        # Initialize reMarkable watchers - both input folder and gpt_out
        self.remarkable_input_watcher = RemarkableCloudWatcher(remarkable_session, self.logger)
        self.remarkable_gptout_watcher = RemarkableCloudWatcher(remarkable_session, self.logger)

        # Configuration
        self.poll_interval = onedrive_config.get('poll_interval', 60)
        self.remarkable_poll_interval = onedrive_config.get('remarkable_poll_interval', 60)

    async def start_watching(self):
        """Start watching both OneDrive and reMarkable Cloud for input files.

        The OneDrive watcher is always started. Each reMarkable watcher is
        started only if its initialization succeeded. The coroutine returns
        once every started watcher has finished.
        """
        self.logger.info("🚀 Starting Mixed Cloud Processor")
        self.logger.info(" 📁 OneDrive: Input and Output")
        self.logger.info(" 🌐 reMarkable Cloud: Input (regular folder + gpt_out folder)")

        # Initialize reMarkable watchers
        input_init_success = await self._initialize_remarkable_input_watcher()
        gptout_init_success = await self._initialize_remarkable_gptout_watcher()

        if not input_init_success and not gptout_init_success:
            self.logger.warning("⚠️ Both reMarkable watchers failed to initialize, continuing with OneDrive only")

        # Start all watchers concurrently
        tasks = [
            self._onedrive_watcher(),
        ]
        if input_init_success:
            tasks.append(self._remarkable_input_watcher())
        if gptout_init_success:
            tasks.append(self._remarkable_gptout_watcher())

        # return_exceptions=True keeps one failing watcher from tearing down
        # the others, but the failures come back as plain results. Log them
        # instead of silently discarding them.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                self.logger.error(f"❌ Watcher task terminated with an exception: {result!r}")

    async def _initialize_remarkable_input_watcher(self) -> bool:
        """Initialize the reMarkable input folder watcher.

        Looks for a folder node whose name equals the last path component of
        the configured ``remarkable_input_folder`` and stores its UUID on the
        input watcher.

        Returns:
            True if a matching folder was found, False if none was found or
            discovery raised.
        """
        try:
            # Set target folder for input watcher (regular input folder)
            target_folder = self.onedrive_config.get('remarkable_input_folder', '/E-Ink LLM Input')
            self.logger.info(f"🔍 Initializing reMarkable input watcher for folder: {target_folder}")

            # Only the last path component is compared; compute it once
            # rather than once per node.
            folder_name = target_folder.strip('/').split('/')[-1]

            # Find the target folder
            all_nodes = await self.remarkable_input_watcher._discover_all_nodes()
            for uuid, node in all_nodes.items():
                if node.get('node_type') != 'folder':
                    continue
                # A node whose name is None must not abort the whole search.
                if (node.get('name') or '').strip() != folder_name:
                    continue
                # NOTE(review): the watcher attribute is called
                # gpt_out_folder_uuid, but here it is pointed at the input
                # folder - presumably it is the folder the watcher polls;
                # confirm against RemarkableCloudWatcher.
                self.remarkable_input_watcher.gpt_out_folder_uuid = uuid
                self.logger.info(f"✅ Found reMarkable input folder: {target_folder} ({uuid})")
                return True

            self.logger.warning(f"⚠️ reMarkable input folder '{target_folder}' not found")
            return False
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable input watcher: {e}")
            return False

    async def _initialize_remarkable_gptout_watcher(self):
        """Initialize the reMarkable gpt_out folder watcher.

        Returns:
            The result of the watcher's ``initialize()`` call, or False if it
            raised.
        """
        try:
            success = await self.remarkable_gptout_watcher.initialize()
            if success:
                self.logger.info("✅ reMarkable gpt_out watcher initialized")
            else:
                self.logger.warning("⚠️ reMarkable gpt_out folder not found")
            return success
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable gpt_out watcher: {e}")
            return False

    async def _onedrive_watcher(self):
        """Run OneDrive watcher; any exception it raises is logged, not re-raised."""
        try:
            self.logger.info("📁 Starting OneDrive watcher...")
            await self.onedrive_processor.start_watching()
        except Exception as e:
            self.logger.error(f"❌ OneDrive watcher error: {e}")

    async def _remarkable_input_watcher(self):
        """Run reMarkable Cloud input folder watcher."""
        await self._run_remarkable_watcher(
            self.remarkable_input_watcher, label="input", source="input_folder"
        )

    async def _remarkable_gptout_watcher(self):
        """Run reMarkable Cloud gpt_out folder watcher."""
        await self._run_remarkable_watcher(
            self.remarkable_gptout_watcher, label="gpt_out", source="gpt_out"
        )

    async def _run_remarkable_watcher(self, watcher, label: str, source: str):
        """Poll one reMarkable watcher forever and process every file it reports.

        Args:
            watcher: Object exposing an awaitable ``check_for_new_files()``.
            label: Folder label used in log messages ("input" or "gpt_out").
            source: Source tag forwarded to ``_process_remarkable_file``.
        """
        try:
            self.logger.info(f"🌐 Starting reMarkable {label} folder watcher...")
            while True:
                try:
                    # Check for new files in the watched reMarkable folder
                    new_files = await watcher.check_for_new_files()
                    # Process each new file through the OneDrive processor
                    for pdf_file in new_files:
                        self.logger.info(f"📄 Processing reMarkable {label} file: {pdf_file.name}")
                        try:
                            await self._process_remarkable_file(pdf_file, source=source)
                        except Exception as e:
                            self.logger.error(f"❌ Error processing {pdf_file.name}: {e}")
                except Exception as e:
                    # A failed poll must not stop the loop; retry next cycle.
                    self.logger.error(f"❌ reMarkable {label} watcher loop error: {e}")
                # Wait before next check
                await asyncio.sleep(self.remarkable_poll_interval)
        except Exception as e:
            self.logger.error(f"❌ reMarkable {label} watcher error: {e}")

    async def _process_remarkable_file(self, pdf_file: Path, source: str = "unknown"):
        """Process a PDF file from reMarkable Cloud using the OneDrive processor.

        A copy named ``remarkable_<source>_<original name>`` is created next to
        ``pdf_file``, passed to the OneDrive processor, and removed afterwards
        regardless of whether processing succeeded, failed or raised.

        Args:
            pdf_file: Path of the PDF to process.
            source: Tag identifying the originating folder; used in the
                temporary file name and in log messages.
        """
        temp_input_file = None
        try:
            # Use OneDrive processor to handle the file processing and upload.
            # This ensures consistent processing between OneDrive and
            # reMarkable sources.
            temp_input_file = pdf_file.parent / f"remarkable_{source}_{pdf_file.name}"
            shutil.copy2(pdf_file, temp_input_file)

            # Process through OneDrive processor
            success = await self.onedrive_processor._process_file_from_path(str(temp_input_file))
            if success:
                self.logger.info(f"✅ Successfully processed reMarkable file from {source}: {pdf_file.name}")
            else:
                self.logger.error(f"❌ Failed to process reMarkable file from {source}: {pdf_file.name}")
        except Exception as e:
            self.logger.error(f"❌ Error processing reMarkable file {pdf_file.name} from {source}: {e}")
        finally:
            # Clean up temporary file. Doing this in ``finally`` prevents the
            # copy from being leaked when processing raises or is cancelled.
            if temp_input_file is not None:
                try:
                    temp_input_file.unlink()
                except FileNotFoundError:
                    pass
                except OSError as e:
                    self.logger.error(f"❌ Could not remove temporary file {temp_input_file}: {e}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | |
Parameter Details
onedrive_config: Dictionary containing OneDrive configuration settings including authentication details, folder paths, poll intervals, and reMarkable-specific settings. Expected keys: 'poll_interval' (seconds between OneDrive checks), 'remarkable_poll_interval' (seconds between reMarkable checks), 'remarkable_input_folder' (path to reMarkable input folder, defaults to '/E-Ink LLM Input'). Must contain all necessary OneDrive authentication parameters required by OneDriveProcessor.
remarkable_session: An authenticated reMarkable Cloud session object (typically from RemarkableAuth) that provides access to the reMarkable Cloud API. Must be pre-authenticated and ready for API calls.
api_key: API key string used for processing files through the backend service. Passed to OneDriveProcessor for file processing operations.
Return Value
The constructor returns a MixedCloudProcessor instance. The start_watching() method returns None but runs indefinitely as an async coroutine, gathering results from multiple concurrent watcher tasks. Internal processing methods return boolean success indicators or None.
Class Interface
Methods
__init__(self, onedrive_config: Dict, remarkable_session, api_key: str)
Purpose: Initialize the MixedCloudProcessor with configuration for OneDrive and reMarkable Cloud integration
Parameters:
onedrive_config: Dictionary with OneDrive settings and poll intervals
remarkable_session: Authenticated reMarkable Cloud session object
api_key: API key for file processing service
Returns: None (constructor)
async start_watching(self)
Purpose: Start monitoring all configured cloud sources (OneDrive and reMarkable folders) for new files and process them concurrently
Returns: None - runs indefinitely as an async coroutine, gathering results from multiple watcher tasks
async _initialize_remarkable_input_watcher(self) -> bool
Purpose: Initialize the watcher for the reMarkable input folder by discovering and locating the target folder UUID
Returns: Boolean indicating whether initialization was successful (True if folder found, False otherwise)
async _initialize_remarkable_gptout_watcher(self) -> bool
Purpose: Initialize the watcher for the reMarkable 'gpt_out' folder
Returns: Boolean indicating whether initialization was successful
async _onedrive_watcher(self)
Purpose: Run the OneDrive file watcher loop, delegating to OneDriveProcessor's start_watching method
Returns: None - runs indefinitely until error or cancellation
async _remarkable_input_watcher(self)
Purpose: Run the reMarkable input folder watcher loop, checking for new files at regular intervals and processing them
Returns: None - runs indefinitely in a polling loop with remarkable_poll_interval delays
async _remarkable_gptout_watcher(self)
Purpose: Run the reMarkable 'gpt_out' folder watcher loop, checking for new files at regular intervals and processing them
Returns: None - runs indefinitely in a polling loop with remarkable_poll_interval delays
async _process_remarkable_file(self, pdf_file: Path, source: str = 'unknown')
Purpose: Process a PDF file from reMarkable Cloud by creating a temporary copy and delegating to OneDriveProcessor for consistent handling
Parameters:
pdf_file: Path object pointing to the PDF file to process
source: String identifier for the source folder ('input_folder', 'gpt_out', or 'unknown')
Returns: None - logs success/failure and cleans up temporary files
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| onedrive_config | Dict | Configuration dictionary for OneDrive integration including authentication and folder settings | instance |
| remarkable_session | object | Authenticated reMarkable Cloud session for API access | instance |
| api_key | str | API key for file processing service | instance |
| logger | logging.Logger | Logger instance for MixedCloudProcessor with INFO level logging | instance |
| onedrive_processor | OneDriveProcessor | OneDriveProcessor instance for handling OneDrive file operations and processing | instance |
| remarkable_input_watcher | RemarkableCloudWatcher | Watcher instance for monitoring the reMarkable input folder | instance |
| remarkable_gptout_watcher | RemarkableCloudWatcher | Watcher instance for monitoring the reMarkable 'gpt_out' folder | instance |
| poll_interval | int | Polling interval in seconds for OneDrive checks (default: 60) | instance |
| remarkable_poll_interval | int | Polling interval in seconds for reMarkable Cloud checks (default: 60) | instance |
Dependencies
asyncio, json, re, subprocess, tempfile, shutil, pathlib, typing, datetime, logging, onedrive_client, requests, cloudtest.auth, PyPDF2, PyPDF4, msal
Required Imports
import asyncio
import shutil
import logging
from pathlib import Path
from typing import Dict
from onedrive_client import OneDriveProcessor
from cloudtest.auth import RemarkableAuth
Conditional/Optional Imports
These imports are only needed under specific conditions:
from onedrive_client import OneDriveClient, OneDriveProcessor
Condition: OneDrive integration must be available (ONEDRIVE_AVAILABLE flag must be True)
Required (conditional)
import msal
Condition: Required for OneDrive authentication, checked via ONEDRIVE_AVAILABLE flag
Required (conditional)
import requests
Condition: Required for HTTP requests to cloud services
Required (conditional)
Usage Example
import asyncio

from cloudtest.auth import RemarkableAuth
from mixed_cloud_processor import MixedCloudProcessor

# Configure OneDrive settings
onedrive_config = {
    'client_id': 'your-client-id',
    'client_secret': 'your-secret',
    'tenant_id': 'your-tenant',
    'poll_interval': 60,
    'remarkable_poll_interval': 60,
    'remarkable_input_folder': '/E-Ink LLM Input',
}

# Authenticate with reMarkable Cloud
rm_auth = RemarkableAuth()
rm_session = rm_auth.get_session()

# Create processor instance
processor = MixedCloudProcessor(
    onedrive_config=onedrive_config,
    remarkable_session=rm_session,
    api_key='your-api-key',
)


# Start watching (runs indefinitely)
async def main():
    """Run the processor until all of its watchers have stopped."""
    await processor.start_watching()


# Guard the entry point so importing this example does not start the watchers.
if __name__ == "__main__":
    asyncio.run(main())
Best Practices
- Always ensure OneDrive integration is available before instantiation (check ONEDRIVE_AVAILABLE flag)
- Provide a properly authenticated reMarkable session before creating the processor
- The start_watching() method runs indefinitely - use asyncio.run() or await it in an async context
- Configure appropriate poll intervals to balance responsiveness and API rate limits (default 60 seconds)
- Ensure target folders exist in both OneDrive and reMarkable Cloud before starting
- The processor creates temporary files during processing - ensure sufficient disk space
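- start_watching() calls asyncio.gather(..., return_exceptions=True), so exceptions raised by a watcher are collected rather than propagated to the caller; each watcher logs its own errors, so monitor the log output to detect failures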
- The processor will continue running even if one watcher fails to initialize
- Logging is configured automatically but can be adjusted via the logger attribute
- Files are processed sequentially within each watcher to avoid race conditions
- Temporary files are automatically cleaned up after processing
- The processor uses the OneDriveProcessor for consistent file handling across sources
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function create_mixed_processor 76.9% similar
- class RemarkableEInkProcessor 70.4% similar
- class OneDriveProcessor 68.9% similar
- function main_v21 65.8% similar
- class RemarkableCloudWatcher 65.5% similar