🔍 Code Extractor

class RemarkableReplicaBuilder

Maturity: 25

Step-by-step replica builder

File: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica_v2.py
Lines: 52 - 884
Complexity: moderate

Purpose

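Builds a local replica of a reMarkable cloud account in three steps: discovery of all nodes from the root hash, reconstruction of the folder hierarchy from parent UUIDs, and extraction of PDFs and .rm files (the latter converted to PDF). A JSON database saved in the replica directory allows later runs to skip unchanged nodes.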
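
The discovery step depends on parsing directory listings whose entry lines have the form hash:flags:uuid[.ext]:type:size, as encoded by the regular expression in parse_directory_listing. The following is a minimal standalone sketch of that split into child objects and data components. The function name split_listing and the sample hashes and UUID are made up for illustration and are not part of the class.

```python
import re

# Same entry pattern as used in parse_directory_listing
ENTRY_PATTERN = re.compile(
    r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$',
    re.IGNORECASE,
)


def split_listing(text):
    """Split a listing into (child_objects, data_components). Hypothetical helper."""
    children, components = [], []
    lines = text.split('\n')
    if lines and lines[0].strip().isdigit():
        lines = lines[1:]  # skip the leading count line
    for line in lines:
        match = ENTRY_PATTERN.match(line.strip())
        if not match:
            continue
        hash_val, flags, name, type_val, size_val = match.groups()
        entry = {'hash': hash_val, 'uuid_component': name, 'size': int(size_val)}
        if '.' in name:
            # Data component such as .metadata, .content, .pdf, .rm
            entry['component_type'] = name.split('.')[-1]
            components.append(entry)
        else:
            # Pure UUID: a child object to descend into
            children.append(entry)
    return children, components


# Made-up sample listing: one child object and two data components
SAMPLE_UUID = "0f0e0d0c-1111-2222-3333-444455556666"
SAMPLE = "\n".join([
    "3",
    "a" * 64 + f":80000000:{SAMPLE_UUID}:2:0",
    "b" * 64 + f":0:{SAMPLE_UUID}.metadata:0:312",
    "c" * 64 + f":0:{SAMPLE_UUID}.pdf:0:10240",
])

children, components = split_listing(SAMPLE)
print(len(children), [c['component_type'] for c in components])
```

Entries without a file extension are treated as children to follow during discovery, while entries with an extension are recorded as components of the current node.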

Source Code

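# Imports this class relies on. RemarkableNode is assumed to be defined
# earlier in the same module (it is not part of this extract).
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Set

import requests


class RemarkableReplicaBuilder:
    """Step-by-step replica builder: discovery, hierarchy, extraction"""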
    
    def __init__(self, session: requests.Session, replica_dir: str = "remarkable_replica_v2"):
        self.session = session
        self.base_url = "https://eu.tectonic.remarkable.com"
        
        # Setup directories
        self.replica_dir = Path(replica_dir).resolve()
        self.content_dir = self.replica_dir / "content"
        self.raw_dir = self.replica_dir / "raw_components"
        
        for directory in [self.replica_dir, self.content_dir, self.raw_dir]:
            directory.mkdir(parents=True, exist_ok=True)
        
        # Setup logging
        self.log_file = self.replica_dir / "build.log"
        self.setup_logging()
        
        # State
        self.nodes: Dict[str, RemarkableNode] = {}
        self.all_hashes: Set[str] = set()
        self.failed_downloads: Set[str] = set()
        self.existing_database: Optional[Dict[str, Any]] = None
        
        # Load existing database if it exists
        self._load_existing_database()
        
        # Statistics
        self.stats = {
            'total_nodes': 0,
            'folders': 0,
            'documents': 0,
            'trash_items': 0,
            'pdfs_extracted': 0,
            'rm_files_extracted': 0,
            'rm_pdfs_converted': 0,
            'total_files': 0,
            'nodes_updated': 0,
            'nodes_added': 0,
            'nodes_unchanged': 0
        }
    
    def setup_logging(self):
        """Set up a DEBUG-level file handler and an INFO-level console handler"""
        self.logger = logging.getLogger('ReplicaBuilder')
        self.logger.setLevel(logging.DEBUG)
        self.logger.handlers.clear()
        
        # File handler
        file_handler = logging.FileHandler(self.log_file, mode='w', encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s | %(levelname)-8s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(file_formatter)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter('%(message)s')
        console_handler.setFormatter(console_formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
        self.logger.info(f"🏗️ REMARKABLE REPLICA BUILDER (STEP-BY-STEP)")
        self.logger.info(f"📁 Replica directory: {self.replica_dir}")
    
    def _load_existing_database(self):
        """Load existing database if it exists"""
        database_file = self.replica_dir / "replica_database.json"
        
        if database_file.exists():
            try:
                with open(database_file, 'r', encoding='utf-8') as f:
                    self.existing_database = json.load(f)
                    
                existing_count = len(self.existing_database.get('nodes', {}))
                last_sync = self.existing_database.get('replica_info', {}).get('last_sync', 'unknown')
                
                self.logger.info(f"📂 Found existing database with {existing_count} nodes")
                self.logger.info(f"📅 Last sync: {last_sync}")
                
            except Exception as e:
                self.logger.warning(f"⚠️ Failed to load existing database: {e}")
                self.existing_database = None
        else:
            self.logger.info(f"📂 No existing database found - full sync will be performed")
    
    def _should_update_node(self, node_hash: str, node_uuid: str) -> bool:
        """Check if a node needs to be updated based on existing database"""
        if not self.existing_database:
            return True
            
        existing_nodes = self.existing_database.get('nodes', {})
        hash_registry = self.existing_database.get('hash_registry', {})
        
        # Check if this hash is already known
        if node_hash in hash_registry:
            existing_uuid = hash_registry[node_hash].get('uuid')
            if existing_uuid == node_uuid:
                # Same node, same hash - no update needed
                return False
        
        # Check if node exists but with different hash (updated)
        if node_uuid in existing_nodes:
            existing_hash = existing_nodes[node_uuid].get('hash')
            if existing_hash != node_hash:
                # Node exists but hash changed - update needed
                return True
            else:
                # Same hash - no update needed
                return False
        
        # New node - update needed
        return True
    
    def fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]:
        """Fetch content from reMarkable cloud by hash"""
        if hash_ref in self.failed_downloads:
            return None
            
        try:
            url = f"{self.base_url}/sync/v3/files/{hash_ref}"
            self.logger.debug(f"FETCHING: {hash_ref[:16]}...")
            
            response = self.session.get(url, timeout=30)  # same timeout as get_root_hash
            response.raise_for_status()
            
            content = response.content
            self.logger.debug(f"  → {len(content)} bytes")
            
            return {
                'hash': hash_ref,
                'content': content,
                'size': len(content)
            }
            
        except Exception as e:
            self.logger.error(f"Failed to fetch {hash_ref[:16]}...: {e}")
            self.failed_downloads.add(hash_ref)
            return None
    
    def get_root_hash(self) -> Optional[str]:
        """Get the root hash"""
        response = None  # keeps the except handler safe if the request itself raises
        try:
            url = f"{self.base_url}/sync/v4/root"
            self.logger.debug(f"Getting root hash from: {url}")
            response = self.session.get(url, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                root_hash = data.get('hash')
                
                self.logger.info(f"🌱 Root hash: {root_hash}")
                return root_hash
            else:
                self.logger.error(f"Root request failed with status {response.status_code}")
                return None
            
        except Exception as e:
            self.logger.error(f"Failed to get root hash: {e}")
            self.logger.debug(f"Response text: {getattr(response, 'text', 'No response')}")
            return None
    
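    def parse_directory_listing(self, content: bytes) -> Dict[str, Any]:
        """Parse a directory listing into child objects and data components.

        Each entry line has the form ``hash:flags:uuid[.ext]:type:size``; an
        optional leading line holding only a count is skipped.
        """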
        try:
            text_content = content.decode('utf-8')
        except UnicodeDecodeError:
            return {'child_objects': [], 'data_components': []}
        
        result = {
            'child_objects': [],
            'data_components': []
        }
        
        lines = text_content.split('\n')
        if lines and lines[0].strip().isdigit():
            lines = lines[1:]  # Skip count line
        
        import re
        entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
                
            match = re.match(entry_pattern, line, re.IGNORECASE)
            if match:
                hash_val, flags, uuid_component, type_val, size_val = match.groups()
                
                entry_info = {
                    'hash': hash_val,
                    'flags': flags,
                    'uuid_component': uuid_component,
                    'type': type_val,
                    'size': int(size_val)
                }
                
                if '.' in uuid_component:
                    # Data component (.content, .metadata, .pdf, .rm, etc.)
                    component_type = uuid_component.split('.')[-1]
                    if '/' in component_type:  # Handle .rm files like "uuid/filename.rm"
                        component_type = component_type.split('/')[-1]
                    entry_info['component_type'] = component_type
                    result['data_components'].append(entry_info)
                else:
                    # Child object (pure UUID)
                    result['child_objects'].append(entry_info)
        
        return result
    
    def extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
        """Extract metadata from hash"""
        content_info = self.fetch_hash_content(metadata_hash)
        if not content_info:
            return None
        
        try:
            text_content = content_info['content'].decode('utf-8')
            return json.loads(text_content)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            self.logger.debug(f"Failed to parse metadata {metadata_hash[:16]}...: {e}")
            return None
    
    # ================================================================
    # STEP 1: DISCOVERY PHASE
    # ================================================================
    
    def discover_all_nodes(self, root_hash: str) -> bool:
        """Step 1: Discover all nodes and collect metadata"""
        self.logger.info(f"📋 STEP 1: DISCOVERY PHASE")
        self.logger.info(f"🔍 Discovering all nodes from root...")
        
        discovered_hashes = set()
        hashes_to_process = [root_hash]
        
        while hashes_to_process:
            current_hash = hashes_to_process.pop(0)
            
            if current_hash in discovered_hashes:
                continue
                
            discovered_hashes.add(current_hash)
            self.logger.debug(f"  Processing: {current_hash[:16]}...")
            
            # Fetch and parse content
            content_info = self.fetch_hash_content(current_hash)
            if not content_info:
                continue
            
            parsed = self.parse_directory_listing(content_info['content'])
            
            # Extract metadata if available
            metadata = {}
            metadata_hash = None
            node_name = f"unknown_{current_hash[:8]}"
            node_type = "folder"
            parent_uuid = None
            
            for component in parsed['data_components']:
                if component['component_type'] == 'metadata':
                    metadata_hash = component['hash']
                    extracted_metadata = self.extract_metadata(metadata_hash)
                    if extracted_metadata:
                        metadata = extracted_metadata
                        node_name = metadata.get('visibleName', node_name)
                        if metadata.get('type') == 'DocumentType':
                            node_type = "document"
                        elif metadata.get('type') == 'CollectionType':
                            node_type = "folder"
                        parent_uuid = metadata.get('parent', '') or None
                    break
            
            # Determine node UUID
            node_uuid = None
            for component in parsed['child_objects']:
                node_uuid = component['uuid_component']
                break
            if not node_uuid and parsed['data_components']:
                component_name = parsed['data_components'][0]['uuid_component']
                if '.' in component_name:
                    node_uuid = component_name.split('.')[0]
            if not node_uuid:
                node_uuid = current_hash[:32]  # Fallback
            
            # Check if node needs updating (incremental sync)
            if self._should_update_node(current_hash, node_uuid):
                # Create node
                node = RemarkableNode(
                    uuid=node_uuid,
                    hash=current_hash,
                    name=node_name,
                    node_type=node_type,
                    parent_uuid=parent_uuid,
                    metadata=metadata
                )
                
                # Extract component hashes
                for component in parsed['data_components']:
                    comp_type = component['component_type']
                    comp_hash = component['hash']
                    
                    if comp_type == 'content':
                        node.content_hash = comp_hash
                    elif comp_type == 'metadata':
                        node.metadata_hash = comp_hash
                    elif comp_type == 'pdf':
                        node.pdf_hash = comp_hash
                    elif comp_type == 'pagedata':
                        node.pagedata_hash = comp_hash
                    elif comp_type == 'rm' or comp_type.endswith('.rm'):
                        node.rm_hashes.append(comp_hash)
                
                # Store node
                self.nodes[node_uuid] = node
                self.stats['nodes_added'] += 1
                
                self.logger.debug(f"    → NEW/UPDATED {node_type}: {node_name} (parent: {parent_uuid or 'ROOT'})")
            else:
                # Node unchanged - load from existing database
                if self.existing_database and node_uuid in self.existing_database.get('nodes', {}):
                    existing_node_data = self.existing_database['nodes'][node_uuid]
                    
                    node = RemarkableNode(
                        uuid=existing_node_data['uuid'],
                        hash=existing_node_data['hash'],
                        name=existing_node_data['name'],
                        node_type=existing_node_data['node_type'],
                        parent_uuid=existing_node_data['parent_uuid'],
                        metadata=existing_node_data['metadata']
                    )
                    
                    # Restore component hashes
                    comp_hashes = existing_node_data.get('component_hashes', {})
                    node.content_hash = comp_hashes.get('content')
                    node.metadata_hash = comp_hashes.get('metadata')
                    node.pdf_hash = comp_hashes.get('pdf')
                    node.pagedata_hash = comp_hashes.get('pagedata')
                    node.rm_hashes = comp_hashes.get('rm_files', [])
                    
                    # Restore paths and files
                    node.local_path = existing_node_data.get('local_path', '')
                    node.extracted_files = existing_node_data.get('extracted_files', [])
                    
                    self.nodes[node_uuid] = node
                    self.stats['nodes_unchanged'] += 1
                
                self.logger.debug(f"    → UNCHANGED {node_type}: {node_name}")
            
            self.stats['total_nodes'] += 1
            
            if node_type == "folder":
                self.stats['folders'] += 1
            else:
                self.stats['documents'] += 1
            
            # Track trash items
            if parent_uuid == 'trash':
                self.stats['trash_items'] += 1
            
            # Add child hashes to process
            for child_obj in parsed['child_objects']:
                if child_obj['hash'] not in discovered_hashes:
                    hashes_to_process.append(child_obj['hash'])
        
        self.logger.info(f"✅ Discovery complete: {len(self.nodes)} nodes found")
        self.logger.info(f"   📂 Folders: {self.stats['folders']}")
        self.logger.info(f"   📄 Documents: {self.stats['documents']}")
        self.logger.info(f"   🗑️ Trash items: {self.stats['trash_items']}")
        self.logger.info(f"   🆕 New/Updated: {self.stats['nodes_added']}")
        self.logger.info(f"   ✅ Unchanged: {self.stats['nodes_unchanged']}")
        
        return True
    
    # ================================================================
    # STEP 2: HIERARCHY PHASE
    # ================================================================
    
    def build_folder_structure(self) -> bool:
        """Step 2: Build correct folder structure based on parent UUIDs"""
        self.logger.info(f"\n📁 STEP 2: HIERARCHY PHASE")
        self.logger.info(f"🏗️ Building folder structure...")
        
        # Create special trash folder
        trash_folder = self.content_dir / "trash"
        trash_folder.mkdir(parents=True, exist_ok=True)
        self.logger.info(f"🗑️ Created trash folder: {trash_folder}")
        
        # Find root nodes (nodes with no parent or empty parent)
        root_nodes = []
        trash_nodes = []
        for uuid, node in self.nodes.items():
            if node.parent_uuid == 'trash':
                trash_nodes.append(node)
            elif not node.parent_uuid:
                root_nodes.append(node)
        
        self.logger.info(f"📍 Found {len(root_nodes)} root nodes")
        self.logger.info(f"🗑️ Found {len(trash_nodes)} trash nodes")
        
        # Build paths recursively from root
        for root_node in root_nodes:
            self._build_node_paths(root_node, str(self.content_dir))
        
        # Build paths for trash nodes
        for trash_node in trash_nodes:
            self._build_node_paths(trash_node, str(trash_folder))
        
        # Create all folder directories
        for uuid, node in self.nodes.items():
            if node.node_type == "folder" and node.local_path:
                Path(node.local_path).mkdir(parents=True, exist_ok=True)
                self.logger.debug(f"📁 Created: {node.local_path}")
        
        self.logger.info(f"✅ Folder structure built")
        return True
    
    def _build_node_paths(self, node: RemarkableNode, parent_path: str):
        """Recursively build paths for node and its children"""
        # Sanitize name for filesystem
        safe_name = "".join(c for c in node.name if c.isalnum() or c in (' ', '-', '_', '.')).rstrip()
        if not safe_name:
            safe_name = f"unnamed_{node.uuid[:8]}"
        
        # Set local path
        node.local_path = str(Path(parent_path) / safe_name)
        
        # Log with special indication for trash items
        if node.parent_uuid == 'trash':
            self.logger.debug(f"  🗑️ Trash Path: {node.name} → {node.local_path}")
        else:
            self.logger.debug(f"  Path: {node.name} → {node.local_path}")
        
        # Process children - both normal UUID children and trash children
        for child_uuid, child_node in self.nodes.items():
            if child_node.parent_uuid == node.uuid:
                self._build_node_paths(child_node, node.local_path)
    
    # ================================================================
    # STEP 3: EXTRACTION PHASE  
    # ================================================================
    
    def extract_all_files(self) -> bool:
        """Step 3: Extract PDFs and .rm files to correct locations"""
        self.logger.info(f"\n📎 STEP 3: EXTRACTION PHASE")
        self.logger.info(f"⬇️ Extracting files to correct locations...")
        
        nodes_to_process = []
        for uuid, node in self.nodes.items():
            if node.node_type == "document":
                # Only process if node is new/updated (has no extracted files from database)
                if not node.extracted_files:
                    nodes_to_process.append(node)
        
        if nodes_to_process:
            self.logger.info(f"🔄 Processing {len(nodes_to_process)} new/updated documents...")
            
            for node in nodes_to_process:
                self._extract_node_files(node)
        else:
            self.logger.info(f"✅ No new documents to process - all files up to date")
        
        self.logger.info(f"✅ File extraction complete")
        self.logger.info(f"   📄 PDFs extracted: {self.stats['pdfs_extracted']}")
        self.logger.info(f"   🖊️ RM files extracted: {self.stats['rm_files_extracted']}")
        self.logger.info(f"   📄 RM→PDF conversions: {self.stats['rm_pdfs_converted']}")
        
        return True
    
    def _extract_node_files(self, node: RemarkableNode):
        """Extract files for a document node"""
        if not node.local_path:
            self.logger.warning(f"No local path for {node.name}")
            return
        
        # Ensure parent directory exists
        parent_dir = Path(node.local_path).parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        
        # Extract PDF if available - this goes directly to the folder structure
        if node.pdf_hash:
            pdf_path = Path(node.local_path).with_suffix('.pdf')
            if self._extract_pdf(node.pdf_hash, pdf_path):
                node.extracted_files.append(str(pdf_path))
                self.stats['pdfs_extracted'] += 1
                self.logger.debug(f"  📄 PDF: {pdf_path}")
        
        # Extract .rm files if available - these get converted to PDF
        if node.rm_hashes:
            # Create temporary notebook subdirectory for processing
            notebook_dir = parent_dir / f"{Path(node.local_path).stem}_temp_notebook"
            notebook_dir.mkdir(exist_ok=True)
            
            # Extract .rm files to temporary directory
            for i, rm_hash in enumerate(node.rm_hashes):
                rm_path = notebook_dir / f"page_{i+1}.rm"
                if self._extract_rm_file(rm_hash, rm_path):
                    self.stats['rm_files_extracted'] += 1
                    self.logger.debug(f"  🖊️ RM: {rm_path}")
            
            # Convert .rm files to PDF (this places the PDF in the correct location)
            self._convert_notebook_to_pdf(node, notebook_dir)
            
            # Clean up temporary notebook directory after conversion
            import shutil
            shutil.rmtree(notebook_dir, ignore_errors=True)
        
        # Store metadata components in node for database (don't extract to filesystem)
        if node.content_hash:
            content_info = self.fetch_hash_content(node.content_hash)
            if content_info:
                try:
                    node.metadata['content_data'] = content_info['content'].decode('utf-8')
                except UnicodeDecodeError:
                    node.metadata['content_data'] = f"<binary data: {len(content_info['content'])} bytes>"
        
        if node.pagedata_hash:
            pagedata_info = self.fetch_hash_content(node.pagedata_hash)  
            if pagedata_info:
                try:
                    node.metadata['pagedata_data'] = pagedata_info['content'].decode('utf-8')
                except UnicodeDecodeError:
                    node.metadata['pagedata_data'] = f"<binary data: {len(pagedata_info['content'])} bytes>"
    
    def _extract_pdf(self, pdf_hash: str, target_path: Path) -> bool:
        """Extract PDF file"""
        content_info = self.fetch_hash_content(pdf_hash)
        if not content_info:
            return False
        
        try:
            with open(target_path, 'wb') as f:
                f.write(content_info['content'])
            return True
        except Exception as e:
            self.logger.error(f"Failed to write PDF {target_path}: {e}")
            return False
    
    def _extract_rm_file(self, rm_hash: str, target_path: Path) -> bool:
        """Extract .rm file"""
        content_info = self.fetch_hash_content(rm_hash)
        if not content_info:
            return False
        
        try:
            with open(target_path, 'wb') as f:
                f.write(content_info['content'])
            return True
        except Exception as e:
            self.logger.error(f"Failed to write RM file {target_path}: {e}")
            return False
    
    def _extract_component(self, comp_hash: str, target_path: Path) -> bool:
        """Extract other component"""
        content_info = self.fetch_hash_content(comp_hash)
        if not content_info:
            return False
        
        try:
            with open(target_path, 'wb') as f:
                f.write(content_info['content'])
            return True
        except Exception as e:
            self.logger.error(f"Failed to write component {target_path}: {e}")
            return False
    
    def _convert_notebook_to_pdf(self, node: RemarkableNode, notebook_dir: Path):
        """Convert reMarkable notebook files to PDF using rmc and concatenate pages"""
        try:
            import subprocess
            
            # Find all .rm files in the notebook directory
            rm_files = sorted(notebook_dir.glob("page_*.rm"), key=lambda x: int(x.stem.split('_')[1]))
            if not rm_files:
                self.logger.debug(f"  ⚠️ No .rm files found for {node.name}")
                return
            
            # Final PDF goes next to the temporary notebook folder. Use the sanitized
            # name from local_path, since node.name may contain characters such as '/'
            parent_dir = notebook_dir.parent
            final_pdf_path = parent_dir / f"{Path(node.local_path).name}.pdf"
            
            if len(rm_files) == 1:
                # Single page - convert directly
                try:
                    result = subprocess.run([
                        "rmc", str(rm_files[0]), "-o", str(final_pdf_path)
                    ], capture_output=True, text=True, timeout=60)
                    
                    if result.returncode == 0 and final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
                        node.extracted_files.append(str(final_pdf_path))
                        self.logger.debug(f"  📄 Converted single page to PDF: {final_pdf_path}")
                        self.stats['rm_pdfs_converted'] += 1
                    else:
                        self.logger.debug(f"  ⚠️ rmc conversion failed: {result.stderr}")
                
                except Exception as e:  # also covers subprocess.TimeoutExpired
                    self.logger.debug(f"  ⚠️ rmc conversion error: {e}")
            
            else:
                # Multiple pages - convert each to temporary PDF and concatenate
                temp_pdfs = []
                conversion_success = True
                
                for i, rm_file in enumerate(rm_files):
                    temp_pdf = notebook_dir / f"temp_page_{i+1}.pdf"
                    
                    try:
                        result = subprocess.run([
                            "rmc", str(rm_file), "-o", str(temp_pdf)
                        ], capture_output=True, text=True, timeout=60)
                        
                        if result.returncode == 0 and temp_pdf.exists() and temp_pdf.stat().st_size > 0:
                            temp_pdfs.append(temp_pdf)
                            self.logger.debug(f"  📄 Converted page {i+1}")
                        else:
                            self.logger.debug(f"  ⚠️ rmc conversion failed for page {i+1}: {result.stderr}")
                            conversion_success = False
                            break
                    
                    except Exception as e:  # also covers subprocess.TimeoutExpired
                        self.logger.debug(f"  ⚠️ rmc conversion error for page {i+1}: {e}")
                        conversion_success = False
                        break
                
                # Concatenate PDFs if all conversions succeeded
                if conversion_success and temp_pdfs:
                    try:
                        # Use PyPDF2 to concatenate PDFs
                        import PyPDF2
                        
                        pdf_writer = PyPDF2.PdfWriter()
                        
                        for temp_pdf in temp_pdfs:
                            with open(temp_pdf, 'rb') as pdf_file:
                                pdf_reader = PyPDF2.PdfReader(pdf_file)
                                for page in pdf_reader.pages:
                                    pdf_writer.add_page(page)
                        
                        # Write the concatenated PDF
                        with open(final_pdf_path, 'wb') as output_file:
                            pdf_writer.write(output_file)
                        
                        if final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
                            node.extracted_files.append(str(final_pdf_path))
                            self.logger.debug(f"  📄 Concatenated {len(temp_pdfs)} pages to PDF: {final_pdf_path}")
                            self.stats['rm_pdfs_converted'] += 1
                        
                    except ImportError:
                        # Fallback: without PyPDF2 the pages cannot be merged, so keep only the first page
                        self.logger.debug(f"  ⚠️ PyPDF2 not available, using first page only")
                        if temp_pdfs:
                            import shutil
                            shutil.copy2(temp_pdfs[0], final_pdf_path)
                            node.extracted_files.append(str(final_pdf_path))
                            self.stats['rm_pdfs_converted'] += 1
                    
                    except Exception as e:
                        self.logger.debug(f"  ⚠️ PDF concatenation failed: {e}")
                    
                    finally:
                        # Clean up temporary files
                        for temp_pdf in temp_pdfs:
                            temp_pdf.unlink(missing_ok=True)
        
        except Exception as e:
            self.logger.debug(f"  ⚠️ PDF conversion error for {node.name}: {e}")
    
    # ================================================================
    # MAIN BUILD PROCESS
    # ================================================================
    
    def build_complete_replica(self) -> bool:
        """Build complete replica using 3-step process"""
        self.logger.info(f"🚀 STARTING 3-STEP REPLICA BUILD")
        
        # Get root hash
        root_hash = self.get_root_hash()
        if not root_hash:
            self.logger.error("❌ Failed to get root hash")
            return False
        
        # Step 1: Discovery
        if not self.discover_all_nodes(root_hash):
            self.logger.error("❌ Discovery phase failed")
            return False
        
        # Step 2: Hierarchy
        if not self.build_folder_structure():
            self.logger.error("❌ Hierarchy phase failed") 
            return False
        
        # Step 3: Extraction
        if not self.extract_all_files():
            self.logger.error("❌ Extraction phase failed")
            return False
        
        # Save database
        self._save_database()
        
        # Final report
        self.logger.info(f"\n🎉 REPLICA BUILD COMPLETED!")
        self.logger.info(f"📊 FINAL STATISTICS:")
        self.logger.info(f"  📁 Total nodes: {self.stats['total_nodes']}")
        self.logger.info(f"  📂 Folders: {self.stats['folders']}")
        self.logger.info(f"  📄 Documents: {self.stats['documents']}")
        self.logger.info(f"  🗑️ Trash items: {self.stats['trash_items']}")
        self.logger.info(f"  📄 PDFs extracted: {self.stats['pdfs_extracted']}")
        self.logger.info(f"  🖊️ RM files extracted: {self.stats['rm_files_extracted']}")
        self.logger.info(f"  📄 RM→PDF conversions: {self.stats['rm_pdfs_converted']}")
        self.logger.info(f"  ❌ Failed downloads: {len(self.failed_downloads)}")
        self.logger.info(f"\n📁 Replica location: {self.replica_dir}")
        self.logger.info(f"🗑️ Trash location: {self.replica_dir}/content/trash")
        
        return True
    
    def _save_database(self):
        """Save the comprehensive replica database"""
        database = {
            'replica_info': {
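                # Keep the original creation time when re-syncing an existing replica
                'created': ((self.existing_database or {}).get('replica_info', {}).get('created')
                            or datetime.now().isoformat()),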
                'last_sync': datetime.now().isoformat(),
                'replica_dir': str(self.replica_dir),
                'total_nodes': len(self.nodes),
                'statistics': self.stats,
                'version': "2.0"
            },
            'nodes': {},
            'hash_registry': {},  # For tracking file changes
            'failed_downloads': list(self.failed_downloads)
        }
        
        # Create detailed node entries
        for uuid, node in self.nodes.items():
            node_data = {
                'uuid': node.uuid,
                'hash': node.hash,
                'name': node.name,
                'node_type': node.node_type,
                'parent_uuid': node.parent_uuid,
                'local_path': node.local_path,
                'extracted_files': node.extracted_files,
                
                # Component hashes for sync tracking
                'component_hashes': {
                    'content': node.content_hash,
                    'metadata': node.metadata_hash,
                    'pdf': node.pdf_hash,
                    'pagedata': node.pagedata_hash,
                    'rm_files': node.rm_hashes
                },
                
                # Full metadata including content and pagedata
                'metadata': node.metadata,
                
                # Timestamps
                'last_modified': node.metadata.get('lastModified', ''),
                'version': node.metadata.get('version', 0),
                
                # Sync status
                'sync_status': 'current',
                'last_synced': datetime.now().isoformat()
            }
            
            database['nodes'][uuid] = node_data
            
            # Add to hash registry for quick lookup
            database['hash_registry'][node.hash] = {
                'uuid': uuid,
                'type': 'node',
                'last_seen': datetime.now().isoformat()
            }
            
            # Add component hashes to registry
            for comp_type, comp_hash in node_data['component_hashes'].items():
                if comp_hash:
                    if isinstance(comp_hash, list):
                        for i, h in enumerate(comp_hash):
                            database['hash_registry'][h] = {
                                'uuid': uuid,
                                'type': f'{comp_type}_{i}',
                                'last_seen': datetime.now().isoformat()
                            }
                    else:
                        database['hash_registry'][comp_hash] = {
                            'uuid': uuid,
                            'type': comp_type,
                            'last_seen': datetime.now().isoformat()
                        }
        
        database_file = self.replica_dir / "replica_database.json"
        with open(database_file, 'w', encoding='utf-8') as f:
            json.dump(database, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"💾 Database saved: {database_file}")
        
        # Also create a human-readable summary
        summary_file = self.replica_dir / "replica_summary.txt"
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(f"reMarkable Replica Summary\n")
            f.write(f"=" * 50 + "\n")
            f.write(f"Created: {database['replica_info']['created']}\n")
            f.write(f"Location: {database['replica_info']['replica_dir']}\n")
            f.write(f"Total Nodes: {database['replica_info']['total_nodes']}\n")
            f.write(f"Statistics: {database['replica_info']['statistics']}\n\n")
            
            f.write(f"Folder Structure:\n")
            f.write(f"-" * 20 + "\n")
            
            # Write folder structure
            def write_node_tree(uuid, indent=0):
                if uuid not in self.nodes:
                    return
                node = self.nodes[uuid]
                prefix = "  " * indent
                icon = "📁" if node.node_type == "folder" else "📄"
                f.write(f"{prefix}{icon} {node.name}\n")
                
                # Find children
                for child_uuid, child_node in self.nodes.items():
                    if child_node.parent_uuid == uuid:
                        write_node_tree(child_uuid, indent + 1)
            
            # Write root nodes
            for uuid, node in self.nodes.items():
                if not node.parent_uuid:
                    write_node_tree(uuid)
        
        self.logger.info(f"📄 Summary saved: {summary_file}")
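
The nested `write_node_tree` helper above rescans every node at each level to find children, so the summary costs roughly quadratic time in the number of nodes. A minimal sketch of the same indented output using a children index built once is shown below. The function name `render_tree` and the tuple-based node records are illustrative stand-ins, not part of the original class.

```python
from typing import Dict, List, Optional, Tuple

# (name, node_type, parent_uuid): a simplified stand-in for RemarkableNode
NodeInfo = Tuple[str, str, Optional[str]]


def render_tree(nodes: Dict[str, NodeInfo]) -> List[str]:
    """Return the indented tree lines, visiting each node once."""
    children: Dict[str, List[str]] = {}
    roots: List[str] = []
    for uuid, (_, _, parent) in nodes.items():
        if parent:
            children.setdefault(parent, []).append(uuid)
        else:
            roots.append(uuid)  # empty or missing parent means top level

    lines: List[str] = []

    def walk(uuid: str, indent: int) -> None:
        name, node_type, _ = nodes[uuid]
        icon = "📁" if node_type == "folder" else "📄"
        lines.append(f"{'  ' * indent}{icon} {name}")
        for child in children.get(uuid, []):
            walk(child, indent + 1)

    for uuid in roots:
        walk(uuid, 0)
    return lines


sample = {
    "a": ("Work", "folder", None),
    "b": ("Report", "document", "a"),
    "c": ("Inbox", "folder", ""),
}
tree_lines = render_tree(sample)
```

As in the original helper, nodes whose parent UUID is set but not present in the node map are never reached from a root and therefore do not appear in the output.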

Parameters

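Name Type Default Kind
session requests.Session - positional_or_keyword
replica_dir str "remarkable_replica_v2" positional_or_keyword

Parameter Details

session: The requests.Session used for all requests to the reMarkable cloud (base URL https://eu.tectonic.remarkable.com).
replica_dir: Directory in which the replica is built. The constructor resolves it to an absolute path and creates it together with the content and raw_components subdirectories.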

Return Value

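Instantiating the class returns a RemarkableReplicaBuilder instance. build_complete_replica() returns True on success and False as soon as any phase fails.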

Class Interface

Methods

__init__(self, session, replica_dir)

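Purpose: Initialize the builder: store the session, create the replica directories, set up logging, load any existing database, and reset the statistics counters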

Parameters:

  • session: Type: requests.Session
  • replica_dir: Type: str

Returns: None

setup_logging(self)

Purpose: Setup logging

Returns: None

_load_existing_database(self)

Purpose: Load existing database if it exists

Returns: None

_should_update_node(self, node_hash, node_uuid) -> bool

Purpose: Check if a node needs to be updated based on existing database

Parameters:

  • node_hash: Type: str
  • node_uuid: Type: str

Returns: bool
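
The body of `_should_update_node` is not shown in this excerpt. The sketch below illustrates how such a check could work, assuming the database layout written by `_save_database` (a `nodes` dict keyed by UUID, each entry carrying a `hash` field). The standalone function `should_update_node` is hypothetical and may differ from the actual method.

```python
from typing import Any, Dict, Optional


def should_update_node(existing_db: Optional[Dict[str, Any]],
                       node_hash: str, node_uuid: str) -> bool:
    """Return True when the node is new or its hash differs from the stored one.

    Hypothetical helper: assumes the 'nodes' layout written by _save_database.
    """
    if not existing_db:
        return True  # no previous database, so everything must be fetched
    stored = existing_db.get("nodes", {}).get(node_uuid)
    if stored is None:
        return True  # node was not present in the previous replica
    return stored.get("hash") != node_hash


db = {"nodes": {"uuid-1": {"hash": "abc"}}}
unchanged = should_update_node(db, "abc", "uuid-1")
changed = should_update_node(db, "def", "uuid-1")
added = should_update_node(db, "abc", "uuid-2")
```

The three outcomes correspond to the `nodes_unchanged`, `nodes_updated`, and `nodes_added` counters kept in `self.stats`.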

fetch_hash_content(self, hash_ref) -> Optional[Dict[str, Any]]

Purpose: Fetch content from reMarkable cloud by hash

Parameters:

  • hash_ref: Type: str

Returns: Optional[Dict[str, Any]]

get_root_hash(self) -> Optional[str]

Purpose: Get the root hash

Returns: Optional[str]

parse_directory_listing(self, content) -> Dict[str, Any]

Purpose: Parse directory listing

Parameters:

  • content: Type: bytes

Returns: Dict[str, Any]

extract_metadata(self, metadata_hash) -> Optional[Dict[str, Any]]

Purpose: Extract metadata from hash

Parameters:

  • metadata_hash: Type: str

Returns: Optional[Dict[str, Any]]

discover_all_nodes(self, root_hash) -> bool

Purpose: Step 1: Discover all nodes and collect metadata

Parameters:

  • root_hash: Type: str

Returns: bool

build_folder_structure(self) -> bool

Purpose: Step 2: Build correct folder structure based on parent UUIDs

Returns: bool

_build_node_paths(self, node, parent_path)

Purpose: Recursively build paths for node and its children

Parameters:

  • node: Type: RemarkableNode
  • parent_path: Type: str

Returns: None
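
Step 2 derives each node's local path from its parent UUID. The sketch below shows the idea on a reduced data model. The `Node` dataclass is a stand-in for `RemarkableNode` with only the fields used here, `build_paths` is an illustrative name, and the real method may sanitize names or treat special parents such as trash differently.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Dict, List, Optional


@dataclass
class Node:
    """Stand-in for RemarkableNode with only the fields this sketch needs."""
    uuid: str
    name: str
    parent_uuid: Optional[str] = None
    local_path: str = ""


def build_paths(nodes: Dict[str, Node], root: str = "content") -> None:
    """Assign local_path to every reachable node by walking parent links top-down."""
    children: Dict[Optional[str], List[Node]] = {}
    for node in nodes.values():
        # Treat "" and None alike as "no parent" (top-level node).
        children.setdefault(node.parent_uuid or None, []).append(node)

    def visit(node: Node, parent_path: PurePosixPath) -> None:
        path = parent_path / node.name
        node.local_path = str(path)
        for child in children.get(node.uuid, []):
            visit(child, path)

    for top in children.get(None, []):
        visit(top, PurePosixPath(root))


nodes = {
    "a": Node("a", "Projects"),
    "b": Node("b", "Notes", parent_uuid="a"),
    "c": Node("c", "todo", parent_uuid="b"),
}
build_paths(nodes)
```

Nodes whose parent UUID does not appear in the map keep an empty `local_path` in this sketch, so a real implementation needs an explicit rule for orphans.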

extract_all_files(self) -> bool

Purpose: Step 3: Extract PDFs and .rm files to correct locations

Returns: bool

_extract_node_files(self, node)

Purpose: Extract files for a document node

Parameters:

  • node: Type: RemarkableNode

Returns: None

_extract_pdf(self, pdf_hash, target_path) -> bool

Purpose: Extract PDF file

Parameters:

  • pdf_hash: Type: str
  • target_path: Type: Path

Returns: bool

_extract_rm_file(self, rm_hash, target_path) -> bool

Purpose: Extract .rm file

Parameters:

  • rm_hash: Type: str
  • target_path: Type: Path

Returns: bool

_extract_component(self, comp_hash, target_path) -> bool

Purpose: Extract other component

Parameters:

  • comp_hash: Type: str
  • target_path: Type: Path

Returns: bool

_convert_notebook_to_pdf(self, node, notebook_dir)

Purpose: Convert reMarkable notebook files to PDF using rmc and concatenate pages

Parameters:

  • node: Type: RemarkableNode
  • notebook_dir: Type: Path

Returns: None

build_complete_replica(self) -> bool

Purpose: Build complete replica using 3-step process

Returns: bool
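
In the source listing, `build_complete_replica` runs discovery, hierarchy, and extraction in order and returns False at the first phase that fails, before `_save_database()` is reached. The sketch below isolates that control flow. `run_phases` and `make_phase` are illustrative helpers, not part of the class.

```python
from typing import Callable, List, Optional, Tuple


def run_phases(phases: List[Tuple[str, Callable[[], bool]]]) -> Optional[str]:
    """Run phases in order; return the name of the first that fails, else None."""
    for name, phase in phases:
        if not phase():
            return name  # later phases are skipped
    return None


executed: List[str] = []


def make_phase(name: str, succeeds: bool) -> Callable[[], bool]:
    """Build a dummy phase that records its execution and reports a fixed result."""
    def phase() -> bool:
        executed.append(name)
        return succeeds
    return phase


failed = run_phases([
    ("discovery", make_phase("discovery", True)),
    ("hierarchy", make_phase("hierarchy", False)),
    ("extraction", make_phase("extraction", True)),
])
```

Because the database is saved only after all three phases succeed, a failed run leaves any previously saved `replica_database.json` untouched.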

_save_database(self)

Purpose: Save the comprehensive replica database

Returns: None
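
`_save_database` flattens each node's own hash and its component hashes into a `hash_registry` that maps a hash back to the owning UUID. The sketch below reproduces that flattening for a single node. `register_hashes` is an illustrative name, and the `last_seen` timestamp written by the original is omitted for brevity.

```python
from typing import Any, Dict


def register_hashes(registry: Dict[str, Dict[str, str]], uuid: str,
                    node_hash: str, component_hashes: Dict[str, Any]) -> None:
    """Add a node hash and its component hashes to the registry."""
    registry[node_hash] = {"uuid": uuid, "type": "node"}
    for comp_type, comp_hash in component_hashes.items():
        if not comp_hash:
            continue  # skip None and empty lists
        if isinstance(comp_hash, list):
            # List-valued components (the .rm page files) get an index suffix.
            for i, h in enumerate(comp_hash):
                registry[h] = {"uuid": uuid, "type": f"{comp_type}_{i}"}
        else:
            registry[comp_hash] = {"uuid": uuid, "type": comp_type}


registry: Dict[str, Dict[str, str]] = {}
register_hashes(registry, "uuid-1", "h-node", {
    "content": "h-content",
    "pdf": None,
    "rm_files": ["h-rm-a", "h-rm-b"],
})
owner = registry["h-rm-b"]["uuid"]
```

Since the registry is keyed by hash, two nodes that share an identical component hash overwrite each other and only the last one written is kept.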

Required Imports

import os
import json
import requests
import logging
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, Optional, Set

Usage Example

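# Example usage:
# session = requests.Session()
# builder = RemarkableReplicaBuilder(session, replica_dir="remarkable_replica_v2")
# success = builder.build_complete_replica()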

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class RemarkableLocalReplica 64.4% similar

    Builds and maintains a complete local replica of reMarkable cloud

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica.py
  • class RemarkableReplicaSync_v1 63.8% similar

    Standalone replica synchronization using proven local_replica_v2 approach

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica.py
  • class RemarkableReplicaSync 57.9% similar

    A class that synchronizes reMarkable cloud documents to a local replica directory, downloading and organizing folders and documents in a hierarchical structure.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica_new.py
  • function main_v61 51.8% similar

    Entry point function that authenticates with Remarkable cloud service and builds a complete local replica of the user's Remarkable documents and notebooks.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica_v2.py
  • function test_complete_replica_build 50.7% similar

    Tests the complete local replica build process for a reMarkable device by creating a local copy of all content including folders, documents, notebooks, and PDFs with comprehensive statistics and logging.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_complete_suite.py