class PylontechMover
Moves the 'pylontech' document from 'Myfolder' to 'Otherfolder' using the working upload mechanism
/tf/active/vicechatdev/e-ink-llm/cloudtest/test_move_pylontech_fixed.py
54 - 796
moderate
Purpose
Moves the 'pylontech' document from 'Myfolder' to 'Otherfolder' using the working upload mechanism
Source Code
class PylontechMover:
"""Moves 'pylontech' document from 'Myfolder' to 'Otherfolder' using the working upload mechanism"""
def __init__(self):
# Load auth session
auth = RemarkableAuth()
self.session = auth.get_authenticated_session()
if not self.session:
raise RuntimeError("Failed to authenticate with reMarkable")
print("Pylontech Document Mover Initialized")
def parse_directory_listing(self, content: bytes):
"""Parse directory listing using the correct reMarkable format (from local_replica_v2.py)"""
try:
text_content = content.decode('utf-8')
except UnicodeDecodeError:
return {'child_objects': [], 'data_components': []}
result = {
'child_objects': [],
'data_components': []
}
lines = text_content.split('\n')
if lines and lines[0].strip().isdigit():
lines = lines[1:] # Skip count line
entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
for line in lines:
line = line.strip()
if not line:
continue
match = re.match(entry_pattern, line, re.IGNORECASE)
if match:
hash_val, flags, uuid_component, type_val, size_val = match.groups()
entry_info = {
'hash': hash_val,
'flags': flags,
'uuid_component': uuid_component,
'type': type_val,
'size': int(size_val)
}
if '.' in uuid_component:
# Data component (.content, .metadata, .pdf, .rm, etc.)
component_type = uuid_component.split('.')[-1]
if '/' in component_type: # Handle .rm files like "uuid/filename.rm"
component_type = component_type.split('/')[-1]
entry_info['component_type'] = component_type
result['data_components'].append(entry_info)
else:
# Child object (pure UUID)
result['child_objects'].append(entry_info)
return result
def fetch_hash_content(self, hash_ref: str):
"""Fetch content from reMarkable cloud by hash"""
try:
url = f"https://eu.tectonic.remarkable.com/sync/v3/files/{hash_ref}"
response = self.session.get(url)
response.raise_for_status()
return {
'hash': hash_ref,
'content': response.content,
'size': len(response.content)
}
except Exception as e:
print(f"❌ Failed to fetch {hash_ref[:16]}...: {e}")
return None
def extract_metadata(self, metadata_hash: str):
"""Extract metadata from hash"""
content_info = self.fetch_hash_content(metadata_hash)
if not content_info:
return None
try:
text_content = content_info['content'].decode('utf-8')
return json.loads(text_content)
except (UnicodeDecodeError, json.JSONDecodeError) as e:
print(f"❌ Failed to parse metadata {metadata_hash[:16]}...: {e}")
return None
def analyze_cloud_state(self):
"""Analyze current cloud state using proper discovery method from local_replica_v2.py"""
print("\nSTEP 1: ANALYZING CLOUD STATE")
print("=" * 50)
# Get current root info
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
# Get root content
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_data['hash']}")
root_content_response.raise_for_status()
root_content = root_content_response.text
print(f"✅ Current root hash: {root_data['hash']}")
print(f"✅ Current generation: {root_data.get('generation')}")
print(f"✅ Root content size: {len(root_content)} bytes")
# Use proper discovery method like local_replica_v2.py
all_nodes = {}
discovered_hashes = set()
hashes_to_process = [root_data['hash']]
print(f"\nDiscovering all nodes from root...")
while hashes_to_process:
current_hash = hashes_to_process.pop(0)
if current_hash in discovered_hashes:
continue
discovered_hashes.add(current_hash)
print(f" Processing: {current_hash[:16]}...")
# Fetch and parse content
content_info = self.fetch_hash_content(current_hash)
if not content_info:
continue
parsed = self.parse_directory_listing(content_info['content'])
# Extract metadata if available
metadata = {}
metadata_hash = None
node_name = f"unknown_{current_hash[:8]}"
node_type = "folder"
parent_uuid = None
for component in parsed['data_components']:
if component['component_type'] == 'metadata':
metadata_hash = component['hash']
extracted_metadata = self.extract_metadata(metadata_hash)
if extracted_metadata:
metadata = extracted_metadata
node_name = metadata.get('visibleName', node_name)
if metadata.get('type') == 'DocumentType':
node_type = "document"
elif metadata.get('type') == 'CollectionType':
node_type = "folder"
parent_uuid = metadata.get('parent', '') or None
break
# Determine node UUID
node_uuid = None
for component in parsed['child_objects']:
node_uuid = component['uuid_component']
break
if not node_uuid and parsed['data_components']:
component_name = parsed['data_components'][0]['uuid_component']
if '.' in component_name:
node_uuid = component_name.split('.')[0]
if not node_uuid:
node_uuid = current_hash[:32] # Fallback
# Store node
all_nodes[node_uuid] = {
'uuid': node_uuid,
'hash': current_hash,
'name': node_name,
'node_type': node_type,
'parent_uuid': parent_uuid,
'metadata': metadata,
'metadata_hash': metadata_hash
}
print(f"    {node_type.upper()}: {node_name} (parent: {parent_uuid or 'ROOT'})")
# Add child hashes to process
for child_obj in parsed['child_objects']:
if child_obj['hash'] not in discovered_hashes:
hashes_to_process.append(child_obj['hash'])
# Organize by type
documents = {uuid: node for uuid, node in all_nodes.items() if node['node_type'] == 'document'}
folders = {uuid: node for uuid, node in all_nodes.items() if node['node_type'] == 'folder'}
print(f"✅ Found {len(documents)} documents and {len(folders)} folders")
return root_data, root_content, documents, folders
def find_target_items(self, documents, folders):
"""Find pylontech document, Myfolder, and Otherfolder"""
print(f"\nSTEP 2: FINDING TARGET ITEMS")
print("-" * 30)
print(f"Available documents:")
for doc_uuid, doc_data in documents.items():
print(f" - {doc_data['name']} (UUID: {doc_uuid[:8]}...) in parent: {doc_data['parent_uuid'] or 'root'}")
print(f"\nAvailable folders:")
for folder_uuid, folder_data in folders.items():
print(f" - {folder_data['name']} (UUID: {folder_uuid[:8]}...) in parent: {folder_data['parent_uuid'] or 'root'}")
# Find pylontech document
pylontech_doc = None
pylontech_uuid = None
for doc_uuid, doc_data in documents.items():
if 'pylontech' in doc_data['name'].lower():
pylontech_doc = doc_data
pylontech_uuid = doc_uuid
break
if not pylontech_doc:
print("❌ Could not find 'pylontech' document")
return None, None, None, None, None, None
# Find Myfolder
myfolder_uuid = None
myfolder_data = None
for folder_uuid, folder_data in folders.items():
if 'myfolder' in folder_data['name'].lower():
myfolder_uuid = folder_uuid
myfolder_data = folder_data
break
if not myfolder_data:
print("❌ Could not find 'Myfolder' folder")
return None, None, None, None, None, None
# Find Otherfolder
otherfolder_uuid = None
otherfolder_data = None
for folder_uuid, folder_data in folders.items():
if 'otherfolder' in folder_data['name'].lower():
otherfolder_uuid = folder_uuid
otherfolder_data = folder_data
break
if not otherfolder_data:
print("❌ Could not find 'Otherfolder' folder")
return None, None, None, None, None, None
# Verify pylontech is currently in Myfolder
if pylontech_doc['parent_uuid'] != myfolder_uuid:
print(f"⚠️ WARNING: 'pylontech' document is not currently in 'Myfolder'")
print(f" Current parent: {pylontech_doc['parent_uuid']}")
print(f" Expected parent (Myfolder): {myfolder_uuid}")
print(f" Will proceed with move from current location to Otherfolder")
print(f"✅ Found all target items:")
print(f"   Document: {pylontech_doc['name']} (UUID: {pylontech_uuid[:8]}...)")
print(f"   Source: {myfolder_data['name']} (UUID: {myfolder_uuid[:8]}...)")
print(f"   Target: {otherfolder_data['name']} (UUID: {otherfolder_uuid[:8]}...)")
return pylontech_uuid, pylontech_doc, myfolder_uuid, myfolder_data, otherfolder_uuid, otherfolder_data
def get_document_schema(self, doc_hash: str):
"""Retrieve document's docSchema"""
print(f"\nSTEP 3: RETRIEVING DOCUMENT SCHEMA")
print("-" * 30)
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
doc_response.raise_for_status()
doc_content = doc_response.text
print(f"✅ Document docSchema size: {len(doc_content)} bytes")
print(f"Document docSchema content:")
lines = doc_content.strip().split('\n')
for i, line in enumerate(lines):
print(f" Line {i}: {line}")
return doc_content, lines
def get_current_metadata(self, doc_lines: list):
"""Extract and fetch current metadata"""
print(f"\nSTEP 4: GETTING CURRENT METADATA")
print("-" * 30)
metadata_hash = None
metadata_line = None
# Find metadata component
for line in doc_lines[1:]: # Skip version
if ':' in line and '.metadata' in line:
parts = line.split(':')
if len(parts) >= 5:
metadata_hash = parts[0]
metadata_line = line
break
if not metadata_hash:
raise ValueError("Metadata component not found in document schema")
print(f"✅ Metadata hash: {metadata_hash}")
# Fetch current metadata
metadata_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
metadata_response.raise_for_status()
current_metadata = json.loads(metadata_response.text)
print(f"✅ Current metadata:")
for key, value in current_metadata.items():
print(f" {key}: {value}")
return current_metadata, metadata_line
def create_updated_metadata(self, current_metadata: dict, new_parent_uuid: str, old_parent_uuid: str):
"""Create updated metadata with new parent"""
print(f"\nSTEP 5: CREATING UPDATED METADATA")
print("-" * 30)
# Copy current metadata and update parent
updated_metadata = current_metadata.copy()
updated_metadata['parent'] = new_parent_uuid
print(f"✅ Updating parent:")
print(f" Old parent: {old_parent_uuid}")
print(f" New parent: {new_parent_uuid}")
# Add/update source field to match real app documents
updated_metadata['source'] = 'com.remarkable.macos'
print(f"✅ Setting 'source' field: com.remarkable.macos")
# Fix lastOpened to match real app behavior
if 'lastOpened' in updated_metadata and updated_metadata['lastOpened'] != 0:
updated_metadata['lastOpened'] = 0
print(f"✅ Setting lastOpened to 0 (real app behavior)")
# Make metadata match real app behavior for moves
updated_metadata['lastModified'] = int(time.time() * 1000)
updated_metadata['metadatamodified'] = False
updated_metadata['modified'] = False
# Convert to JSON
updated_metadata_json = json.dumps(updated_metadata, separators=(',', ':'))
print(f"✅ Updated metadata ({len(updated_metadata_json)} bytes):")
print(f" {updated_metadata_json[:100]}...")
return updated_metadata_json
def upload_new_metadata(self, metadata_json: str, doc_uuid: str):
"""Upload new metadata and return hash"""
print(f"\nSTEP 6: UPLOADING NEW METADATA")
print("-" * 30)
# Calculate hash
metadata_hash = hashlib.sha256(metadata_json.encode()).hexdigest()
print(f"✅ New metadata hash: {metadata_hash}")
# Upload using working method from test_move_from_trash.py
headers = {
'Content-Type': 'application/octet-stream',
'rm-batch-number': '1',
'rm-filename': f'{doc_uuid}.metadata',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add CRC32C checksum
crc32c_header = compute_crc32c_header(metadata_json.encode())
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
upload_response = self.session.put(
f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}",
data=metadata_json.encode(),
headers=headers
)
print(f"✅ Metadata upload response: {upload_response.status_code}")
if upload_response.status_code not in [200, 202]:
print(f"❌ Upload failed: {upload_response.text}")
raise RuntimeError(f"Metadata upload failed: {upload_response.status_code}")
return metadata_hash
def upload_real_pagedata(self, doc_uuid: str):
"""Upload real pagedata (newline) to match real app documents"""
print(f"\nSTEP 7: UPLOADING REAL PAGEDATA")
print("-" * 30)
# Real app pagedata is just a newline character
pagedata_content = "\n"
pagedata_hash = hashlib.sha256(pagedata_content.encode()).hexdigest()
print(f"✅ Real pagedata hash: {pagedata_hash}")
print(f"✅ Real pagedata content: {repr(pagedata_content)} ({len(pagedata_content)} bytes)")
# Upload pagedata using working method
headers = {
'Content-Type': 'application/octet-stream',
'rm-batch-number': '1',
'rm-filename': f'{doc_uuid}.pagedata',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add CRC32C checksum
crc32c_header = compute_crc32c_header(pagedata_content.encode())
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
upload_response = self.session.put(
f"https://eu.tectonic.remarkable.com/sync/v3/files/{pagedata_hash}",
data=pagedata_content.encode(),
headers=headers
)
print(f"✅ Pagedata upload response: {upload_response.status_code}")
if upload_response.status_code not in [200, 202]:
print(f"❌ Upload failed: {upload_response.text}")
raise RuntimeError(f"Pagedata upload failed: {upload_response.status_code}")
return pagedata_hash
def create_new_document_schema(self, doc_lines: list, new_metadata_hash: str, metadata_line: str, new_pagedata_hash: str = None):
"""Create new document schema with updated metadata hash and pagedata"""
print(f"\nSTEP 8: CREATING NEW DOCUMENT SCHEMA")
print("-" * 30)
# Replace metadata line and pagedata line with new hashes
new_lines = []
pagedata_line = None
# Find pagedata line
for line in doc_lines[1:]: # Skip version
if ':' in line and '.pagedata' in line:
pagedata_line = line
break
for line in doc_lines:
if line == metadata_line:
# Replace metadata hash but keep size
parts = line.split(':')
parts[0] = new_metadata_hash
new_line = ':'.join(parts)
new_lines.append(new_line)
print(f"✅ Updated metadata line:")
print(f" Old: {line}")
print(f" New: {new_line}")
elif new_pagedata_hash and line == pagedata_line:
# Replace pagedata hash and update size to 1 byte
parts = line.split(':')
parts[0] = new_pagedata_hash
parts[4] = '1' # Update size to 1 byte (newline)
new_line = ':'.join(parts)
new_lines.append(new_line)
print(f"✅ Updated pagedata line:")
print(f" Old: {line}")
print(f" New: {new_line}")
else:
new_lines.append(line)
new_doc_content = '\n'.join(new_lines)
print(f"✅ New document schema ({len(new_doc_content)} bytes):")
for i, line in enumerate(new_lines):
print(f" Line {i}: {line}")
return new_doc_content
def upload_new_document_schema(self, doc_content: str, doc_uuid: str):
"""Upload new document schema"""
print(f"\nSTEP 9: UPLOADING NEW DOCUMENT SCHEMA")
print("-" * 30)
# Calculate hash
doc_hash = hashlib.sha256(doc_content.encode()).hexdigest()
print(f"✅ New document schema hash: {doc_hash}")
# Upload using working method
headers = {
'Content-Type': 'application/octet-stream',
'rm-batch-number': '1',
'rm-filename': f'{doc_uuid}.docSchema',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add CRC32C checksum
crc32c_header = compute_crc32c_header(doc_content.encode())
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
upload_response = self.session.put(
f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}",
data=doc_content.encode(),
headers=headers
)
print(f"✅ Document schema upload response: {upload_response.status_code}")
if upload_response.status_code not in [200, 202]:
print(f"❌ Upload failed: {upload_response.text}")
raise RuntimeError(f"Document schema upload failed: {upload_response.status_code}")
return doc_hash
def update_root_docschema(self, root_content: str, doc_info: dict, new_doc_hash: str):
"""Update root.docSchema with new document hash"""
print(f"\nSTEP 10: UPDATING ROOT.DOCSCHEMA")
print("-" * 30)
# Replace old document line with new hash
old_line = doc_info['line']
parts = old_line.split(':')
parts[0] = new_doc_hash
new_line = ':'.join(parts)
print(f"✅ Updating root.docSchema entry:")
print(f" Old: {old_line}")
print(f" New: {new_line}")
# Replace in root content
new_root_content = root_content.replace(old_line, new_line)
print(f"✅ New root.docSchema size: {len(new_root_content)} bytes")
return new_root_content
def upload_new_root(self, root_content: str, generation: int):
"""Upload new root.docSchema and update roothash"""
print(f"\nSTEP 11: UPLOADING NEW ROOT.DOCSCHEMA")
print("-" * 30)
# Calculate hash
root_hash = hashlib.sha256(root_content.encode()).hexdigest()
print(f"✅ New root hash: {root_hash}")
# Upload root content using working method
headers = {
'Content-Type': 'text/plain',
'rm-batch-number': '1',
'rm-filename': 'root.docSchema',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add CRC32C checksum
crc32c_header = compute_crc32c_header(root_content.encode())
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
upload_response = self.session.put(
f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_hash}",
data=root_content.encode(),
headers=headers
)
print(f"✅ Root content upload response: {upload_response.status_code}")
if upload_response.status_code not in [200, 202]:
print(f"❌ Upload failed: {upload_response.text}")
raise RuntimeError(f"Root content upload failed: {upload_response.status_code}")
# Update root hash pointer
print(f"\nSTEP 12: UPDATING ROOT HASH POINTER")
print("-" * 30)
# Create root data exactly like working upload_manager.py
root_update_data = {
"broadcast": True,
"generation": generation,
"hash": root_hash
}
# Convert to JSON with 2-space indent like real app
root_content_body = json.dumps(root_update_data, indent=2).encode('utf-8')
# Headers exactly like working upload_manager.py
headers = {
'Content-Type': 'application/json',
'rm-batch-number': '1',
'rm-filename': 'roothash',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add CRC32C checksum
crc32c_header = compute_crc32c_header(root_content_body)
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
# Use /sync/v3/root endpoint like working code
root_update_response = self.session.put(
"https://eu.tectonic.remarkable.com/sync/v3/root",
data=root_content_body,
headers=headers
)
print(f"✅ Root update response: {root_update_response.status_code}")
if root_update_response.status_code not in [200, 202]:
print(f"❌ Root update failed: {root_update_response.text}")
raise RuntimeError(f"Root update failed: {root_update_response.status_code}")
return root_hash
def analyze_before_move(self):
"""Analyze cloud state and show what the script will do before executing"""
print(f"PYLONTECH DOCUMENT MOVE ANALYSIS")
print("=" * 60)
print("This script will analyze the current cloud state and show you exactly")
print("what it will do before performing any move operations.")
print("")
try:
# Step 1-2: Analyze and find items
root_data, root_content, documents, folders = self.analyze_cloud_state()
pylontech_uuid, pylontech_doc, myfolder_uuid, myfolder_data, otherfolder_uuid, otherfolder_data = self.find_target_items(documents, folders)
if not all([pylontech_uuid, pylontech_doc, otherfolder_uuid, otherfolder_data]):
print("\n❌ ANALYSIS FAILED: Missing required items")
return False
# Show what will happen
print(f"\nMOVE OPERATION PLAN")
print("=" * 30)
print(f"Document to move: '{pylontech_doc['name']}'")
print(f" UUID: {pylontech_uuid}")
print(f" Current parent: {pylontech_doc['parent_uuid'][:8] if pylontech_doc['parent_uuid'] else 'root'}...")
print(f" Current location: {myfolder_data['name'] if pylontech_doc['parent_uuid'] == myfolder_uuid else 'Other location'}")
print(f"")
print(f"Target folder: '{otherfolder_data['name']}'")
print(f" UUID: {otherfolder_uuid}")
print(f"")
print(f"Operations that will be performed:")
print(f" 1. Retrieve pylontech document schema")
print(f" 2. Get current metadata from document")
print(f"   3. Update metadata parent field: {pylontech_doc['parent_uuid'][:8] if pylontech_doc['parent_uuid'] else 'root'}... → {otherfolder_uuid[:8]}...")
print(f" 4. Upload new metadata with new hash")
print(f" 5. Upload updated pagedata with new hash")
print(f" 6. Create new document schema with new component hashes")
print(f" 7. Upload new document schema")
print(f" 8. Update root.docSchema with new document hash")
print(f" 9. Upload new root.docSchema")
print(f" 10. Update root hash pointer")
print(f"")
print(f"✅ Analysis complete. Ready to perform move operation.")
return {
'root_data': root_data,
'root_content': root_content,
'pylontech_uuid': pylontech_uuid,
'pylontech_doc': pylontech_doc,
'otherfolder_uuid': otherfolder_uuid,
'otherfolder_data': otherfolder_data
}
except Exception as e:
print(f"\n❌ Analysis failed: {e}")
return False
def execute_move(self, analysis_data):
"""Execute the actual move operation using the analysis data"""
print(f"\nEXECUTING PYLONTECH MOVE OPERATION")
print("=" * 60)
try:
root_data = analysis_data['root_data']
root_content = analysis_data['root_content']
pylontech_uuid = analysis_data['pylontech_uuid']
pylontech_doc = analysis_data['pylontech_doc']
otherfolder_uuid = analysis_data['otherfolder_uuid']
otherfolder_data = analysis_data['otherfolder_data']
# Step 3: Get document schema
doc_content, doc_lines = self.get_document_schema(pylontech_doc['hash'])
# Step 4: Get current metadata
current_metadata, metadata_line = self.get_current_metadata(doc_lines)
# Step 5: Create updated metadata
updated_metadata_json = self.create_updated_metadata(
current_metadata,
otherfolder_uuid,
current_metadata.get('parent', '')
)
# Step 6: Upload new metadata
new_metadata_hash = self.upload_new_metadata(updated_metadata_json, pylontech_uuid)
# Step 7: Upload real pagedata
new_pagedata_hash = self.upload_real_pagedata(pylontech_uuid)
# Step 8: Create new document schema
new_doc_content = self.create_new_document_schema(doc_lines, new_metadata_hash, metadata_line, new_pagedata_hash)
# Step 9: Upload new document schema
new_doc_hash = self.upload_new_document_schema(new_doc_content, pylontech_uuid)
# Step 10: Update root.docSchema
doc_info_for_root = {
'line': None, # We'll need to find this in root content
'hash': pylontech_doc['hash']
}
# Find the document line in root content for updating
lines = root_content.strip().split('\n')
for line in lines[1:]: # Skip version header
if pylontech_uuid in line:
doc_info_for_root['line'] = line
break
if not doc_info_for_root['line']:
raise RuntimeError("Could not find document line in root.docSchema")
new_root_content = self.update_root_docschema(root_content, doc_info_for_root, new_doc_hash)
# Step 11-12: Upload new root and update pointer
new_root_hash = self.upload_new_root(new_root_content, root_data['generation'])
print(f"\nSUCCESS! Pylontech document moved successfully!")
print(f" Document: {current_metadata.get('visibleName')}")
print(f" From: {current_metadata.get('parent', 'root')}")
print(f" To: {otherfolder_data['name']} ({otherfolder_uuid})")
print(f" New root hash: {new_root_hash}")
return True
except Exception as e:
print(f"\n❌ Move operation failed: {e}")
return False
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: class bases; PylontechMover derives directly from object, and __init__ takes no arguments beyond self.
Return Value
Instantiation returns a PylontechMover instance.
Class Interface
Methods
__init__(self)
Purpose: Authenticate with the reMarkable cloud and store the authenticated session
Returns: None
parse_directory_listing(self, content)
Purpose: Parse directory listing using the correct reMarkable format (from local_replica_v2.py)
Parameters:
content: Type: bytes
Returns: Dict with 'child_objects' and 'data_components' lists of parsed entries
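Each listing entry follows the hash:flags:uuid_component:type:size layout matched by the regex in the source. A minimal standalone sketch of the same parsing step, using a made-up entry line:
import re

# Illustrative entry: <sha256>:<flags>:<uuid or uuid.component>:<type>:<size>
sample = "ab" * 32 + ":0:3f2b6c1e-0000-0000-0000-000000000000.metadata:0:312"
pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
match = re.match(pattern, sample, re.IGNORECASE)
if match:
    hash_val, flags, uuid_component, type_val, size_val = match.groups()
    # Entries with a dotted suffix are data components; bare UUIDs are child objects
    kind = "data component" if "." in uuid_component else "child object"
    print(kind, uuid_component, int(size_val))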
fetch_hash_content(self, hash_ref)
Purpose: Fetch content from reMarkable cloud by hash
Parameters:
hash_ref: Type: str
Returns: Dict with 'hash', 'content', and 'size', or None if the fetch fails
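Objects are content-addressed: the final path segment of the sync/v3/files URL is the SHA-256 of the object's content. A minimal sketch of the same fetch, assuming an already authenticated requests.Session:
import requests

def fetch_object(session: requests.Session, hash_ref: str) -> bytes:
    # The object name in the URL is the SHA-256 of the object's content
    url = f"https://eu.tectonic.remarkable.com/sync/v3/files/{hash_ref}"
    response = session.get(url)
    response.raise_for_status()
    return response.content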
extract_metadata(self, metadata_hash)
Purpose: Extract metadata from hash
Parameters:
metadata_hash: Type: str
Returns: Parsed metadata dict, or None if fetching or parsing fails
analyze_cloud_state(self)
Purpose: Analyze current cloud state using proper discovery method from local_replica_v2.py
Returns: Tuple of (root_data, root_content, documents, folders)
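Discovery is a breadth-first walk over content hashes starting from the current root. A simplified sketch of the traversal loop, assuming fetch and parse callables that behave like fetch_hash_content and parse_directory_listing:
def discover_nodes(session, root_hash, fetch, parse):
    # Breadth-first walk of the hash graph; each object is visited once
    seen, queue, nodes = set(), [root_hash], {}
    while queue:
        current = queue.pop(0)
        if current in seen:
            continue
        seen.add(current)
        listing = parse(fetch(session, current))
        nodes[current] = listing
        for child in listing['child_objects']:
            if child['hash'] not in seen:
                queue.append(child['hash'])
    return nodes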
find_target_items(self, documents, folders)
Purpose: Find pylontech document, Myfolder, and Otherfolder
Parameters:
documents: Dict of discovered documents keyed by UUID
folders: Dict of discovered folders keyed by UUID
Returns: Tuple of (pylontech_uuid, pylontech_doc, myfolder_uuid, myfolder_data, otherfolder_uuid, otherfolder_data), or a tuple of None values if any item is missing
get_document_schema(self, doc_hash)
Purpose: Retrieve document's docSchema
Parameters:
doc_hash: Type: str
Returns: Tuple of (doc_content, lines)
get_current_metadata(self, doc_lines)
Purpose: Extract and fetch current metadata
Parameters:
doc_lines: Type: list
Returns: Tuple of (current_metadata, metadata_line)
create_updated_metadata(self, current_metadata, new_parent_uuid, old_parent_uuid)
Purpose: Create updated metadata with new parent
Parameters:
current_metadata: Type: dict
new_parent_uuid: Type: str
old_parent_uuid: Type: str
Returns: Updated metadata serialized as a compact JSON string
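The move itself is only a change to the 'parent' field of the document's metadata JSON, plus the bookkeeping fields the source sets to mimic the desktop app. A small illustration with placeholder UUIDs:
import json
import time

# Placeholder metadata; field names follow the source code
current_metadata = {
    "visibleName": "pylontech",
    "type": "DocumentType",
    "parent": "11111111-1111-1111-1111-111111111111",  # Myfolder (placeholder UUID)
}
updated = dict(current_metadata)
updated["parent"] = "22222222-2222-2222-2222-222222222222"  # Otherfolder (placeholder UUID)
updated["source"] = "com.remarkable.macos"
updated["lastOpened"] = 0
updated["lastModified"] = int(time.time() * 1000)
updated["metadatamodified"] = False
updated["modified"] = False
print(json.dumps(updated, separators=(",", ":")))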
upload_new_metadata(self, metadata_json, doc_uuid)
Purpose: Upload new metadata and return hash
Parameters:
metadata_json: Type: str
doc_uuid: Type: str
Returns: SHA-256 hash of the uploaded metadata (str)
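Uploads are content-addressed PUTs: the blob's SHA-256 doubles as its object name. A trimmed sketch of the upload (the full header set, including the desktop User-Agent and the x-goog-hash checksum, is shown in the source):
import hashlib
import uuid
import requests

def upload_blob(session: requests.Session, body: bytes, filename: str) -> str:
    # Content-addressed upload: the SHA-256 of the body is the object name
    blob_hash = hashlib.sha256(body).hexdigest()
    headers = {
        "Content-Type": "application/octet-stream",
        "rm-batch-number": "1",
        "rm-filename": filename,          # e.g. f"{doc_uuid}.metadata"
        "rm-sync-id": str(uuid.uuid4()),
    }
    response = session.put(
        f"https://eu.tectonic.remarkable.com/sync/v3/files/{blob_hash}",
        data=body,
        headers=headers,
    )
    response.raise_for_status()
    return blob_hash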
upload_real_pagedata(self, doc_uuid)
Purpose: Upload real pagedata (newline) to match real app documents
Parameters:
doc_uuid: Type: str
Returns: SHA-256 hash of the uploaded pagedata (str)
create_new_document_schema(self, doc_lines, new_metadata_hash, metadata_line, new_pagedata_hash)
Purpose: Create new document schema with updated metadata hash and pagedata
Parameters:
doc_lines: Type: list
new_metadata_hash: Type: str
metadata_line: Type: str
new_pagedata_hash: Type: str
Returns: New document schema content (str)
upload_new_document_schema(self, doc_content, doc_uuid)
Purpose: Upload new document schema
Parameters:
doc_content: Type: str
doc_uuid: Type: str
Returns: SHA-256 hash of the uploaded document schema (str)
update_root_docschema(self, root_content, doc_info, new_doc_hash)
Purpose: Update root.docSchema with new document hash
Parameters:
root_content: Type: str
doc_info: Type: dict
new_doc_hash: Type: str
Returns: Updated root.docSchema content (str)
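root.docSchema is a plain-text index, so moving a document only requires swapping the hash column of its line. An illustration with made-up values:
# Made-up root.docSchema line: <doc hash>:<flags>:<doc uuid>:<type>:<size>
old_line = "a1" * 32 + ":80000000:3f2b6c1e-0000-0000-0000-000000000000:4:2048"
new_doc_hash = "b2" * 32
parts = old_line.split(":")
parts[0] = new_doc_hash          # only the hash column changes
new_line = ":".join(parts)
root_content = "3\n" + old_line + "\n"
new_root_content = root_content.replace(old_line, new_line)
print(new_root_content)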
upload_new_root(self, root_content, generation)
Purpose: Upload new root.docSchema and update roothash
Parameters:
root_content: Type: str
generation: Type: int
Returns: New root hash (str)
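After the new root index is uploaded, the root pointer is advanced by PUTting the new hash together with the generation observed earlier; the generation acts as an optimistic-concurrency check so the server can reject the update if another client changed the root in the meantime. A trimmed sketch (checksum and desktop-client headers omitted; see the source for the full set):
import json
import uuid
import requests

def update_root_pointer(session: requests.Session, new_root_hash: str, generation: int) -> None:
    # The stored generation guards against concurrent root updates
    body = json.dumps(
        {"broadcast": True, "generation": generation, "hash": new_root_hash},
        indent=2,
    ).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "rm-filename": "roothash",
        "rm-sync-id": str(uuid.uuid4()),
    }
    response = session.put(
        "https://eu.tectonic.remarkable.com/sync/v3/root",
        data=body,
        headers=headers,
    )
    response.raise_for_status()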
analyze_before_move(self)
Purpose: Analyze cloud state and show what the script will do before executing
Returns: Dict of analysis data on success, False on failure
execute_move(self, analysis_data)
Purpose: Execute the actual move operation using the analysis data
Parameters:
analysis_data: Dict returned by analyze_before_move
Returns: True on success, False on failure
Required Imports
import re
import json
import time
import hashlib
import uuid
import base64
The class also relies on the project-local RemarkableAuth and compute_crc32c_header helpers.
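As one possible sketch only (an assumption, not the project's actual implementation), compute_crc32c_header could be built on the third-party crc32c package, producing the base64-encoded big-endian digest that the x-goog-hash header expects:
import base64
import crc32c  # third-party 'crc32c' package; an assumption, not confirmed by the source

def compute_crc32c_header(data: bytes) -> str:
    # x-goog-hash expects "crc32c=<base64 of the 4-byte big-endian checksum>"
    checksum = crc32c.crc32c(data)
    return "crc32c=" + base64.b64encode(checksum.to_bytes(4, "big")).decode("ascii")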
Usage Example
# Example usage:
# mover = PylontechMover()
# analysis = mover.analyze_before_move()
# if analysis:
#     mover.execute_move(analysis)
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class DocumentMover (76.4% similar)
- function main_v27 (65.4% similar)
- function main_v86 (53.3% similar)
- class DocumentToTrashMover (51.7% similar)
- function move_section (46.1% similar)