🔍 Code Extractor

class RemarkableCloudWatcher

Maturity: 46

Monitors the reMarkable Cloud 'gpt_out' folder for new documents, automatically downloads them, and converts .rm (reMarkable native) files to PDF format.

File: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
Lines: 36 - 473
Complexity: complex

Purpose

This class provides a complete solution for watching a specific folder in reMarkable Cloud storage, detecting new documents, and extracting them as PDFs. It handles both native PDF files and reMarkable's proprietary .rm format, converting the latter using the 'rmc' command-line tool. The class maintains state to avoid reprocessing files, manages temporary storage, and supports multi-page notebook conversion with PDF concatenation.

Source Code

class RemarkableCloudWatcher:
    """Watches reMarkable Cloud gpt_out folder for new files"""
    
    def __init__(self, remarkable_session, logger):
        self.session = remarkable_session
        self.logger = logger
        self.base_url = "https://eu.tectonic.remarkable.com"
        self.processed_files = set()  # Track processed file hashes
        self.gpt_out_folder_uuid = None
        self.temp_dir = Path(tempfile.mkdtemp(prefix="remarkable_watch_"))
        
    def __del__(self):
        """Cleanup temporary directory"""
        if hasattr(self, 'temp_dir') and self.temp_dir.exists():
            shutil.rmtree(self.temp_dir, ignore_errors=True)
    
    async def initialize(self):
        """Initialize the watcher by finding the gpt_out folder"""
        self.logger.info("🔍 Initializing reMarkable Cloud watcher...")
        
        try:
            # Discover all folders to find gpt_out
            all_nodes = await self._discover_all_nodes()
            
            # Find gpt_out folder
            for uuid, node in all_nodes.items():
                if (node.get('node_type') == 'folder' and 
                    node.get('name', '').lower() in ['gpt_out', 'gpt out', 'gptout']):
                    self.gpt_out_folder_uuid = uuid
                    self.logger.info(f"✅ Found gpt_out folder: {uuid}")
                    break
            
            if not self.gpt_out_folder_uuid:
                self.logger.warning("⚠️ gpt_out folder not found in reMarkable Cloud")
                return False
                
            return True
            
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable watcher: {e}")
            return False
    
    async def _discover_all_nodes(self) -> Dict[str, Dict]:
        """Discover all nodes in reMarkable Cloud using local_replica_v2.py method"""
        all_nodes = {}
        
        try:
            # Get root hash
            root_response = self.session.get(f"{self.base_url}/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            root_hash = root_data.get('hash')
            
            if not root_hash:
                return all_nodes
            
            # Discover nodes recursively
            discovered_hashes = set()
            hashes_to_process = [root_hash]
            
            while hashes_to_process:
                current_hash = hashes_to_process.pop(0)
                
                if current_hash in discovered_hashes:
                    continue
                    
                discovered_hashes.add(current_hash)
                
                # Fetch and parse content
                content_info = await self._fetch_hash_content(current_hash)
                if not content_info:
                    continue
                
                parsed = self._parse_directory_listing(content_info['content'])
                
                # Extract metadata if available
                metadata = {}
                node_name = f"unknown_{current_hash[:8]}"
                node_type = "folder"
                parent_uuid = None
                
                for component in parsed['data_components']:
                    if component['component_type'] == 'metadata':
                        extracted_metadata = await self._extract_metadata(component['hash'])
                        if extracted_metadata:
                            metadata = extracted_metadata
                            node_name = metadata.get('visibleName', node_name)
                            if metadata.get('type') == 'DocumentType':
                                node_type = "document"
                            elif metadata.get('type') == 'CollectionType':
                                node_type = "folder"
                            parent_uuid = metadata.get('parent', '') or None
                        break
                
                # Determine node UUID
                node_uuid = None
                for component in parsed['child_objects']:
                    node_uuid = component['uuid_component']
                    break
                if not node_uuid and parsed['data_components']:
                    component_name = parsed['data_components'][0]['uuid_component']
                    if '.' in component_name:
                        node_uuid = component_name.split('.')[0]
                if not node_uuid:
                    node_uuid = current_hash[:32]
                
                # Store node
                all_nodes[node_uuid] = {
                    'uuid': node_uuid,
                    'hash': current_hash,
                    'name': node_name,
                    'node_type': node_type,
                    'parent_uuid': parent_uuid,
                    'metadata': metadata,
                    'parsed_data': parsed
                }
                
                # Add child hashes to process
                for child_obj in parsed['child_objects']:
                    if child_obj['hash'] not in discovered_hashes:
                        hashes_to_process.append(child_obj['hash'])
            
            return all_nodes
            
        except Exception as e:
            self.logger.error(f"❌ Failed to discover nodes: {e}")
            return all_nodes
    
    async def _fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]:
        """Fetch content from reMarkable cloud by hash"""
        try:
            url = f"{self.base_url}/sync/v3/files/{hash_ref}"
            response = self.session.get(url)
            response.raise_for_status()
            
            return {
                'hash': hash_ref,
                'content': response.content,
                'size': len(response.content)
            }
            
        except Exception as e:
            self.logger.debug(f"Failed to fetch {hash_ref[:16]}...: {e}")
            return None
    
    def _parse_directory_listing(self, content: bytes) -> Dict[str, Any]:
        """Parse directory listing using local_replica_v2.py method"""
        try:
            text_content = content.decode('utf-8')
        except UnicodeDecodeError:
            return {'child_objects': [], 'data_components': []}
        
        result = {
            'child_objects': [],
            'data_components': []
        }
        
        lines = text_content.split('\n')
        if lines and lines[0].strip().isdigit():
            lines = lines[1:]  # Skip count line
        
        entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
                
            match = re.match(entry_pattern, line, re.IGNORECASE)
            if match:
                hash_val, flags, uuid_component, type_val, size_val = match.groups()
                
                entry_info = {
                    'hash': hash_val,
                    'flags': flags,
                    'uuid_component': uuid_component,
                    'type': type_val,
                    'size': int(size_val)
                }
                
                if '.' in uuid_component:
                    # Data component (.content, .metadata, .pdf, .rm, etc.)
                    component_type = uuid_component.split('.')[-1]
                    if '/' in component_type:  # Handle .rm files like "uuid/filename.rm"
                        component_type = component_type.split('/')[-1]
                    entry_info['component_type'] = component_type
                    result['data_components'].append(entry_info)
                else:
                    # Child object (pure UUID)
                    result['child_objects'].append(entry_info)
        
        return result
    
    async def _extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
        """Extract metadata from hash"""
        content_info = await self._fetch_hash_content(metadata_hash)
        if not content_info:
            return None
        
        try:
            text_content = content_info['content'].decode('utf-8')
            return json.loads(text_content)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            self.logger.debug(f"Failed to parse metadata {metadata_hash[:16]}...: {e}")
            return None
    
    async def check_for_new_files(self) -> List[Path]:
        """Check gpt_out folder for new files and convert them to PDFs"""
        if not self.gpt_out_folder_uuid:
            return []
        
        new_pdf_files = []
        
        try:
            # Re-discover nodes to get current state
            all_nodes = await self._discover_all_nodes()
            
            # Find documents in gpt_out folder
            gpt_out_documents = []
            for uuid, node in all_nodes.items():
                if (node.get('node_type') == 'document' and 
                    node.get('parent_uuid') == self.gpt_out_folder_uuid):
                    gpt_out_documents.append(node)
            
            self.logger.debug(f"Found {len(gpt_out_documents)} documents in gpt_out folder")
            
            for doc_node in gpt_out_documents:
                doc_hash = doc_node['hash']
                
                # Skip if already processed
                if doc_hash in self.processed_files:
                    continue
                
                self.logger.info(f"📄 Processing new document: {doc_node['name']}")
                
                # Extract the document
                pdf_file = await self._extract_document(doc_node, all_nodes)
                if pdf_file:
                    new_pdf_files.append(pdf_file)
                    self.processed_files.add(doc_hash)
        
        except Exception as e:
            self.logger.error(f"❌ Error checking for new files: {e}")
        
        return new_pdf_files
    
    async def _extract_document(self, doc_node: Dict, all_nodes: Dict) -> Optional[Path]:
        """Extract a document from reMarkable Cloud, converting .rm files to PDF if needed"""
        try:
            parsed_data = doc_node.get('parsed_data', {})
            doc_name = doc_node.get('name', 'unknown')
            
            # Create document-specific temp directory
            doc_temp_dir = self.temp_dir / f"doc_{doc_node['uuid'][:8]}"
            doc_temp_dir.mkdir(exist_ok=True)
            
            # Check for PDF content first
            pdf_hash = None
            rm_hashes = []
            
            for component in parsed_data.get('data_components', []):
                if component['component_type'] == 'pdf':
                    pdf_hash = component['hash']
                elif component['component_type'] == 'rm':
                    rm_hashes.append(component['hash'])
            
            # If PDF exists, extract it directly
            if pdf_hash:
                self.logger.info(f"📄 Extracting PDF: {doc_name}")
                pdf_content = await self._fetch_hash_content(pdf_hash)
                if pdf_content:
                    pdf_path = doc_temp_dir / f"{doc_name}.pdf"
                    with open(pdf_path, 'wb') as f:
                        f.write(pdf_content['content'])
                    return pdf_path
            
            # If .rm files exist, convert to PDF
            elif rm_hashes:
                self.logger.info(f"🖊️ Converting .rm files to PDF: {doc_name}")
                return await self._convert_rm_to_pdf(doc_name, rm_hashes, doc_temp_dir)
            
            else:
                self.logger.warning(f"⚠️ No PDF or .rm content found for: {doc_name}")
                return None
        
        except Exception as e:
            # Use doc_node directly here: doc_name may not be bound if the failure happened early
            self.logger.error(f"❌ Error extracting document {doc_node.get('name', 'unknown')}: {e}")
            return None
    
    async def _convert_rm_to_pdf(self, doc_name: str, rm_hashes: List[str], output_dir: Path) -> Optional[Path]:
        """Convert .rm files to PDF using rmc tool (from local_replica_v2.py)"""
        try:
            # Create notebook directory for .rm files
            notebook_dir = output_dir / "notebook"
            notebook_dir.mkdir(exist_ok=True)
            
            # Download .rm files
            rm_files = []
            for i, rm_hash in enumerate(rm_hashes):
                rm_content = await self._fetch_hash_content(rm_hash)
                if rm_content:
                    rm_path = notebook_dir / f"page_{i+1}.rm"
                    with open(rm_path, 'wb') as f:
                        f.write(rm_content['content'])
                    rm_files.append(rm_path)
            
            if not rm_files:
                self.logger.warning(f"⚠️ No .rm files downloaded for {doc_name}")
                return None
            
            # Sort files by page number
            rm_files.sort(key=lambda x: int(x.stem.split('_')[1]))
            
            # Final PDF path
            final_pdf_path = output_dir / f"{doc_name}.pdf"
            
            if len(rm_files) == 1:
                # Single page - convert directly
                result = subprocess.run([
                    "rmc", str(rm_files[0]), "-o", str(final_pdf_path)
                ], capture_output=True, text=True, timeout=60)
                
                if result.returncode == 0 and final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
                    self.logger.info(f"✅ Converted single page to PDF: {final_pdf_path}")
                    return final_pdf_path
                else:
                    self.logger.error(f"❌ rmc conversion failed: {result.stderr}")
                    return None
            
            else:
                # Multiple pages - convert each to temporary PDF and concatenate
                temp_pdfs = []
                
                for i, rm_file in enumerate(rm_files):
                    temp_pdf = notebook_dir / f"temp_page_{i+1}.pdf"
                    
                    result = subprocess.run([
                        "rmc", str(rm_file), "-o", str(temp_pdf)
                    ], capture_output=True, text=True, timeout=60)
                    
                    if result.returncode == 0 and temp_pdf.exists() and temp_pdf.stat().st_size > 0:
                        temp_pdfs.append(temp_pdf)
                    else:
                        self.logger.error(f"❌ rmc conversion failed for page {i+1}: {result.stderr}")
                        return None
                
                if temp_pdfs:
                    # Concatenate PDFs using PyPDF2 or similar
                    success = await self._concatenate_pdfs(temp_pdfs, final_pdf_path)
                    if success:
                        self.logger.info(f"✅ Converted multi-page notebook to PDF: {final_pdf_path}")
                        return final_pdf_path
                
                return None
        
        except Exception as e:
            self.logger.error(f"❌ Error converting .rm files: {e}")
            return None
    
    async def _concatenate_pdfs(self, pdf_files: List[Path], output_path: Path) -> bool:
        """Concatenate multiple PDF files into one"""
        if len(pdf_files) <= 1:
            # If only one file, just copy it
            if pdf_files:
                shutil.copy2(pdf_files[0], output_path)
                return True
            return False
        
        try:
            # Try PyPDF2 first (preferred when installed)
            from PyPDF2 import PdfWriter, PdfReader
            
            writer = PdfWriter()
            
            for pdf_file in pdf_files:
                reader = PdfReader(str(pdf_file))
                for page in reader.pages:
                    writer.add_page(page)
            
            with open(output_path, 'wb') as output_file:
                writer.write(output_file)
            
            self.logger.info(f"✅ PDF concatenation successful using PyPDF2")
            return True
            
        except ImportError:
            try:
                # Try PyPDF4 as fallback
                from PyPDF4 import PdfFileWriter, PdfFileReader
                
                writer = PdfFileWriter()
                
                for pdf_file in pdf_files:
                    reader = PdfFileReader(str(pdf_file))
                    for page_num in range(reader.getNumPages()):
                        page = reader.getPage(page_num)
                        writer.addPage(page)
                
                with open(output_path, 'wb') as output_file:
                    writer.write(output_file)
                
                self.logger.info(f"✅ PDF concatenation successful using PyPDF4")
                return True
                
            except ImportError:
                # Fallback to using system commands if no PDF library available
                self.logger.warning("⚠️ No PDF library available (PyPDF2/PyPDF4), trying system commands")
                self.logger.warning("💡 Install PyPDF2 for better performance: pip install PyPDF2")
                
                try:
                    # Try using pdftk if available
                    cmd = ["pdftk"] + [str(f) for f in pdf_files] + ["cat", "output", str(output_path)]
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                    
                    if result.returncode == 0:
                        self.logger.info(f"✅ PDF concatenation successful using pdftk")
                        return True
                    
                    # Try using gs (ghostscript) as fallback
                    cmd = ["gs", "-dNOPAUSE", "-dBATCH", "-sDEVICE=pdfwrite", f"-sOutputFile={output_path}"] + [str(f) for f in pdf_files]
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                    
                    if result.returncode == 0:
                        self.logger.info(f"✅ PDF concatenation successful using ghostscript")
                        return True
                    else:
                        self.logger.error("❌ All PDF concatenation methods failed")
                        self.logger.error("💡 Install dependencies: pip install PyPDF2 OR sudo apt-get install pdftk ghostscript")
                        return False
                    
                except Exception as e:
                    self.logger.error(f"❌ PDF concatenation failed: {e}")
                    self.logger.error("💡 Install dependencies: pip install PyPDF2 OR sudo apt-get install pdftk ghostscript")
                    return False
        
        except Exception as e:
            self.logger.error(f"❌ PDF concatenation error: {e}")
            return False

Parameters

Name                 Type              Default  Kind
remarkable_session   requests.Session  -        positional
logger               logging.Logger    -        positional

Parameter Details

remarkable_session: An authenticated requests.Session object configured with reMarkable Cloud authentication headers and tokens. This session is used for all API calls to the reMarkable Cloud service.

logger: A logging.Logger instance for outputting status messages, errors, and debug information throughout the watching and extraction process.

Return Value

Instantiation returns a RemarkableCloudWatcher object. The main method check_for_new_files() returns a List[Path] containing paths to newly extracted PDF files. The initialize() method returns a boolean indicating success/failure of finding the gpt_out folder.

Class Interface

Methods

__init__(self, remarkable_session, logger)

Purpose: Initialize the watcher with authentication session and logger, create temporary directory for file processing

Parameters:

  • remarkable_session: Authenticated requests.Session for reMarkable Cloud API
  • logger: Logger instance for status and error messages

Returns: None (constructor)

__del__(self)

Purpose: Cleanup temporary directory when object is destroyed

Returns: None

async initialize(self) -> bool

Purpose: Find and store the UUID of the gpt_out folder in reMarkable Cloud by discovering all nodes

Returns: Boolean indicating whether gpt_out folder was successfully found

async _discover_all_nodes(self) -> Dict[str, Dict]

Purpose: Recursively discover all nodes (folders and documents) in reMarkable Cloud storage

Returns: Dictionary mapping UUIDs to node information (name, type, parent, metadata, hash)
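
Each value in the returned dictionary mirrors the structure assembled in the source above. A representative entry, with purely illustrative values, looks like this:

# Illustrative node entry as produced by _discover_all_nodes (values are made up)
example_node = {
    'uuid': 'a1b2c3d4-0000-0000-0000-000000000000',
    'hash': 'ab' * 32,                      # 64-character content hash
    'name': 'Meeting notes',
    'node_type': 'document',                # or 'folder'
    'parent_uuid': 'f0e1d2c3-0000-0000-0000-000000000000',  # None for root-level items
    'metadata': {'visibleName': 'Meeting notes', 'type': 'DocumentType'},
    'parsed_data': {'child_objects': [], 'data_components': []},
}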

async _fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]

Purpose: Download content from reMarkable Cloud using a content hash reference

Parameters:

  • hash_ref: 64-character hash identifying the content to fetch

Returns: Dictionary with 'hash', 'content' (bytes), and 'size' keys, or None on failure

_parse_directory_listing(self, content: bytes) -> Dict[str, Any]

Purpose: Parse reMarkable's directory listing format to extract child objects and data components

Parameters:

  • content: Raw bytes of directory listing content

Returns: Dictionary with 'child_objects' (folders/documents) and 'data_components' (files like .pdf, .rm, .metadata)
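
The listing is a plain-text format of colon-separated entries, one per line, matching the entry_pattern regex in the source. A minimal sketch of how a single synthetic entry decomposes:

import re

# Synthetic listing entry in the shape <64-char hash>:<flags>:<uuid[.component]>:<type>:<size>
line = 'ab' * 32 + ':0:a1b2c3d4-0000-0000-0000-000000000000.metadata:0:1234'

entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
hash_val, flags, uuid_component, type_val, size_val = re.match(entry_pattern, line, re.IGNORECASE).groups()

print(uuid_component.split('.')[-1])  # 'metadata' -> classified as a data component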

async _extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]

Purpose: Fetch and parse JSON metadata for a document or folder

Parameters:

  • metadata_hash: Hash reference to the metadata file

Returns: Parsed JSON metadata dictionary containing visibleName, type, parent, etc., or None on failure
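
The metadata blob is plain JSON. A minimal sketch of the fields the watcher reads (the values below are invented for illustration):

import json

raw = b'{"visibleName": "Meeting notes", "type": "DocumentType", "parent": "f0e1d2c3-0000-0000-0000-000000000000"}'
metadata = json.loads(raw.decode('utf-8'))

print(metadata['visibleName'])   # display name shown on the device
print(metadata['type'])          # 'DocumentType' (document) or 'CollectionType' (folder)
print(metadata.get('parent'))    # UUID of the containing folder; empty string at the root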

async check_for_new_files(self) -> List[Path]

Purpose: Main method to check gpt_out folder for new documents and extract them as PDFs

Returns: List of Path objects pointing to newly extracted PDF files in temporary directory

async _extract_document(self, doc_node: Dict, all_nodes: Dict) -> Optional[Path]

Purpose: Extract a single document from reMarkable Cloud, handling both PDF and .rm formats

Parameters:

  • doc_node: Dictionary containing document metadata and parsed data
  • all_nodes: Complete node tree for reference

Returns: Path to extracted PDF file, or None on failure

async _convert_rm_to_pdf(self, doc_name: str, rm_hashes: List[str], output_dir: Path) -> Optional[Path]

Purpose: Convert reMarkable .rm files to PDF using the rmc tool, handling single and multi-page documents

Parameters:

  • doc_name: Name of the document for output filename
  • rm_hashes: List of hash references to .rm page files
  • output_dir: Directory to store output PDF and temporary files

Returns: Path to final PDF file, or None on failure

async _concatenate_pdfs(self, pdf_files: List[Path], output_path: Path) -> bool

Purpose: Merge multiple PDF files into a single PDF, trying PyPDF2, PyPDF4, pdftk, and ghostscript in order

Parameters:

  • pdf_files: List of Path objects to PDF files to concatenate
  • output_path: Path where merged PDF should be written

Returns: Boolean indicating success or failure of concatenation
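
For reference, the same merge can be written against the maintained pypdf package, which exposes the same PdfReader/PdfWriter interface as recent PyPDF2 releases. This is a sketch only and assumes pypdf is installed; the class itself currently tries PyPDF2, PyPDF4, pdftk, and ghostscript as shown in the source.

from pathlib import Path
from typing import List
from pypdf import PdfReader, PdfWriter

def merge_pdfs(pdf_files: List[Path], output_path: Path) -> None:
    """Concatenate PDFs in order using pypdf (assumed installed)."""
    writer = PdfWriter()
    for pdf_file in pdf_files:
        for page in PdfReader(str(pdf_file)).pages:
            writer.add_page(page)
    with open(output_path, 'wb') as fh:
        writer.write(fh)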

Attributes

  • session (requests.Session, instance): Authenticated session for making API requests to reMarkable Cloud
  • logger (logging.Logger, instance): Logger instance for outputting messages
  • base_url (str, instance): Base URL for the reMarkable Cloud API (https://eu.tectonic.remarkable.com)
  • processed_files (Set[str], instance): Document hashes already processed, kept to avoid reprocessing
  • gpt_out_folder_uuid (Optional[str], instance): UUID of the gpt_out folder, set during initialization
  • temp_dir (Path, instance): Temporary directory for downloaded and converted files, cleaned up automatically

Dependencies

  • asyncio
  • json
  • re
  • subprocess
  • tempfile
  • shutil
  • pathlib
  • typing
  • datetime
  • logging
  • requests
  • PyPDF2 (optional; preferred for PDF concatenation)
  • PyPDF4 (optional; fallback for PDF concatenation)

Required Imports

import asyncio
import json
import re
import subprocess
import tempfile
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Any
import logging
import requests

Conditional/Optional Imports

These imports are only needed under specific conditions:

from PyPDF2 import PdfWriter, PdfReader  (optional)

Condition: Required for PDF concatenation when converting multi-page .rm notebooks. Falls back to PyPDF4 if not available.

from PyPDF4 import PdfFileWriter, PdfFileReader  (optional)

Condition: Fallback for PDF concatenation if PyPDF2 is not available. Can also fall back to system commands (pdftk, ghostscript).

Usage Example

import asyncio
import logging
import requests
from mixed_cloud_processor import RemarkableCloudWatcher

# Setup logger (basicConfig attaches a handler so messages are actually emitted)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create authenticated session (example - actual auth is more complex)
session = requests.Session()
session.headers.update({
    'Authorization': 'Bearer YOUR_TOKEN',
    'User-Agent': 'remarkable-cloud-watcher'
})

# Create watcher instance
watcher = RemarkableCloudWatcher(session, logger)

# Initialize and watch for files
async def main():
    # Initialize - finds gpt_out folder
    if await watcher.initialize():
        # Check for new files (returns list of PDF paths)
        new_pdfs = await watcher.check_for_new_files()
        
        for pdf_path in new_pdfs:
            print(f'New PDF: {pdf_path}')
            # Process the PDF file...
    else:
        print('Failed to initialize watcher')

# Run the async function
asyncio.run(main())

Best Practices

  • Always call initialize() before check_for_new_files() to ensure the gpt_out folder is located
  • The class automatically cleans up temporary files in __del__, but for long-running processes, consider periodic cleanup
  • The processed_files set grows indefinitely; for long-running watchers, implement periodic clearing or a size limit (see the polling sketch after this list)
  • Ensure the 'rmc' tool is installed for .rm file conversion (remarkable-cli package)
  • Install PyPDF2 for reliable PDF concatenation: pip install PyPDF2
  • The class uses async methods - must be called with await in an async context
  • Handle the case where initialize() returns False (gpt_out folder not found)
  • The session object must remain valid throughout the watcher's lifetime
  • Temporary files are stored in system temp directory - ensure sufficient disk space
  • For production use, implement retry logic for network failures in API calls
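
A minimal polling sketch tying the above together; it assumes the watcher and session from the usage example, and the poll interval and processed-files cap are illustrative choices rather than part of the class:

import asyncio

async def watch_forever(watcher, poll_seconds: int = 30, max_tracked: int = 10_000):
    if not await watcher.initialize():
        raise RuntimeError('gpt_out folder not found')
    while True:
        for pdf_path in await watcher.check_for_new_files():
            print(f'New PDF ready: {pdf_path}')
        # Keep the processed-files set from growing without bound on long runs
        if len(watcher.processed_files) > max_tracked:
            watcher.processed_files.clear()
        await asyncio.sleep(poll_seconds)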

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class RemarkableFileWatcher 78.4% similar

    A unified file watcher class that monitors a reMarkable Cloud folder for new files, supporting both REST API and rmcl client implementations with automatic client type detection.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
  • class RemarkableCloudManager 71.1% similar

    Unified manager for reMarkable Cloud operations that uses REST API as primary method with rmcl library as fallback, handling authentication, file operations, and folder management.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
  • class RemarkableRestFileWatcher 70.3% similar

    A file watcher class that monitors a specific folder on a reMarkable tablet using the REST API, polling for new files at regular intervals and triggering callbacks when new files are detected.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_rest_client.py
  • class RemarkableEInkProcessor 70.1% similar

    Enhanced E-Ink LLM Processor that extends EInkLLMProcessor with reMarkable Cloud integration, enabling file processing from both local directories and reMarkable Cloud storage.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_processor.py
  • function test_remarkable_discovery 68.8% similar

    Asynchronous test function that verifies reMarkable cloud folder discovery functionality by initializing a RemarkableCloudWatcher and attempting to locate the 'gpt_out' folder.

    From: /tf/active/vicechatdev/e-ink-llm/test_mixed_mode.py