class RemarkableFileWatcher

Maturity: 51

A unified file watcher class that monitors a reMarkable Cloud folder for new files, supporting both REST API and rmcl client implementations with automatic client type detection.

File: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
Lines: 461 - 592
Complexity: moderate

Purpose

RemarkableFileWatcher provides a single interface for monitoring a reMarkable Cloud folder for new files. It reads cloud_manager.client_type to choose a backend. In REST mode it delegates both get_new_files() and start_watching() to a RemarkableRestFileWatcher. In rmcl mode it polls the folder itself every poll_interval seconds, tracks the document IDs it has already seen so that no file is reported twice, downloads each new file into a temporary directory, and awaits the callback with the document and the local path. Errors raised by a callback or by a polling pass are logged, and the loop continues.

Source Code

class RemarkableFileWatcher:
    """Unified file watcher that works with both REST API and rmcl clients"""
    
    def __init__(self, cloud_manager: RemarkableCloudManager, 
                 watch_folder_path: str, poll_interval: int = 60):
        self.cloud_manager = cloud_manager
        self.watch_folder_path = watch_folder_path
        self.poll_interval = poll_interval
        self.logger = logging.getLogger(__name__)
        
        # Initialize appropriate watcher
        if cloud_manager.client_type == "rest":
            # Extract folder name from path for REST API
            folder_name = watch_folder_path.strip('/').split('/')[-1] if watch_folder_path.strip('/') else "Root"
            self.rest_watcher = RemarkableRestFileWatcher(
                cloud_manager.rest_client, folder_name, poll_interval
            )
            self.watcher_type = "rest"
        else:
            # Use original implementation for rmcl
            self.watcher_type = "rmcl"
            # Track processed files to avoid reprocessing
            self.processed_files: Set[str] = set()
            self.last_check_time = datetime.now()
    
    async def get_new_files(self) -> List[Union[Dict, Document]]:
        """
        Get list of new files since last check
        
        Returns:
            List of new Document objects/dicts
        """
        try:
            if self.watcher_type == "rest":
                return await self.rest_watcher.get_new_files()
            
            elif self.watcher_type == "rmcl":
                # Original rmcl implementation
                all_files = await self.cloud_manager.list_files_in_folder(
                    self.watch_folder_path, include_subfolders=True
                )
                
                new_files = []
                current_time = datetime.now()
                
                for doc in all_files:
                    # Skip if already processed
                    doc_id = doc.id if hasattr(doc, 'id') else str(doc)
                    if doc_id in self.processed_files:
                        continue
                    
                    # Check if file is new (modified after last check)
                    # Note: rmcl may not provide exact modification times,
                    # so we'll track by ID for now
                    new_files.append(doc)
                    self.processed_files.add(doc_id)
                
                self.last_check_time = current_time
                return new_files
            
            return []
            
        except Exception as e:
            self.logger.error(f"Error checking for new files: {e}")
            return []
    
    async def start_watching(self, callback):
        """
        Start watching for new files
        
        Args:
            callback: Async function to call with new files (signature: async def callback(document, local_file_path))
        """
        if self.watcher_type == "rest":
            # Delegate to REST watcher
            await self.rest_watcher.start_watching(callback)
            return
        
        # Original rmcl implementation
        print(f"👁️  Started watching reMarkable folder: {self.watch_folder_path}")
        print(f"🔄 Checking every {self.poll_interval} seconds...")
        
        # Get initial file list to mark as already processed
        initial_files = await self.cloud_manager.list_files_in_folder(
            self.watch_folder_path, include_subfolders=True
        )
        for doc in initial_files:
            doc_id = doc.id if hasattr(doc, 'id') else str(doc)
            self.processed_files.add(doc_id)
        
        print(f"📁 Tracking {len(initial_files)} existing files")
        
        try:
            while True:
                try:
                    new_files = await self.get_new_files()
                    
                    if new_files:
                        print(f"📄 Found {len(new_files)} new file(s)")
                        
                        # Create temporary directory for downloads
                        with tempfile.TemporaryDirectory() as temp_dir:
                            temp_path = Path(temp_dir)
                            
                            for doc in new_files:
                                doc_name = doc.name if hasattr(doc, 'name') else "Unknown"
                                print(f"📄 Processing: {doc_name}")
                                
                                # Download file
                                local_file = await self.cloud_manager.download_document(doc, temp_path)
                                
                                if local_file:
                                    # Call the callback function
                                    try:
                                        await callback(doc, local_file)
                                    except Exception as e:
                                        self.logger.error(f"Error in callback for {doc_name}: {e}")
                                        print(f"❌ Error processing {doc_name}: {e}")
                    
                    # Wait before next check
                    await asyncio.sleep(self.poll_interval)
                    
                except Exception as e:
                    self.logger.error(f"Error in watch loop: {e}")
                    print(f"❌ Watch error: {e}")
                    await asyncio.sleep(self.poll_interval)
                    
        except KeyboardInterrupt:
            print(f"\n🛑 Stopping reMarkable file watcher...")
        except Exception as e:
            self.logger.error(f"Fatal error in file watcher: {e}")
            print(f"❌ Fatal watcher error: {e}")

Parameters

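Name | Type | Default | Kind
cloud_manager | RemarkableCloudManager | - | positional or keyword
watch_folder_path | str | - | positional or keyword
poll_interval | int | 60 | positional or keyword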

Parameter Details

cloud_manager: An instance of RemarkableCloudManager that provides the connection to reMarkable Cloud. Must have a client_type attribute ('rest' or 'rmcl') and methods for listing and downloading files.

watch_folder_path: String path to the folder to monitor. In REST mode only the last path segment is passed on as the folder name; in rmcl mode the full path is used. An empty string or '/' selects the root folder, which the REST branch passes on as "Root".

poll_interval: Integer number of seconds to wait between checks for new files. Default is 60 seconds. Lower values increase responsiveness but may increase API usage.
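The way the REST branch of the constructor reduces watch_folder_path to a folder name can be reproduced in isolation. The sketch below lifts the expression from the source code; the function name rest_folder_name is hypothetical and exists only for illustration.

```python
def rest_folder_name(watch_folder_path: str) -> str:
    """Return the last path segment, or "Root" for an empty or root path.

    Mirrors the expression used in RemarkableFileWatcher.__init__ for the
    REST branch; the function name itself is illustrative.
    """
    stripped = watch_folder_path.strip("/")
    return stripped.split("/")[-1] if stripped else "Root"


if __name__ == "__main__":
    for path in ("/My Notes/Work", "Inbox/", "/", ""):
        print(repr(path), "->", rest_folder_name(path))
```

Because only the final segment is kept, the parent segments of the path are not part of what the REST watcher receives.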

Return Value

The constructor returns an instance of RemarkableFileWatcher. The get_new_files() method returns a List[Union[Dict, Document]] containing new files found since the last check (empty list if none found or on error). The start_watching() method does not return a value but runs indefinitely until interrupted.

Class Interface

Methods

__init__(self, cloud_manager: RemarkableCloudManager, watch_folder_path: str, poll_interval: int = 60)

Purpose: Initialize the file watcher with cloud manager, folder path, and polling interval. Automatically detects and configures the appropriate watcher type (REST or rmcl).

Parameters:

  • cloud_manager: RemarkableCloudManager instance providing cloud access
  • watch_folder_path: Path to the folder to monitor for new files
  • poll_interval: Seconds between polling checks (default 60)

Returns: None (constructor)

async get_new_files(self) -> List[Union[Dict, Document]]

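Purpose: Retrieve the files that have appeared in the watched folder since the last check. In REST mode the call is forwarded to the REST watcher. In rmcl mode every document whose ID is not yet in processed_files is returned and its ID is recorded immediately, so each file is reported once, even if its download or callback later fails.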

Returns: List of Document objects (rmcl) or dictionaries (REST API) representing new files. Returns empty list on error or if no new files found.
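The ID-based tracking used in rmcl mode can be sketched on its own, without any cloud connection. The helper names document_id and filter_unseen are hypothetical, and SimpleNamespace objects stand in for rmcl Document instances.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Set


def document_id(doc: Any) -> str:
    """Use doc.id when present, otherwise fall back to str(doc), as the source does."""
    return doc.id if hasattr(doc, "id") else str(doc)


def filter_unseen(docs: Iterable[Any], seen: Set[str]) -> List[Any]:
    """Return documents whose ID is not in `seen`, recording each ID as it is returned."""
    fresh = []
    for doc in docs:
        doc_id = document_id(doc)
        if doc_id in seen:
            continue
        seen.add(doc_id)
        fresh.append(doc)
    return fresh


if __name__ == "__main__":
    seen: Set[str] = set()
    a, b, c = (SimpleNamespace(id=i, name=f"note-{i}") for i in "abc")
    print([d.id for d in filter_unseen([a, b], seen)])     # first poll
    print([d.id for d in filter_unseen([a, b, c], seen)])  # second poll
```

One consequence of tracking by ID alone, which the source comments acknowledge, is that a document modified later under the same ID is not reported again.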

async start_watching(self, callback)

Purpose: Start continuous monitoring of the folder, calling the provided callback function for each new file detected. Runs indefinitely until interrupted.

Parameters:

  • callback: Async function with signature: async def callback(document, local_file_path). Called for each new file with the document object and path to downloaded file.

Returns: None. Runs until KeyboardInterrupt or fatal error.
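Since start_watching() never returns on its own, a caller that wants to stop it from code can run it as an asyncio task and cancel that task. The sketch below uses a hypothetical stand-in coroutine, fake_start_watching, that imitates the rmcl loop's structure (a broad `except Exception` inside `while True`); it is not the real watcher. It relies on asyncio.CancelledError deriving from BaseException (Python 3.8 and later), so the broad handler does not swallow the cancellation.

```python
import asyncio
from typing import List


async def fake_start_watching(callback, poll_interval: float = 0.01) -> None:
    """Hypothetical stand-in for watcher.start_watching(): polls forever."""
    count = 0
    while True:
        try:
            count += 1
            await callback(f"doc-{count}", f"/tmp/doc-{count}.pdf")
            await asyncio.sleep(poll_interval)
        except Exception as exc:  # mirrors the loop's broad handler
            print(f"Watch error: {exc}")


async def run_for(seconds: float) -> List[str]:
    """Run the watcher as a task, then cancel it after `seconds`."""
    seen: List[str] = []

    async def callback(document, local_file_path):
        seen.append(document)

    task = asyncio.create_task(fake_start_watching(callback))
    await asyncio.sleep(seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return seen


if __name__ == "__main__":
    print(asyncio.run(run_for(0.05)))
```

With the real class, the task would wrap watcher.start_watching(callback) instead. Whether the REST watcher's loop reacts to cancellation in the same way depends on RemarkableRestFileWatcher, which is not shown here.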

Attributes

Name | Type | Description | Scope
cloud_manager | RemarkableCloudManager | The cloud manager instance used to interact with the reMarkable Cloud API | instance
watch_folder_path | str | Path to the folder being monitored for new files | instance
poll_interval | int | Number of seconds to wait between polling checks | instance
logger | logging.Logger | Logger instance for recording errors and debug information | instance
watcher_type | str | Type of watcher being used: 'rest' or 'rmcl' | instance
rest_watcher | RemarkableRestFileWatcher | REST API watcher instance (only present when watcher_type is 'rest') | instance
processed_files | Set[str] | Set of document IDs that have already been processed (only for rmcl watcher type) | instance
last_check_time | datetime | Timestamp of the last check for new files (only for rmcl watcher type) | instance

Dependencies

  • asyncio
  • os
  • tempfile
  • time
  • uuid
  • datetime
  • pathlib
  • typing
  • logging
  • json
  • remarkable_rest_client
  • rmcl

Required Imports

import asyncio
import tempfile
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Union
from remarkable_rest_client import RemarkableRestClient, RemarkableRestFileWatcher
from rmcl import Document

Conditional/Optional Imports

These imports are only needed under specific conditions:

from remarkable_rest_client import RemarkableRestFileWatcher

Condition: required only when cloud_manager.client_type == 'rest'
from rmcl import Document

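Condition: used at runtime only when cloud_manager.client_type == 'rmcl'. Document also appears in the return annotation of get_new_files(), so the name must be resolvable when the class is defined unless annotations are deferred.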
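The source does not show how remarkable_cloud.py guards these imports. One common pattern, offered here purely as an assumption, is to attempt each import and substitute a placeholder when the package is missing, so that the module still loads with only one backend installed.

```python
from typing import Any

# Assumed pattern, not taken from the documented module.
try:
    from rmcl import Document
except ImportError:
    # Placeholder so annotations that mention Document still resolve
    Document = Any

try:
    from remarkable_rest_client import RemarkableRestFileWatcher
except ImportError:
    RemarkableRestFileWatcher = None


def rest_backend_available() -> bool:
    """True when the REST watcher class could be imported."""
    return RemarkableRestFileWatcher is not None


if __name__ == "__main__":
    print("REST backend available:", rest_backend_available())
```

A constructor could then check rest_backend_available() (a hypothetical helper) before taking the REST branch and raise a clear error instead of failing on a missing name.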

Usage Example

import asyncio
# Both classes are documented as living in remarkable_cloud.py
from remarkable_cloud import RemarkableCloudManager, RemarkableFileWatcher

async def process_new_file(document, local_file_path):
    """Callback function to process new files"""
    # REST mode passes a dict, rmcl mode a Document; fall back to the raw value
    name = getattr(document, 'name', document)
    print(f"New file: {name}")
    print(f"Downloaded to: {local_file_path}")
    # Process the file here; copy anything you need to keep

async def main():
    # Initialize cloud manager (arguments are illustrative; check the
    # RemarkableCloudManager constructor for the actual signature)
    cloud_manager = RemarkableCloudManager(client_type='rest', token='your_token')
    
    # Create file watcher for a specific folder
    watcher = RemarkableFileWatcher(
        cloud_manager=cloud_manager,
        watch_folder_path='/My Notes/Work',
        poll_interval=30
    )
    
    # Check for new files once
    new_files = await watcher.get_new_files()
    print(f"Found {len(new_files)} new files")
    
    # Or start continuous watching
    await watcher.start_watching(process_new_file)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C typically surfaces here rather than inside the coroutine
        pass

Best Practices

  • get_new_files() and start_watching() are coroutines, so await them from within a running event loop
  • Provide a proper async callback function to start_watching() with signature: async def callback(document, local_file_path)
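  • In rmcl mode the watcher keeps its state (the processed_files set) in memory only, so use a single instance per folder and expect the set to be rebuilt from the current folder contents on each start
  • In rmcl mode files are downloaded into a temporary directory that is deleted once every new file in the current batch has been handled, so a callback must copy any file it needs to keep
  • In rmcl mode start_watching() catches KeyboardInterrupt itself, but under asyncio.run() the interrupt typically surfaces at the asyncio.run() call instead, so wrap that call in try/except KeyboardInterrupt as well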
  • The watcher automatically detects client type from cloud_manager, no manual configuration needed
  • For rmcl client, initial files in the folder are marked as processed to avoid processing existing files on startup
  • Error handling is built-in but callback functions should implement their own error handling for robustness
  • The poll_interval should be balanced between responsiveness and API rate limits
  • The watcher will continue running even if individual file processing fails
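Two of the points above, that the temporary download does not outlive the batch and that callbacks should handle their own errors, can be combined in one callback. The following is a minimal sketch under stated assumptions: make_archiving_callback and the archive directory are hypothetical names, and the only thing assumed about the watcher is the documented callback signature.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path


def make_archiving_callback(archive_dir: Path):
    """Build a callback that copies each download into archive_dir (hypothetical helper)."""
    archive_dir.mkdir(parents=True, exist_ok=True)

    async def callback(document, local_file_path):
        try:
            target = archive_dir / Path(local_file_path).name
            # Copy before returning: the temporary directory is removed later.
            # shutil.copy2 blocks; acceptable for small files in a sketch.
            shutil.copy2(local_file_path, target)
            return target
        except OSError as exc:
            # Handle errors locally so one bad file does not hide the cause
            print(f"Could not archive {local_file_path}: {exc}")
            return None

    return callback


async def demo() -> None:
    with tempfile.TemporaryDirectory() as keep:
        callback = make_archiving_callback(Path(keep) / "archive")
        with tempfile.TemporaryDirectory() as scratch:
            source = Path(scratch) / "note.pdf"
            source.write_bytes(b"%PDF-1.4")
            saved = await callback({"id": "abc"}, source)
        # The scratch directory is gone, the archived copy remains
        print(source.exists(), saved is not None and saved.exists())


if __name__ == "__main__":
    asyncio.run(demo())
```

The returned callback can be passed directly to start_watching(); the document argument is accepted but not inspected, so it works whether the watcher supplies a dict or a Document.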

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class RemarkableRestFileWatcher 85.5% similar

    A file watcher class that monitors a specific folder on a reMarkable tablet using the REST API, polling for new files at regular intervals and triggering callbacks when new files are detected.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_rest_client.py
  • class RemarkableCloudWatcher 78.4% similar

    Monitors the reMarkable Cloud 'gpt_out' folder for new documents, automatically downloads them, and converts .rm (reMarkable native) files to PDF format.

    From: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
  • class RemarkableCloudManager 72.9% similar

    Unified manager for reMarkable Cloud operations that uses REST API as primary method with rmcl library as fallback, handling authentication, file operations, and folder management.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
  • function main_v21 68.8% similar

    Asynchronous main function that runs a reMarkable tablet file watcher as a separate process, monitoring a specified folder for new documents, processing them, and uploading responses back to the reMarkable Cloud.

    From: /tf/active/vicechatdev/e-ink-llm/run_remarkable_bridge.py
  • class RemarkableAPIClient 68.0% similar

    Asynchronous API client for interacting with the reMarkable Cloud service, providing methods for file management, folder operations, and document synchronization.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_api_endpoints.py