class RemarkableFileWatcher
A unified file watcher class that monitors a reMarkable Cloud folder for new files, supporting both REST API and rmcl client implementations with automatic client type detection.
/tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
461 - 592
moderate
Purpose
RemarkableFileWatcher provides a unified interface for monitoring reMarkable Cloud folders for new files. It automatically detects whether to use REST API or rmcl client based on the cloud_manager's client type, polls the folder at specified intervals, tracks processed files to avoid reprocessing, and executes callbacks when new files are detected. The class handles file downloading, temporary storage, and error recovery during the watch loop.
Source Code
class RemarkableFileWatcher:
    """Unified file watcher that works with both REST API and rmcl clients.

    The watcher type is chosen from ``cloud_manager.client_type``: ``"rest"``
    delegates to a ``RemarkableRestFileWatcher``; any other value uses the
    rmcl polling implementation in this class, which lists the watched
    folder every ``poll_interval`` seconds and tracks document ids it has
    already seen.
    """

    # Annotations naming RemarkableCloudManager / Document are quoted so that
    # defining this class does not require those names to be bound.
    def __init__(self, cloud_manager: "RemarkableCloudManager",
                 watch_folder_path: str, poll_interval: int = 60):
        """
        Set up the watcher for one folder.

        Args:
            cloud_manager: Manager exposing ``client_type`` and, depending on
                that type, ``rest_client`` or the ``list_files_in_folder`` /
                ``download_document`` coroutines.
            watch_folder_path: Folder to monitor. For the REST client only
                the last path component is used ("Root" when the path is
                empty or just slashes).
            poll_interval: Seconds to sleep between checks.
        """
        self.cloud_manager = cloud_manager
        self.watch_folder_path = watch_folder_path
        self.poll_interval = poll_interval
        self.logger = logging.getLogger(__name__)

        # Initialize appropriate watcher
        if cloud_manager.client_type == "rest":
            # Extract folder name from path for REST API
            trimmed_path = watch_folder_path.strip('/')
            folder_name = trimmed_path.split('/')[-1] if trimmed_path else "Root"
            self.rest_watcher = RemarkableRestFileWatcher(
                cloud_manager.rest_client, folder_name, poll_interval
            )
            self.watcher_type = "rest"
        else:
            # Use original implementation for rmcl
            self.watcher_type = "rmcl"

        # Track processed files to avoid reprocessing. Only the rmcl code
        # path reads these, but they are always initialised so the attributes
        # exist regardless of client type.
        self.processed_files: Set[str] = set()
        self.last_check_time = datetime.now()

    @staticmethod
    def _doc_id(doc) -> str:
        """Return the tracking key for a document: ``doc.id`` if present, else ``str(doc)``."""
        return doc.id if hasattr(doc, 'id') else str(doc)

    async def get_new_files(self) -> "List[Union[Dict, Document]]":
        """
        Get list of new files since last check.

        For the rmcl watcher every returned document is recorded in
        ``processed_files`` immediately, so it is reported at most once.

        Returns:
            List of new Document objects/dicts; an empty list when nothing is
            new, when the watcher type is unknown, or when any error occurs
            (the error is logged, not raised).
        """
        try:
            if self.watcher_type == "rest":
                return await self.rest_watcher.get_new_files()
            if self.watcher_type != "rmcl":
                return []

            # Original rmcl implementation
            all_files = await self.cloud_manager.list_files_in_folder(
                self.watch_folder_path, include_subfolders=True
            )
            new_files = []
            current_time = datetime.now()
            for doc in all_files:
                doc_id = self._doc_id(doc)
                # Skip if already processed
                if doc_id in self.processed_files:
                    continue
                # Note: rmcl may not provide exact modification times,
                # so novelty is decided by id rather than by timestamp.
                new_files.append(doc)
                self.processed_files.add(doc_id)
            self.last_check_time = current_time
            return new_files
        except Exception as e:
            self.logger.error(f"Error checking for new files: {e}")
            return []

    async def _process_document(self, doc, temp_path, callback) -> None:
        """Download one document and hand it to ``callback``; never raises ``Exception``."""
        doc_name = doc.name if hasattr(doc, 'name') else "Unknown"
        print(f"š Processing: {doc_name}")

        # A failing download must not abort the batch: the other documents
        # are already marked as processed and would otherwise be lost.
        try:
            local_file = await self.cloud_manager.download_document(doc, temp_path)
        except Exception as e:
            self.logger.error(f"Error downloading {doc_name}: {e}")
            print(f"ā Error processing {doc_name}: {e}")
            return

        if not local_file:
            self.logger.warning(f"No local file produced for {doc_name}; skipping callback")
            return

        # Call the callback function
        try:
            await callback(doc, local_file)
        except Exception as e:
            self.logger.error(f"Error in callback for {doc_name}: {e}")
            print(f"ā Error processing {doc_name}: {e}")

    async def start_watching(self, callback):
        """
        Start watching for new files.

        For the REST watcher this simply delegates to
        ``rest_watcher.start_watching``. For rmcl, files already present when
        watching starts are marked as processed, then the folder is polled in
        an endless loop. Each batch of new files is downloaded into a
        temporary directory that is removed once the whole batch has been
        handled, so ``callback`` must finish with the file before returning.

        Args:
            callback: Async function to call with new files
                (signature: async def callback(document, local_file_path))

        Returns:
            None. The rmcl loop ends on KeyboardInterrupt or on an error
            raised outside the per-iteration handler.
        """
        if self.watcher_type == "rest":
            # Delegate to REST watcher
            await self.rest_watcher.start_watching(callback)
            return

        # Original rmcl implementation
        print(f"šļø Started watching reMarkable folder: {self.watch_folder_path}")
        print(f"š Checking every {self.poll_interval} seconds...")

        # Get initial file list to mark as already processed
        initial_files = await self.cloud_manager.list_files_in_folder(
            self.watch_folder_path, include_subfolders=True
        )
        for doc in initial_files:
            self.processed_files.add(self._doc_id(doc))
        print(f"š Tracking {len(initial_files)} existing files")

        try:
            while True:
                try:
                    new_files = await self.get_new_files()
                    if new_files:
                        print(f"š„ Found {len(new_files)} new file(s)")
                        # Create temporary directory for downloads
                        with tempfile.TemporaryDirectory() as temp_dir:
                            temp_path = Path(temp_dir)
                            for doc in new_files:
                                await self._process_document(doc, temp_path, callback)
                    # Wait before next check
                    await asyncio.sleep(self.poll_interval)
                except Exception as e:
                    self.logger.error(f"Error in watch loop: {e}")
                    print(f"ā Watch error: {e}")
                    await asyncio.sleep(self.poll_interval)
        except KeyboardInterrupt:
            print(f"\nš Stopping reMarkable file watcher...")
        except Exception as e:
            self.logger.error(f"Fatal error in file watcher: {e}")
            print(f"ā Fatal watcher error: {e}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
cloud_manager: An instance of RemarkableCloudManager that provides the connection to reMarkable Cloud. Must have a client_type attribute ('rest' or 'rmcl') and methods for listing and downloading files.
watch_folder_path: String path to the folder to monitor. For REST API, the folder name is extracted from the path. For rmcl, the full path is used. Can be empty string or '/' for root folder.
poll_interval: Integer number of seconds to wait between checks for new files. Default is 60 seconds. Lower values increase responsiveness but may increase API usage.
Return Value
The constructor returns an instance of RemarkableFileWatcher. The get_new_files() method returns a List[Union[Dict, Document]] containing new files found since the last check (empty list if none found or on error). The start_watching() method does not return a value but runs indefinitely until interrupted.
Class Interface
Methods
__init__(self, cloud_manager: RemarkableCloudManager, watch_folder_path: str, poll_interval: int = 60)
Purpose: Initialize the file watcher with cloud manager, folder path, and polling interval. Automatically detects and configures the appropriate watcher type (REST or rmcl).
Parameters:
cloud_manager: RemarkableCloudManager instance providing cloud access
watch_folder_path: Path to the folder to monitor for new files
poll_interval: Seconds between polling checks (default 60)
Returns: None (constructor)
async get_new_files(self) -> List[Union[Dict, Document]]
Purpose: Retrieve a list of new files that have appeared in the watched folder since the last check. Tracks processed files to avoid duplicates.
Returns: List of Document objects (rmcl) or dictionaries (REST API) representing new files. Returns empty list on error or if no new files found.
async start_watching(self, callback)
Purpose: Start continuous monitoring of the folder, calling the provided callback function for each new file detected. Runs indefinitely until interrupted.
Parameters:
callback: Async function with signature: async def callback(document, local_file_path). Called for each new file with the document object and path to downloaded file.
Returns: None. Runs until KeyboardInterrupt or fatal error.
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| cloud_manager | RemarkableCloudManager | The cloud manager instance used to interact with reMarkable Cloud API | instance |
| watch_folder_path | str | Path to the folder being monitored for new files | instance |
| poll_interval | int | Number of seconds to wait between polling checks | instance |
| logger | logging.Logger | Logger instance for recording errors and debug information | instance |
| watcher_type | str | Type of watcher being used: 'rest' or 'rmcl' | instance |
| rest_watcher | RemarkableRestFileWatcher | REST API watcher instance (only present when watcher_type is 'rest') | instance |
| processed_files | Set[str] | Set of document IDs that have already been processed (only for rmcl watcher type) | instance |
| last_check_time | datetime | Timestamp of the last check for new files (only for rmcl watcher type) | instance |
Dependencies
asyncio, os, tempfile, time, uuid, datetime, pathlib, typing, logging, json, remarkable_rest_client, rmcl
Required Imports
import asyncio
import tempfile
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Union
from remarkable_rest_client import RemarkableRestClient, RemarkableRestFileWatcher
from rmcl import Document
Conditional/Optional Imports
These imports are only needed under specific conditions:
from remarkable_rest_client import RemarkableRestFileWatcher
Condition: only when cloud_manager.client_type == 'rest'
Required (conditional)
from rmcl import Document
Condition: only when cloud_manager.client_type == 'rmcl'
Required (conditional)
Usage Example
import asyncio
from pathlib import Path
from remarkable_cloud_manager import RemarkableCloudManager
from remarkable_file_watcher import RemarkableFileWatcher
async def process_new_file(document, local_file_path):
    """Callback function to process new files.

    Args:
        document: The new document as reported by the watcher. It may be an
            object with a ``name`` attribute or a plain dict, so the name is
            read defensively.
        local_file_path: Path of the downloaded copy of the document.
    """
    # Fall back to "Unknown" when the document has no ``name`` attribute
    # (e.g. a dict), instead of raising AttributeError.
    doc_name = document.name if hasattr(document, 'name') else "Unknown"
    print(f"New file: {doc_name}")
    print(f"Downloaded to: {local_file_path}")
    # Process the file here
async def main():
    """Demonstrate a one-off check followed by continuous watching."""
    # Connect to reMarkable Cloud through the REST client
    manager = RemarkableCloudManager(client_type='rest', token='your_token')

    # Watch a single folder, polling every 30 seconds
    folder_watcher = RemarkableFileWatcher(
        cloud_manager=manager,
        watch_folder_path='/My Notes/Work',
        poll_interval=30,
    )

    # One-shot check for files that have not been seen yet
    pending = await folder_watcher.get_new_files()
    print(f"Found {len(pending)} new files")

    # Continuous mode: process_new_file is awaited for every new file
    await folder_watcher.start_watching(process_new_file)


if __name__ == '__main__':
    asyncio.run(main())
Best Practices
- Always use async/await when calling methods as this is an async class
- Provide a proper async callback function to start_watching() with signature: async def callback(document, local_file_path)
- The watcher maintains state (processed_files set) to avoid reprocessing files, so use a single instance per folder
- Files are downloaded to temporary directories that are automatically cleaned up after callback execution
- Handle KeyboardInterrupt to gracefully stop the start_watching() loop
- The watcher automatically detects client type from cloud_manager, no manual configuration needed
- For rmcl client, initial files in the folder are marked as processed to avoid processing existing files on startup
- Error handling is built-in but callback functions should implement their own error handling for robustness
- The poll_interval should be balanced between responsiveness and API rate limits
- The watcher will continue running even if individual file processing fails
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class RemarkableRestFileWatcher 85.5% similar
-
class RemarkableCloudWatcher 78.4% similar
-
class RemarkableCloudManager 72.9% similar
-
function main_v21 68.8% similar
-
class RemarkableAPIClient 68.0% similar