class RemarkableEInkProcessor
Enhanced E-Ink LLM Processor that extends EInkLLMProcessor with reMarkable Cloud integration, enabling file processing from both local directories and reMarkable Cloud storage.
/tf/active/vicechatdev/e-ink-llm/remarkable_processor.py
17 - 218
complex
Purpose
This class provides a comprehensive solution for processing files through an LLM with E-Ink display optimization, supporting both local file watching and reMarkable Cloud integration. It can monitor reMarkable Cloud folders for new documents, download them, process them through the LLM pipeline, and upload responses back to the cloud. The class manages authentication, folder setup, file watching, and bidirectional synchronization with reMarkable Cloud while maintaining backward compatibility with local file processing.
Source Code
class RemarkableEInkProcessor(EInkLLMProcessor):
"""Enhanced E-Ink LLM Processor with reMarkable Cloud integration"""
def __init__(self, api_key: Optional[str] = None, watch_folder: Optional[str] = None,
remarkable_config: Optional[Dict[str, Any]] = None):
"""
Initialize the enhanced processor
Args:
api_key: OpenAI API key
watch_folder: Local folder to watch (for local mode)
remarkable_config: Configuration for reMarkable Cloud integration
{
'enabled': bool,
'watch_folder_path': str, # Path in reMarkable Cloud to watch
'output_folder_path': str, # Path in reMarkable Cloud to upload responses
'poll_interval': int, # Seconds between checks (default: 60)
'one_time_code': str, # For initial authentication (optional)
}
"""
# Initialize base processor
super().__init__(api_key, watch_folder)
self.remarkable_config = remarkable_config or {}
self.remarkable_enabled = self.remarkable_config.get('enabled', False)
# Initialize reMarkable Cloud components if enabled
self.cloud_manager = None
self.file_watcher = None
if self.remarkable_enabled:
self.cloud_manager = RemarkableCloudManager()
# Extract configuration
self.remarkable_watch_path = self.remarkable_config.get('watch_folder_path', '/E-Ink LLM Input')
self.remarkable_output_path = self.remarkable_config.get('output_folder_path', '/E-Ink LLM Output')
self.poll_interval = self.remarkable_config.get('poll_interval', 60)
print(f"š reMarkable Cloud integration enabled")
print(f"š Input folder: {self.remarkable_watch_path}")
print(f"š¤ Output folder: {self.remarkable_output_path}")
else:
print(f"š Local file watching mode")
async def authenticate_remarkable(self) -> bool:
"""Authenticate with reMarkable Cloud"""
if not self.remarkable_enabled or not self.cloud_manager:
return False
one_time_code = self.remarkable_config.get('one_time_code')
return await self.cloud_manager.authenticate(one_time_code)
async def setup_remarkable_folders(self) -> bool:
"""Ensure required folders exist in reMarkable Cloud"""
if not self.cloud_manager:
return False
try:
# Create input folder if it doesn't exist
await self.cloud_manager.create_folder(self.remarkable_watch_path)
# Create output folder if it doesn't exist
await self.cloud_manager.create_folder(self.remarkable_output_path)
return True
except Exception as e:
self.logger.error(f"Error setting up reMarkable folders: {e}")
return False
async def process_remarkable_file(self, document, local_file_path: Path) -> None:
"""
Process a file downloaded from reMarkable Cloud
Args:
document: reMarkable Document object
local_file_path: Path to locally downloaded file
"""
try:
print(f"\n{'='*60}")
print(f"š PROCESSING REMARKABLE FILE: {document.name}")
print(f"{'='*60}")
# Process the file using the standard processor
result_path = await self.process_file(local_file_path)
if result_path and result_path.exists():
# Upload the response back to reMarkable Cloud
response_name = f"RESPONSE_{document.name}"
print(f"š¤ Uploading response to reMarkable Cloud...")
success = await self.cloud_manager.upload_document(
result_path,
self.remarkable_output_path,
response_name
)
if success:
print(f"ā
Response uploaded successfully: {response_name}")
else:
print(f"ā Failed to upload response to reMarkable Cloud")
else:
print(f"ā No response file to upload")
except Exception as e:
self.logger.error(f"Error processing reMarkable file {document.name}: {e}")
print(f"ā Error processing {document.name}: {e}")
async def start_remarkable_watching(self, process_existing: bool = False) -> None:
"""Start watching reMarkable Cloud folder for new files"""
if not self.remarkable_enabled:
raise ValueError("reMarkable Cloud integration is not enabled")
# Authenticate
print(f"š Authenticating with reMarkable Cloud...")
auth_success = await self.authenticate_remarkable()
if not auth_success:
raise Exception("Failed to authenticate with reMarkable Cloud")
# Setup folders
print(f"š Setting up reMarkable folders...")
folder_success = await self.setup_remarkable_folders()
if not folder_success:
raise Exception("Failed to setup reMarkable folders")
# Process existing files if requested
if process_existing:
print(f"š Processing existing files in {self.remarkable_watch_path}...")
existing_files = await self.cloud_manager.list_files_in_folder(
self.remarkable_watch_path, include_subfolders=True
)
if existing_files:
print(f"š Found {len(existing_files)} existing file(s) to process")
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
for doc in existing_files:
local_file = await self.cloud_manager.download_document(doc, temp_path)
if local_file:
await self.process_remarkable_file(doc, local_file)
else:
print(f"š No existing files found")
# Start file watcher
self.file_watcher = RemarkableFileWatcher(
self.cloud_manager,
self.remarkable_watch_path,
self.poll_interval
)
await self.file_watcher.start_watching(self.process_remarkable_file)
async def start_watching(self, process_existing: bool = True, mode: str = "local") -> None:
"""
Start watching for files
Args:
process_existing: Whether to process existing files on startup
mode: 'local' for local file watching, 'remarkable' for cloud watching, 'both' for both
"""
print(f"\nšÆ Starting Enhanced E-Ink LLM File Processor")
print(f"š§ Mode: {mode}")
if mode == "remarkable":
if not self.remarkable_enabled:
raise ValueError("reMarkable mode requested but integration not enabled")
await self.start_remarkable_watching(process_existing)
elif mode == "local":
# Use the original local file watching
await super().start_watching(process_existing)
elif mode == "both":
if not self.remarkable_enabled:
print("ā ļø reMarkable integration not enabled, falling back to local mode")
await super().start_watching(process_existing)
return
# Start both watchers concurrently
print(f"š Starting both local and reMarkable file watchers...")
# Create tasks for both watchers
local_task = asyncio.create_task(super().start_watching(process_existing))
remarkable_task = asyncio.create_task(self.start_remarkable_watching(process_existing))
try:
# Wait for either task to complete (which shouldn't happen unless there's an error)
await asyncio.gather(local_task, remarkable_task)
except KeyboardInterrupt:
print(f"\nš Stopping all file watchers...")
local_task.cancel()
remarkable_task.cancel()
# Wait a bit for clean shutdown
try:
await asyncio.wait_for(asyncio.gather(local_task, remarkable_task, return_exceptions=True), timeout=5.0)
except asyncio.TimeoutError:
pass
else:
raise ValueError(f"Invalid mode: {mode}. Must be 'local', 'remarkable', or 'both'")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
EInkLLMProcessor | - |
Parameter Details
api_key: Optional OpenAI API key for LLM processing. If not provided, will attempt to use environment variable or configuration from parent class.
watch_folder: Optional local folder path to watch for new files in local mode. Used when operating in local or both modes.
remarkable_config: Optional dictionary containing reMarkable Cloud integration settings. Keys include: 'enabled' (bool, whether to enable cloud integration), 'watch_folder_path' (str, cloud folder to monitor, default '/E-Ink LLM Input'), 'output_folder_path' (str, cloud folder for responses, default '/E-Ink LLM Output'), 'poll_interval' (int, seconds between cloud checks, default 60), 'one_time_code' (str, optional authentication code for initial setup).
Return Value
Instantiation returns a RemarkableEInkProcessor object configured for file processing. Methods return various types: authenticate_remarkable() returns bool indicating authentication success, setup_remarkable_folders() returns bool for folder setup success, process_remarkable_file() returns None (side effect: uploads processed file), start_remarkable_watching() returns None (runs indefinitely), start_watching() returns None (runs indefinitely based on mode).
Class Interface
Methods
__init__(self, api_key: Optional[str] = None, watch_folder: Optional[str] = None, remarkable_config: Optional[Dict[str, Any]] = None)
Purpose: Initialize the enhanced processor with optional local and cloud configurations
Parameters:
api_key: OpenAI API key for LLM processingwatch_folder: Local folder path to watch for filesremarkable_config: Dictionary with reMarkable Cloud settings (enabled, paths, poll_interval, one_time_code)
Returns: None (constructor)
async authenticate_remarkable(self) -> bool
Purpose: Authenticate with reMarkable Cloud service using configured credentials
Returns: Boolean indicating whether authentication was successful
async setup_remarkable_folders(self) -> bool
Purpose: Create required input and output folders in reMarkable Cloud if they don't exist
Returns: Boolean indicating whether folder setup was successful
async process_remarkable_file(self, document, local_file_path: Path) -> None
Purpose: Process a file downloaded from reMarkable Cloud through the LLM pipeline and upload the response back
Parameters:
document: reMarkable Document object containing metadatalocal_file_path: Path to the locally downloaded file to process
Returns: None (side effect: uploads processed response to reMarkable Cloud)
async start_remarkable_watching(self, process_existing: bool = False) -> None
Purpose: Start monitoring reMarkable Cloud folder for new files, authenticate, setup folders, and begin processing
Parameters:
process_existing: Whether to process files already present in the cloud folder before starting to watch
Returns: None (runs indefinitely until interrupted)
async start_watching(self, process_existing: bool = True, mode: str = 'local') -> None
Purpose: Start watching for files in specified mode (local, remarkable, or both)
Parameters:
process_existing: Whether to process existing files on startupmode: Operating mode: 'local' for local file watching, 'remarkable' for cloud watching, 'both' for concurrent operation
Returns: None (runs indefinitely until interrupted)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
remarkable_config |
Dict[str, Any] | Configuration dictionary for reMarkable Cloud integration settings | instance |
remarkable_enabled |
bool | Flag indicating whether reMarkable Cloud integration is enabled | instance |
cloud_manager |
Optional[RemarkableCloudManager] | Manager instance for reMarkable Cloud operations (authentication, upload, download) | instance |
file_watcher |
Optional[RemarkableFileWatcher] | Watcher instance that monitors reMarkable Cloud folder for new files | instance |
remarkable_watch_path |
str | Path in reMarkable Cloud to monitor for input files (default: '/E-Ink LLM Input') | instance |
remarkable_output_path |
str | Path in reMarkable Cloud to upload processed responses (default: '/E-Ink LLM Output') | instance |
poll_interval |
int | Seconds between cloud folder checks (default: 60) | instance |
logger |
logging.Logger | Logger instance inherited from parent class for error and info logging | instance |
Dependencies
asynciotempfilepathlibtypingloggingprocessorremarkable_cloudrmcl
Required Imports
import asyncio
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any
import logging
from processor import EInkLLMProcessor
from remarkable_cloud import RemarkableCloudManager, RemarkableFileWatcher
from rmcl import Item
Usage Example
# Example 1: Local mode only
processor = RemarkableEInkProcessor(
api_key='your-openai-key',
watch_folder='/path/to/local/folder'
)
await processor.start_watching(process_existing=True, mode='local')
# Example 2: reMarkable Cloud mode
remarkable_config = {
'enabled': True,
'watch_folder_path': '/E-Ink LLM Input',
'output_folder_path': '/E-Ink LLM Output',
'poll_interval': 60,
'one_time_code': 'abc123' # For first-time auth
}
processor = RemarkableEInkProcessor(
api_key='your-openai-key',
remarkable_config=remarkable_config
)
await processor.start_watching(process_existing=True, mode='remarkable')
# Example 3: Both modes simultaneously
processor = RemarkableEInkProcessor(
api_key='your-openai-key',
watch_folder='/path/to/local/folder',
remarkable_config=remarkable_config
)
await processor.start_watching(process_existing=True, mode='both')
Best Practices
- Always call start_watching() within an async context (asyncio.run() or existing event loop)
- Ensure reMarkable Cloud authentication is successful before processing files by checking authenticate_remarkable() return value
- Use 'one_time_code' only for initial authentication; subsequent runs will use stored tokens
- Handle KeyboardInterrupt gracefully when running in 'both' mode to ensure clean shutdown of both watchers
- Process existing files on startup (process_existing=True) to avoid missing files added while processor was offline
- Set appropriate poll_interval based on expected file frequency (default 60 seconds balances responsiveness and API usage)
- Ensure output folders exist in reMarkable Cloud before processing to avoid upload failures
- Use temporary directories for downloaded files to avoid disk space issues with large document volumes
- Monitor logs for authentication failures, upload errors, and processing exceptions
- The class maintains state through cloud_manager and file_watcher attributes; do not modify these directly
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class MixedCloudProcessor 70.4% similar
-
class RemarkableCloudWatcher 70.1% similar
-
class RemarkableCloudManager 69.3% similar
-
function main_v68 68.2% similar
-
function main_v21 67.2% similar