class RemarkableRestClient
Direct REST API client for reMarkable Cloud without external dependencies
/tf/active/vicechatdev/e-ink-llm/remarkable_rest_client.py
22 - 775
moderate
Purpose
Direct REST API client for reMarkable Cloud without external dependencies
Source Code
class RemarkableRestClient:
"""Direct REST API client for reMarkable Cloud without external dependencies"""
def __init__(self, config_dir: Optional[str] = None):
self.config_dir = Path(config_dir) if config_dir else Path.home() / '.eink-llm'
self.config_dir.mkdir(exist_ok=True)
self.device_token_file = self.config_dir / 'remarkable_device_token'
self.config_file = self.config_dir / 'remarkable_config.json'
self.logger = logging.getLogger(__name__)
self.device_token = None
self.user_token = None
self.storage_host = None
self.authenticated = False
# API endpoints (UPDATED to working endpoints from ddvk/rmapi project)
self.base_url = "https://webapp-prod.cloud.remarkable.engineering"
self.device_endpoint = f"{self.base_url}/token/json/2/device/new"
self.user_endpoint = f"{self.base_url}/token/json/2/user/new"
self.service_manager_url = "https://service-manager-production-dot-remarkable-production.appspot.com/service/json/1/document-storage?environment=production&apiVer=2"
self.device_register_url = "https://my.remarkable.com/connect/desktop"
# Load existing device token if available
self._load_device_token()
def _load_device_token(self):
"""Load device token from file if it exists"""
if self.device_token_file.exists():
try:
with open(self.device_token_file, 'r') as f:
self.device_token = f.read().strip()
print("🔑 Loaded existing device token")
except Exception as e:
self.logger.error(f"Error loading device token: {e}")
def _save_device_token(self, token: str):
"""Save device token to file"""
try:
with open(self.device_token_file, 'w') as f:
f.write(token)
self.device_token = token
print("💾 Device token saved")
except Exception as e:
self.logger.error(f"Error saving device token: {e}")
def register_device(self, one_time_code: str) -> bool:
"""
Register a new device with reMarkable Cloud
Args:
one_time_code: 8-character code from reMarkable account
Returns:
True if registration successful
"""
device_id = str(uuid.uuid4())
payload = {
"code": one_time_code,
"deviceDesc": "desktop-windows", # Using same as rmcl
"deviceID": device_id
}
print("🔐 Registering new device with reMarkable Cloud...")
# Add proper headers (matching rmcl user agent)
headers = {
"Content-Type": "application/json",
"User-Agent": "rmcl <https://github.com/rschroll/rmcl>"
}
try:
response = requests.post(
self.device_endpoint,
json=payload,
headers=headers,
timeout=30
)
if response.status_code == 200:
device_token = response.text.strip()
if device_token and len(device_token) > 10: # Basic validation
self._save_device_token(device_token)
print("✅ Device registered successfully!")
return True
else:
print(f"❌ Invalid device token received: {device_token[:50]}...")
return False
else:
print(f"❌ Device registration failed: HTTP {response.status_code}")
if response.text:
print(f" Response: {response.text[:100]}")
return False
except Exception as e:
print(f"❌ Device registration error: {e}")
return False
def get_user_token(self) -> bool:
"""
Get a fresh user token using the device token
Returns:
True if user token obtained successfully
"""
if not self.device_token:
print("❌ No device token available. Please register device first.")
return False
try:
print("🔄 Getting fresh user token...")
headers = {
"Authorization": f"Bearer {self.device_token}",
"User-Agent": "remarkable-desktop-linux/2.15.1.382"
}
response = requests.post(
self.user_endpoint,
headers=headers,
timeout=30
)
if response.status_code == 200:
self.user_token = response.text.strip()
print("✅ User token obtained")
return True
else:
print(f"❌ User token request failed: {response.status_code}")
print(f"Response: {response.text[:200]}")
# If token is invalid, remove the device token
if response.status_code == 401:
print("🗑️ Device token appears invalid, removing...")
if self.device_token_file.exists():
self.device_token_file.unlink()
self.device_token = None
return False
except Exception as e:
self.logger.error(f"User token error: {e}")
print(f"❌ User token error: {e}")
return False
def discover_storage_host(self) -> bool:
"""
Discover the document storage service host
Returns:
True if host discovered successfully
"""
try:
print("🔍 Discovering storage service host...")
response = requests.get(self.service_manager_url, timeout=30)
if response.status_code == 200:
service_info = response.json()
self.storage_host = service_info.get("Host")
if self.storage_host:
print(f"✅ Storage host: {self.storage_host}")
return True
else:
print("❌ No storage host in response")
return False
else:
print(f"❌ Service discovery failed: {response.status_code}")
return False
except Exception as e:
self.logger.error(f"Service discovery error: {e}")
print(f"❌ Service discovery error: {e}")
return False
async def authenticate(self, one_time_code: Optional[str] = None) -> bool:
"""
Authenticate with reMarkable Cloud
Args:
one_time_code: One-time code for device registration (if needed)
Returns:
True if authentication successful
"""
try:
# If we have a device token, try to get user token
if self.device_token:
print("🔑 Using existing device token...")
if self.get_user_token() and self.discover_storage_host():
self.authenticated = True
return True
# If one-time code provided, register new device
if one_time_code:
if self.register_device(one_time_code):
if self.get_user_token() and self.discover_storage_host():
self.authenticated = True
return True
print("❌ Authentication failed. Please provide a one-time code.")
return False
except Exception as e:
self.logger.error(f"Authentication error: {e}")
print(f"❌ Authentication error: {e}")
return False
def list_documents(self) -> List[Dict]:
"""
List all documents and folders in reMarkable Cloud using Chrome extension API
Returns:
List of document/folder metadata dictionaries
"""
if not self.authenticated or not self.user_token:
print("❌ Not authenticated")
return []
try:
# Use the Chrome extension API endpoint with storage host from token
storage_host = self._get_storage_host()
list_url = f"{storage_host}/doc/v2/files"
headers = {
"Authorization": f"Bearer {self.user_token}",
"rM-Source": "E-Ink-LLM-Assistant"
}
response = requests.get(list_url, headers=headers, timeout=30)
if response.status_code == 200:
docs = response.json()
print(f"📁 Retrieved {len(docs)} items from cloud")
return docs
else:
print(f"❌ Failed to list documents: {response.status_code}")
return []
except Exception as e:
self.logger.error(f"Error listing documents: {e}")
print(f"❌ Error listing documents: {e}")
return []
def get_document_download_url(self, doc_id: str) -> Optional[str]:
"""
Get download URL for a specific document
Args:
doc_id: Document UUID
Returns:
Download URL or None if failed
"""
if not self.authenticated or not self.storage_host:
print("❌ Not authenticated")
return None
try:
single_url = f"https://{self.storage_host}/document-storage/json/2/docs?doc={doc_id}&withBlob=true"
response = requests.get(
single_url,
headers={"Authorization": f"Bearer {self.user_token}"},
timeout=30
)
if response.status_code == 200:
doc_meta = response.json()
if doc_meta and len(doc_meta) > 0:
blob_url = doc_meta[0].get("BlobURLGet")
return blob_url
print(f"❌ Failed to get download URL for {doc_id}")
return None
except Exception as e:
self.logger.error(f"Error getting download URL: {e}")
return None
def download_document(self, doc_id: str, doc_name: str, output_dir: Path) -> Optional[Path]:
"""
Download a document from reMarkable Cloud
Args:
doc_id: Document UUID
doc_name: Document name for filename
output_dir: Directory to save the file
Returns:
Path to downloaded file or None if failed
"""
try:
blob_url = self.get_document_download_url(doc_id)
if not blob_url:
return None
# Download the content
response = requests.get(blob_url, timeout=60)
if response.status_code != 200:
print(f"❌ Failed to download content: {response.status_code}")
return None
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
# Generate safe filename
safe_name = "".join(c for c in doc_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
if not safe_name:
safe_name = f"document_{doc_id[:8]}"
# Save as ZIP first (reMarkable format)
zip_path = output_dir / f"{safe_name}.zip"
with open(zip_path, 'wb') as f:
f.write(response.content)
# Try to extract PDF if available
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# Look for PDF file in the ZIP
pdf_files = [name for name in zip_ref.namelist() if name.endswith('.pdf')]
if pdf_files:
# Extract the PDF
pdf_path = output_dir / f"{safe_name}.pdf"
with zip_ref.open(pdf_files[0]) as pdf_file:
with open(pdf_path, 'wb') as out_file:
out_file.write(pdf_file.read())
# Clean up ZIP file
zip_path.unlink()
print(f"📥 Downloaded PDF: {doc_name} -> {pdf_path.name}")
return pdf_path
except Exception:
# If PDF extraction fails, keep the ZIP
pass
print(f"📥 Downloaded ZIP: {doc_name} -> {zip_path.name}")
return zip_path
except Exception as e:
self.logger.error(f"Error downloading {doc_name}: {e}")
print(f"❌ Failed to download {doc_name}: {e}")
return None
def create_pdf_zip_package(self, pdf_path: Path, doc_name: str) -> bytes:
"""
Create a ZIP package for PDF upload in reMarkable format
Args:
pdf_path: Path to PDF file
doc_name: Document name
Returns:
ZIP package as bytes
"""
try:
# Create temporary ZIP file
with tempfile.NamedTemporaryFile() as temp_zip:
with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zip_ref:
# Add the PDF file
zip_ref.write(pdf_path, f"{doc_name}.pdf")
# Create .content metadata file
content_metadata = {
"extraMetadata": {},
"fileType": "pdf",
"fontName": "",
"lastOpenedPage": 0,
"lineHeight": -1,
"margins": 100,
"pageCount": 1, # Will be updated by reMarkable
"textScale": 1,
"transform": {
"m11": 1, "m12": 0, "m13": 0,
"m21": 0, "m22": 1, "m23": 0,
"m31": 0, "m32": 0, "m33": 1
}
}
zip_ref.writestr(f"{doc_name}.content", json.dumps(content_metadata))
# Create .pagedata file (empty for PDF)
zip_ref.writestr(f"{doc_name}.pagedata", "")
# Read the ZIP content
temp_zip.seek(0)
return temp_zip.read()
except Exception as e:
self.logger.error(f"Error creating ZIP package: {e}")
raise
def upload_document(self, file_path: Path, folder_id: str = "",
document_name: Optional[str] = None) -> bool:
"""
Upload a PDF document to reMarkable Cloud
Args:
file_path: Path to PDF file
folder_id: Target folder ID (empty string for root)
document_name: Name for the document
Returns:
True if upload successful
"""
if not self.authenticated or not self.storage_host:
print("❌ Not authenticated")
return False
try:
if not file_path.exists():
print(f"❌ File not found: {file_path}")
return False
if file_path.suffix.lower() != '.pdf':
print(f"❌ Only PDF files supported. Got: {file_path.suffix}")
return False
# Use filename if no document name provided
if not document_name:
document_name = file_path.stem
print(f"📤 Uploading {file_path.name} as '{document_name}'...")
# Step 1: Create upload request
new_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat() + "Z"
req_payload = [{
"ID": new_id,
"Version": 1,
"ModifiedClient": timestamp
}]
req_url = f"https://{self.storage_host}/document-storage/json/2/upload/request"
response = requests.put(
req_url,
json=req_payload,
headers={"Authorization": f"Bearer {self.user_token}"},
timeout=30
)
if response.status_code != 200:
print(f"❌ Upload request failed: {response.status_code}")
return False
upload_data = response.json()
blob_put_url = upload_data[0]["BlobURLPut"]
# Step 2: Upload the content
zip_content = self.create_pdf_zip_package(file_path, document_name)
upload_response = requests.put(blob_put_url, data=zip_content, timeout=60)
if upload_response.status_code != 200:
print(f"❌ Content upload failed: {upload_response.status_code}")
return False
# Step 3: Update metadata
meta_payload = [{
"ID": new_id,
"Version": 1,
"ModifiedClient": timestamp,
"Type": "DocumentType",
"VissibleName": document_name,
"Parent": folder_id,
"Bookmarked": False
}]
update_url = f"https://{self.storage_host}/document-storage/json/2/upload/update-status"
meta_response = requests.put(
update_url,
json=meta_payload,
headers={"Authorization": f"Bearer {self.user_token}"},
timeout=30
)
if meta_response.status_code == 200:
print(f"✅ Upload successful: {document_name}")
return True
else:
print(f"❌ Metadata update failed: {meta_response.status_code}")
return False
except Exception as e:
self.logger.error(f"Error uploading {file_path}: {e}")
print(f"❌ Upload failed: {e}")
return False
def find_folder_by_name(self, folder_name: str) -> Optional[str]:
"""
Find a folder by name and return its ID
Args:
folder_name: Name of the folder to find
Returns:
Folder ID or None if not found
"""
docs = self.list_documents()
for doc in docs:
if (doc.get("Type") == "CollectionType" and
doc.get("VissibleName") == folder_name):
return doc.get("ID")
return None
def get_documents_in_folder(self, folder_id: str = "") -> List[Dict]:
"""
Get all documents in a specific folder
Args:
folder_id: Folder ID (empty string for root)
Returns:
List of document metadata dictionaries
"""
docs = self.list_documents()
folder_docs = []
for doc in docs:
if (doc.get("Type") == "DocumentType" and
doc.get("Parent") == folder_id):
folder_docs.append(doc)
return folder_docs
async def create_folder(self, folder_name: str, parent_id: Optional[str] = None) -> bool:
"""
Create a folder in reMarkable Cloud
Args:
folder_name: Name of the folder to create
parent_id: Parent folder ID (None for root)
Returns:
True if folder created successfully, False otherwise
"""
try:
storage_host = self._get_storage_host()
if not storage_host:
print("❌ Could not determine storage host")
return False
folder_id = await self._create_folder(folder_name, parent_id, storage_host)
return folder_id is not None
except Exception as e:
print(f"❌ Error creating folder '{folder_name}': {e}")
return False
async def upload_content(self, content: bytes, filename: str, folder_path: str = None,
file_type: str = "application/pdf") -> Optional[str]:
"""
Upload content directly to reMarkable Cloud using new API endpoints
Args:
content: File content as bytes
filename: Name of the file
folder_path: Target folder path (e.g., "/My Folder")
file_type: MIME type of the file
Returns:
Document ID if successful, None otherwise
"""
if not self.authenticated:
print("❌ Not authenticated with reMarkable Cloud")
return None
try:
# Get storage host from user token
storage_host = self._get_storage_host()
# Find or create target folder
parent_id = None
if folder_path:
parent_id = await self._ensure_folder_exists(folder_path, storage_host)
# Create metadata (based on Chrome extension pattern)
metadata = {
'file_name': filename,
'type': 'DocumentType'
}
if parent_id:
metadata['parent'] = parent_id
# Encode metadata as base64
import base64
meta_encoded = base64.b64encode(json.dumps(metadata).encode()).decode()
headers = {
'Authorization': f'Bearer {self.user_token}',
'rM-Source': 'E-Ink-LLM-Assistant',
'rM-Meta': meta_encoded,
'Content-Type': file_type,
'User-Agent': 'E-Ink-LLM-Assistant/1.0'
}
url = f"{storage_host}/doc/v2/files"
response = requests.post(url, headers=headers, data=content, timeout=60)
if response.status_code in [200, 201]:
# Extract document ID from Location header
location = response.headers.get('Location', '')
document_id = location.split('/')[-1] if location else None
print(f"✅ Uploaded {filename} to reMarkable Cloud (ID: {document_id})")
return document_id
else:
print(f"❌ Upload failed: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"❌ Upload error: {e}")
return None
def _get_storage_host(self) -> str:
"""
Extract storage host from JWT token (tectonic service)
Based on Chrome extension logic
"""
try:
import base64
# Decode JWT payload
payload = self.user_token.split('.')[1]
# Add padding if needed
payload += '=' * (4 - len(payload) % 4)
decoded = json.loads(base64.b64decode(payload))
tectonic = decoded.get('tectonic')
if tectonic and isinstance(tectonic, str) and tectonic:
return f"https://{tectonic}.tectonic.remarkable.com"
else:
# Fallback to internal cloud
return "https://internal.cloud.remarkable.com"
except Exception:
# Fallback if token parsing fails
return "https://internal.cloud.remarkable.com"
async def _ensure_folder_exists(self, folder_path: str, storage_host: str) -> Optional[str]:
"""
Ensure folder exists, create if necessary
Args:
folder_path: Path to folder (starting with /)
storage_host: Storage service host URL
Returns:
Folder ID if successful
"""
try:
# Get current files to find existing folder
folder_id = await self._find_folder_by_path(folder_path, storage_host)
if folder_id:
return folder_id
# Create folder(s) as needed
path_parts = [part for part in folder_path.split('/') if part]
current_parent = None
current_path = ""
for part in path_parts:
current_path += f"/{part}"
existing_id = await self._find_folder_by_path(current_path, storage_host)
if existing_id:
current_parent = existing_id
else:
# Create this folder
current_parent = await self._create_folder(part, current_parent, storage_host)
if not current_parent:
raise Exception(f"Failed to create folder '{part}'")
return current_parent
except Exception as e:
print(f"❌ Failed to ensure folder exists: {e}")
return None
async def _find_folder_by_path(self, folder_path: str, storage_host: str) -> Optional[str]:
"""Find folder ID by path"""
try:
if folder_path == "/" or folder_path == "":
return None # Root folder
# Get all files/folders
url = f"{storage_host}/doc/v2/files"
headers = {
'Authorization': f'Bearer {self.user_token}',
'rM-Source': 'E-Ink-LLM-Assistant'
}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code != 200:
return None
files = response.json()
path_parts = [part for part in folder_path.split('/') if part]
current_parent = None
for part in path_parts:
found = False
for item in files:
if (item.get('visibleName') == part and
item.get('type') == 'CollectionType' and
item.get('parent') == current_parent):
current_parent = item.get('id')
found = True
break
if not found:
return None
return current_parent
except Exception:
return None
async def _create_folder(self, folder_name: str, parent_id: Optional[str], storage_host: str) -> Optional[str]:
"""Create a single folder"""
try:
import base64
# Create folder metadata
metadata = {
'file_name': folder_name,
'type': 'CollectionType'
}
if parent_id:
metadata['parent'] = parent_id
# Encode metadata as base64
meta_encoded = base64.b64encode(json.dumps(metadata).encode()).decode()
headers = {
'Authorization': f'Bearer {self.user_token}',
'rM-Source': 'E-Ink-LLM-Assistant',
'rM-Meta': meta_encoded,
'Content-Type': 'folder' # Special content type for folders
}
url = f"{storage_host}/doc/v2/files"
response = requests.post(url, headers=headers, data='', timeout=30)
if response.status_code in [200, 201]:
# Extract folder ID from Location header
location = response.headers.get('Location', '')
folder_id = location.split('/')[-1] if location else None
print(f"✅ Created folder '{folder_name}' (ID: {folder_id})")
return folder_id
else:
print(f"❌ Failed to create folder '{folder_name}': {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"❌ Error creating folder '{folder_name}': {e}")
return None
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
bases: Parameter of type
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, config_dir)
Purpose: Internal method: init
Parameters:
config_dir: Type: Optional[str]
Returns: None
_load_device_token(self)
Purpose: Load device token from file if it exists
Returns: None
_save_device_token(self, token)
Purpose: Save device token to file
Parameters:
token: Type: str
Returns: None
register_device(self, one_time_code) -> bool
Purpose: Register a new device with reMarkable Cloud Args: one_time_code: 8-character code from reMarkable account Returns: True if registration successful
Parameters:
one_time_code: Type: str
Returns: Returns bool
get_user_token(self) -> bool
Purpose: Get a fresh user token using the device token Returns: True if user token obtained successfully
Returns: Returns bool
discover_storage_host(self) -> bool
Purpose: Discover the document storage service host Returns: True if host discovered successfully
Returns: Returns bool
list_documents(self) -> List[Dict]
Purpose: List all documents and folders in reMarkable Cloud using Chrome extension API Returns: List of document/folder metadata dictionaries
Returns: Returns List[Dict]
get_document_download_url(self, doc_id) -> Optional[str]
Purpose: Get download URL for a specific document Args: doc_id: Document UUID Returns: Download URL or None if failed
Parameters:
doc_id: Type: str
Returns: Returns Optional[str]
download_document(self, doc_id, doc_name, output_dir) -> Optional[Path]
Purpose: Download a document from reMarkable Cloud Args: doc_id: Document UUID doc_name: Document name for filename output_dir: Directory to save the file Returns: Path to downloaded file or None if failed
Parameters:
doc_id: Type: strdoc_name: Type: stroutput_dir: Type: Path
Returns: Returns Optional[Path]
create_pdf_zip_package(self, pdf_path, doc_name) -> bytes
Purpose: Create a ZIP package for PDF upload in reMarkable format Args: pdf_path: Path to PDF file doc_name: Document name Returns: ZIP package as bytes
Parameters:
pdf_path: Type: Pathdoc_name: Type: str
Returns: Returns bytes
upload_document(self, file_path, folder_id, document_name) -> bool
Purpose: Upload a PDF document to reMarkable Cloud Args: file_path: Path to PDF file folder_id: Target folder ID (empty string for root) document_name: Name for the document Returns: True if upload successful
Parameters:
file_path: Type: Pathfolder_id: Type: strdocument_name: Type: Optional[str]
Returns: Returns bool
find_folder_by_name(self, folder_name) -> Optional[str]
Purpose: Find a folder by name and return its ID Args: folder_name: Name of the folder to find Returns: Folder ID or None if not found
Parameters:
folder_name: Type: str
Returns: Returns Optional[str]
get_documents_in_folder(self, folder_id) -> List[Dict]
Purpose: Get all documents in a specific folder Args: folder_id: Folder ID (empty string for root) Returns: List of document metadata dictionaries
Parameters:
folder_id: Type: str
Returns: Returns List[Dict]
_get_storage_host(self) -> str
Purpose: Extract storage host from JWT token (tectonic service) Based on Chrome extension logic
Returns: Returns str
Required Imports
import asyncio
import json
import logging
import tempfile
import time
Usage Example
# Example usage:
# result = RemarkableRestClient(bases)
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class RemarkableAPIClient 71.9% similar
-
class RemarkableCloudManager 70.6% similar
-
class Client 68.7% similar
-
class RemarkableConfig 64.2% similar
-
class RemarkableAuth 62.5% similar