class FileCloudAPI
Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.
/tf/active/vicechatdev/FC_api.py
19 - 3233
moderate
Purpose
Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.
Source Code
class FileCloudAPI:
"""
Python wrapper for the FileCloud REST API.
This class provides methods to interact with FileCloud server APIs,
handling authentication, session management, and various file operations.
"""
def __init__(self, server_url: str, username: Optional[str] = None, password: Optional[str] = None):
    """
    Initialize the FileCloud API client.

    Args:
        server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/')
        username: Username for authentication (optional if authenticating later)
        password: Password for authentication (optional if authenticating later)
    """
    # Normalize the base URL so endpoint paths can be appended directly.
    self.server_url = server_url.rstrip('/')
    self.username = username
    self.password = password
    # requests.Session() is the documented constructor; requests.session()
    # is a legacy lowercase alias kept only for backward compatibility.
    self.session = requests.Session()
    self.authenticated = False        # user-level session established
    self.authenticated_admin = False  # admin-level session established
    self.headers = {'Accept': 'application/json'}
def login(self, username: str = None, password: str = None) -> bool:
    """
    Authenticate with the FileCloud server as a regular user.

    Args:
        username: Override the username provided during initialization
        password: Override the password provided during initialization

    Returns:
        bool: True if authentication is successful, False otherwise.

    Raises:
        ValueError: If no username or password is available.
    """
    if username:
        self.username = username
    if password:
        self.password = password
    if not self.username or not self.password:
        raise ValueError("Username and password are required for authentication")
    login_endpoint = '/core/loginguest'
    credentials = {'userid': self.username, 'password': self.password}
    try:
        response = self.session.post(
            f"{self.server_url}{login_endpoint}",
            data=credentials,
            headers=self.headers
        )
        login_call = response.json()
        # FileCloud wraps the outcome in command[0]; result == 1 means success.
        if login_call['command'][0]['result'] == 1:
            self.authenticated = True
            # A fresh user login supersedes any admin session state.
            self.authenticated_admin = False
            return True
        else:
            self.authenticated = False
            error_message = login_call['command'][0].get('message', 'Unknown error')
            # Use the module logger instead of print, for consistency with
            # the rest of this class (e.g. search/search_metadata).
            logger.error(f"Login failed: {error_message}")
            return False
    except Exception as e:
        logger.error(f"Login error: {str(e)}")
        self.authenticated = False
        return False
def adminlogin(self, username: str = None, password: str = None) -> bool:
    """
    Authenticate with the FileCloud server as an administrator.

    Args:
        username: Override the username provided during initialization
        password: Override the password provided during initialization

    Returns:
        bool: True if authentication is successful, False otherwise.

    Raises:
        ValueError: If no username or password is available.
    """
    if username:
        self.username = username
    if password:
        self.password = password
    if not self.username or not self.password:
        raise ValueError("Username and password are required for authentication")
    login_endpoint = '/admin/adminlogin'
    credentials = {'adminuser': self.username, 'adminpassword': self.password}
    try:
        response = self.session.post(
            f"{self.server_url}{login_endpoint}",
            data=credentials,
            headers=self.headers
        )
        login_call = response.json()
        # FileCloud wraps the outcome in command[0]; result == 1 means success.
        if login_call['command'][0]['result'] == 1:
            self.authenticated_admin = True
            # An admin login supersedes any user session state.
            self.authenticated = False
            return True
        else:
            self.authenticated_admin = False
            error_message = login_call['command'][0].get('message', 'Unknown error')
            # Use the module logger instead of print, for consistency with
            # the rest of this class.
            logger.error(f"Login failed: {error_message}")
            return False
    except Exception as e:
        logger.error(f"Login error: {str(e)}")
        self.authenticated_admin = False
        return False
def authenticate(self, username: str = None, password: str = None) -> bool:
    """Backward-compatible alias that simply delegates to login()."""
    return self.login(username, password)
def logout(self) -> Dict:
"""
Logout from the FileCloud server.
Returns:
Dict: Response from the server.
"""
if not self.authenticated:
return {"success": True, "message": "Not logged in"}
logout_endpoint = '/core/locksession'
response = self.session.post(
f"{self.server_url}{logout_endpoint}",
cookies=self.session.cookies
)
self.authenticated = False
self.session.cookies.clear()
return self._parse_response(response)
def _ensure_authenticated(self):
"""
Ensure that the client is authenticated with the server.
Attempts to login if not already authenticated.
Raises:
Exception: If authentication fails.
"""
if not self.authenticated:
if not self.login():
raise Exception("Authentication failed. Please check your credentials.")
def _ensure_authenticated_admin(self):
"""
Ensure that the client is authenticated with the server.
Attempts to login if not already authenticated.
Raises:
Exception: If authentication fails.
"""
if not self.authenticated_admin:
if not self.adminlogin():
raise Exception("Authentication failed. Please check your credentials.")
def _parse_response(self, response: requests.Response) -> Dict:
"""
Parse response based on content type.
Args:
response: The HTTP response object.
Returns:
Dict: Parsed response as a dictionary.
"""
if response.status_code != 200:
return {
"success": False,
"message": f"HTTP error {response.status_code}: {response.reason}"
}
if response.text == 'OK':
return {"success": True, "message": "Operation completed successfully"}
content_type = response.headers.get('Content-Type', '')
try:
# Handle JSON response
if 'application/json' in content_type:
json_data = response.json()
# Extract share_id and share_url for add_share responses
if 'command' in json_data and isinstance(json_data['command'], list):
command_data = json_data['command'][0]
if 'shareid' in command_data:
json_data['share_id'] = command_data.get('shareid')
json_data['share_url'] = command_data.get('shareurl')
return json_data
# Handle XML response - needed for FileCloud APIs that return XML
elif 'text/xml' in content_type or 'application/xml' in content_type or response.text.strip().startswith('<?xml') or '<' in response.text:
try:
# Parse XML
root = ET.fromstring(response.text)
# Check for error response
if root.tag == 'error':
return {
"success": False,
"message": root.text if root.text else "Unknown error"
}
# Handle workflow list response format
if root.tag == 'workflows':
meta = root.find('meta')
total = int(meta.find('total').text) if meta and meta.find('total') is not None else 0
workflow_elements = root.findall('workflow')
workflows = []
for workflow_elem in workflow_elements:
workflow = {}
for child in workflow_elem:
# Remove leading underscore from tag names (like _id)
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
workflow[tag_name] = child.text
workflows.append(workflow)
return {
"success": True,
"data": {
"workflows": {
"workflow": workflows
},
"meta": {
"total": total,
"count": len(workflows)
}
}
}
# Handle workflow detail response
elif root.tag == 'workflow':
workflow = {}
for child in root:
# Remove leading underscore from tag names (like _id)
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
workflow[tag_name] = child.text
return {
"success": True,
"data": {
"workflow": workflow
}
}
# Handle metadata sets response
elif root.tag == 'sets':
sets_list = []
for set_elem in root.findall('set'):
set_dict = {}
for child in set_elem:
# Remove leading underscore from tag names
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
set_dict[tag_name] = child.text
sets_list.append(set_dict)
return {
"success": True,
"data": {
"sets": sets_list
}
}
# Handle metadata attributes response
elif root.tag == 'attributes':
attrs_list = []
for attr_elem in root.findall('attribute'):
attr_dict = {}
for child in attr_elem:
# Remove leading underscore from tag names
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
# Handle options list
if tag_name == 'options' and len(child) > 0:
options = []
for option in child.findall('option'):
options.append(option.text)
attr_dict[tag_name] = options
else:
attr_dict[tag_name] = child.text
attrs_list.append(attr_dict)
return {
"success": True,
"data": {
"attributes": attrs_list
}
}
# Handle workflow run list response
elif root.tag == 'runs':
meta = root.find('meta')
total = int(meta.find('total').text) if meta and meta.find('total') is not None else 0
run_elements = root.findall('run')
runs = []
for run_elem in run_elements:
run = {}
for child in run_elem:
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
run[tag_name] = child.text
runs.append(run)
return {
"success": True,
"data": {
"runs": runs,
"meta": {
"total": total,
"count": len(runs)
}
}
}
# Handle workflow run detail response
elif root.tag == 'run':
run = {}
for child in root:
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
run[tag_name] = child.text
return {
"success": True,
"data": {
"run": run
}
}
# Handle share responses
elif root.tag == 'shares' or root.tag == 'share':
result = {
"success": True,
"data": {
root.tag: self._xml_to_dict(root)
}
}
# Extract share ID and URL
share_id = None
share_url = None
if root.tag == 'share':
share_id_elem = root.find('shareid')
share_url_elem = root.find('shareurl')
if share_id_elem is not None:
share_id = share_id_elem.text
if share_url_elem is not None:
share_url = share_url_elem.text
elif root.tag == 'shares':
share_elem = root.find('share')
if share_elem is not None:
share_id_elem = share_elem.find('shareid')
share_url_elem = share_elem.find('shareurl')
if share_id_elem is not None:
share_id = share_id_elem.text
if share_url_elem is not None:
share_url = share_url_elem.text
if share_id:
result['share_id'] = share_id
if share_url:
result['share_url'] = share_url
return result
# Check for command structure (common in FileCloud API responses)
command_elem = root.find('command')
if command_elem is not None:
cmd_type = command_elem.find('type')
cmd_result = command_elem.find('result')
cmd_message = command_elem.find('message')
result = {
"success": cmd_result is not None and cmd_result.text == "1",
"command_type": cmd_type.text if cmd_type is not None else None,
"message": cmd_message.text if cmd_message is not None else None,
"data": {}
}
# Extract share ID and URL for add_share responses
share_id_elem = command_elem.find('shareid')
share_url_elem = command_elem.find('shareurl')
if share_id_elem is not None:
result['share_id'] = share_id_elem.text
if share_url_elem is not None:
result['share_url'] = share_url_elem.text
return result
# Generic XML to dict conversion as fallback
result = {
"success": True,
"xml_root_tag": root.tag,
"data": {root.tag: self._xml_to_dict(root)}
}
# Last attempt to extract share ID and URL from generic XML response
if 'shareid' in response.text or 'shareurl' in response.text:
import re
share_id_match = re.search(r'<shareid>([^<]+)</shareid>', response.text)
share_url_match = re.search(r'<shareurl>([^<]+)</shareurl>', response.text)
if share_id_match:
result['share_id'] = share_id_match.group(1)
if share_url_match:
result['share_url'] = share_url_match.group(1)
return result
except ET.ParseError as xml_err:
return {
"success": False,
"message": f"Failed to parse XML response: {str(xml_err)}",
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
# Try to parse as JSON regardless of content type
elif response.text.strip().startswith('{') or response.text.strip().startswith('['):
json_data = response.json()
# Extract share_id and share_url for add_share responses in JSON format
if 'command' in json_data and isinstance(json_data['command'], list):
command_data = json_data['command'][0]
if 'shareid' in command_data:
json_data['share_id'] = command_data.get('shareid')
json_data['share_url'] = command_data.get('shareurl')
return json_data
# Return raw text for unrecognized formats
else:
# Check if response might contain share information
if 'shareid' in response.text or 'shareurl' in response.text:
import re
result = {
"success": True,
"message": "Extracted share information from unrecognized format",
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
share_id_match = re.search(r'shareid["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
share_url_match = re.search(r'shareurl["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
if share_id_match:
result['share_id'] = share_id_match.group(1)
if share_url_match:
result['share_url'] = share_url_match.group(1)
return result
return {
"success": False,
"message": "Unrecognized response format",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
except json.JSONDecodeError as e:
# Check if the response text might contain share information
if 'shareid' in response.text or 'shareurl' in response.text:
import re
result = {
"success": True,
"message": "Extracted share information despite JSON parse error",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
share_id_match = re.search(r'shareid["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
share_url_match = re.search(r'shareurl["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
if share_id_match:
result['share_id'] = share_id_match.group(1)
if share_url_match:
result['share_url'] = share_url_match.group(1)
return result
return {
"success": False,
"message": f"Failed to parse response as JSON: {str(e)}",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
except Exception as e:
return {
"success": False,
"message": f"Error processing response: {str(e)}",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
def _xml_to_dict(self, element):
"""
Convert an XML element to a dictionary.
Args:
element: XML element to convert
Returns:
Dict or str: Converted element
"""
result = {}
# Handle attributes
for key, value in element.attrib.items():
result[f"@{key}"] = value
# Handle children
for child in element:
child_tag = child.tag
# Remove leading underscore from tag names (like _id)
if child_tag.startswith('_'):
child_tag = child_tag[1:]
if len(child) == 0 and not child.attrib:
# Simple text node
if child.text is None:
result[child_tag] = ''
else:
result[child_tag] = child.text.strip()
else:
# Complex node
child_dict = self._xml_to_dict(child)
if child_tag in result:
# If key already exists, convert to list if not already
if not isinstance(result[child_tag], list):
result[child_tag] = [result[child_tag]]
result[child_tag].append(child_dict)
else:
result[child_tag] = child_dict
# Handle element text
if element.text and element.text.strip() and not result:
return element.text.strip()
return result
def _extract_paths_from_xml(self, xml_content: str) -> List[str]:
"""
Extract file paths from XML search results.
Args:
xml_content: XML content from search results
Returns:
List[str]: List of file paths
"""
paths = []
try:
root = ET.fromstring(xml_content)
for entry in root.findall('.//entry'):
path_elem = entry.find('path')
if path_elem is not None and path_elem.text:
paths.append(path_elem.text)
except Exception as e:
print(f"Error parsing XML: {str(e)}")
return paths
# File Operations
def upload_file(self, local_file_path: str, remote_path: str,
                filename: Optional[str] = None, overwrite: bool = False) -> Dict:
    """
    Upload a local file into a directory on the FileCloud server.

    Args:
        local_file_path: Path to the local file to upload
        remote_path: Directory path on the server where to upload the file
        filename: Optional server-side name (default: the local basename)
        overwrite: Whether to overwrite an existing file of the same name

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    upload_endpoint = '/core/upload'
    effective_name = filename or os.path.basename(local_file_path)
    query = {
        'appname': 'explorer',
        'path': remote_path,
        'offset': 0,       # single-shot upload: whole file at offset 0
        'complete': 1,
        'filename': effective_name
    }
    if overwrite:
        query['overwrite'] = 1
    try:
        with open(local_file_path, 'rb') as handle:
            response = self.session.post(
                f"{self.server_url}{upload_endpoint}",
                params=query,
                files={'file': (effective_name, handle)},
                cookies=self.session.cookies
            )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Upload failed: {str(e)}"}
def download_file(self, remote_file_path: str, local_file_path: Optional[str] = None,
                  check_only: bool = False) -> Union[Dict, bytes]:
    """
    Download a file from the FileCloud server.

    Args:
        remote_file_path: Full path to the file on the server
        local_file_path: Where to save the file; when None the raw bytes
            are returned instead
        check_only: If True, only verify the file is downloadable

    Returns:
        Union[Dict, bytes]: Status dict, or the file content as bytes.
    """
    self._ensure_authenticated()
    download_endpoint = '/core/downloadfile'
    query = {
        'filepath': remote_file_path,
        'filename': os.path.basename(remote_file_path)
    }
    if check_only:
        query['checkonly'] = 1
    try:
        response = self.session.get(
            f"{self.server_url}{download_endpoint}",
            params=query,
            cookies=self.session.cookies,
            stream=True  # stream so large files are not buffered in memory
        )
        if response.status_code != 200:
            return {"success": False, "message": f"Download failed with status {response.status_code}"}
        if check_only:
            # Server answers a plain 'OK' when the file is downloadable.
            if response.text == 'OK':
                return {"success": True, "message": "File can be downloaded"}
            return {"success": False, "message": response.text}
        if not local_file_path:
            return response.content
        with open(local_file_path, 'wb') as out:
            for chunk in response.iter_content(chunk_size=8192):
                out.write(chunk)
        return {"success": True, "message": f"File saved to {local_file_path}"}
    except Exception as e:
        return {"success": False, "message": f"Download failed: {str(e)}"}
def get_file_list(self, path: str, sort_by: str = "name", sort_dir: int = 1,
                  start: int = 0, limit: int = -1, include_metadata: bool = False) -> Dict:
    """
    List files and directories under a server path.

    Args:
        path: Path to list files from
        sort_by: Field to sort by ("name", "date", or "size")
        sort_dir: Sort direction (1 ascending, -1 descending)
        start: Index to start from (for pagination)
        limit: Maximum number of entries to return (-1 for unlimited)
        include_metadata: Whether to include metadata information

    Returns:
        Dict: Parsed listing; XML responses also get a 'paths' list.
    """
    self._ensure_authenticated()
    list_endpoint = '/core/getfilelist'
    query = {
        'path': path,
        'sortby': sort_by,
        'sortdir': sort_dir,
        'start': start,
        'limit': limit
    }
    if include_metadata:
        query['sendmetadatasetinfo'] = 1
    try:
        response = self.session.post(
            f"{self.server_url}{list_endpoint}",
            params=query,
            cookies=self.session.cookies
        )
        parsed = self._parse_response(response)
        # For XML listings, also expose a flat list of entry paths.
        if parsed.get("success") and response.text.strip().startswith('<'):
            parsed["paths"] = self._extract_paths_from_xml(response.text)
        return parsed
    except Exception as e:
        return {"success": False, "message": f"Failed to get file list: {str(e)}"}
def create_folder(self, path: str, folder_name: Optional[str] = None,
                  subpath: Optional[str] = None) -> Dict:
    """
    Create a new folder on the server.

    Args:
        path: Path where to create the folder
        folder_name: Name of the folder to create
        subpath: Alternative to folder_name; creates every missing folder
            along this relative path

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    create_folder_endpoint = '/core/createfolder'
    query = {'path': path}
    if folder_name:
        query['name'] = folder_name
    if subpath:
        query['subpath'] = subpath
    try:
        response = self.session.post(
            f"{self.server_url}{create_folder_endpoint}",
            params=query,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to create folder: {str(e)}"}
def delete_file(self, path: str, name: str) -> Dict:
    """
    Delete a file or folder.

    Args:
        path: Directory containing the file/folder
        name: Name of the file/folder to delete

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    delete_endpoint = '/core/deletefile'
    query = {'path': path, 'name': name}
    try:
        response = self.session.post(
            f"{self.server_url}{delete_endpoint}",
            params=query,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete file: {str(e)}"}
def rename_or_move(self, from_path: str, to_path: str, overwrite: bool = False) -> Dict:
    """
    Rename or move a file or folder.

    Args:
        from_path: Full path to the source file/folder
        to_path: Full path to the destination
        overwrite: Whether to overwrite if the destination exists

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    move_endpoint = '/core/renameormove'
    query = {'fromname': from_path, 'toname': to_path}
    if overwrite:
        query['overwrite'] = 1
    try:
        response = self.session.post(
            f"{self.server_url}{move_endpoint}",
            params=query,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to rename or move: {str(e)}"}
def copy_file(self, path: str, name: str, copy_to: str, overwrite: bool = False) -> Dict:
    """
    Copy a file to a new location.

    Args:
        path: Directory containing the file
        name: Name of the file to copy
        copy_to: Full path to the destination
        overwrite: Whether to overwrite if the destination exists

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    copy_endpoint = '/core/copyfile'
    query = {'path': path, 'name': name, 'copyto': copy_to}
    if overwrite:
        query['overwrite'] = 1
    try:
        response = self.session.post(
            f"{self.server_url}{copy_endpoint}",
            params=query,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to copy file: {str(e)}"}
# Search and Metadata Operations
def search(self, search_string: str, search_scope: str = '0', search_location: Optional[str] = None,
           min_size: Optional[int] = None, max_size: Optional[int] = None,
           start: int = 0, limit: str = '10000') -> Dict:
    """
    Search for files and folders.

    Args:
        search_string: String to search for
        search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
        search_location: Path to search within (None for all accessible locations)
        min_size: Minimum file size in KB
        max_size: Maximum file size in KB
        start: Start index for pagination
        limit: Maximum number of results to return

    Returns:
        Dict: Search results; XML responses also get a 'paths' list.
    """
    self._ensure_authenticated()
    search_endpoint = '/core/dosearch'
    params = {
        'searchstring': search_string,
        'searchscope': search_scope,
        'start': start,
        'limit': limit,
        'maxsearchentries': limit,
        'refresh': 1
    }
    if search_location:
        params['searchloc'] = search_location
    if min_size is not None:
        params['minsize'] = min_size
    if max_size is not None:
        params['maxsize'] = max_size
    try:
        response = self.session.post(
            f"{self.server_url}{search_endpoint}",
            params=params,
            cookies=self.session.cookies
        )
        # Poll while the server reports the search as still running, but
        # bound the wait so a stuck search cannot hang the caller forever
        # (the original looped unconditionally).
        max_polls = 120  # 120 polls * 5s = 10 minutes
        polls = 0
        while 'INPROGRESS' in response.text and polls < max_polls:
            logger.info("Search is in progress, waiting for completion...")
            time.sleep(5)
            polls += 1
            params['refresh'] = 0
            response = self.session.post(
                f"{self.server_url}{search_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
        if 'INPROGRESS' in response.text:
            return {"success": False, "message": "Search timed out while still in progress"}
        result = self._parse_response(response)
        # Extract paths if it's a successful XML response
        if result.get("success") and response.text.strip().startswith('<'):
            result["paths"] = self._extract_paths_from_xml(response.text)
        return result
    except Exception as e:
        return {"success": False, "message": f"Search failed: {str(e)}"}
def search_metadata(self, search_string: str = "**", search_scope: str = '3',
                    attributes: Optional[List[Dict[str, Any]]] = None,
                    search_location: Optional[str] = None) -> Dict:
    """
    Search for files based on metadata attributes.

    Args:
        search_string: String to search for (use '**' for all files)
        search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
        attributes: List of attribute dictionaries, each in one of these formats:
            - Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int}
            - Format 2: {'attributeid': 'attr_id', 'value': 'attr_value', 'type_id': type_int}
        search_location: Optional path to limit the search to

    Returns:
        Dict: Search results with paths

    Example:
        # Search for files with multiple metadata criteria
        client.search_metadata(
            attributes=[
                {"id": "abc123", "value": "Important", "type": 1},
                {"id": "def456", "value": "2023", "type": 2}
            ],
            search_location="/user/documents"
        )
    """
    self._ensure_authenticated()
    search_endpoint = '/core/dosearch'
    params = {
        'searchstring': search_string,
        'searchscope': search_scope,
        'limit': 10000
    }
    if search_location:
        params['location'] = search_location
    # Handle multiple metadata attributes
    if attributes:
        params['searchattr_total'] = len(attributes)
        for i, attr in enumerate(attributes):
            if 'id' in attr:
                # Format 1: id, value, type
                params[f'searchattr{i}_id'] = attr['id']
                params[f'attributevalue{i}'] = attr['value']
                if 'type' in attr:
                    # Fix: read the 'type' key we just checked for; the
                    # original read attr['type_id'] here and raised KeyError.
                    # NOTE(review): param key 'searchattr{i}type' lacks the
                    # underscore used by Format 2 — confirm against the API.
                    params[f'searchattr{i}type'] = attr['type']
            elif 'attributeid' in attr:
                # Format 2: attributeid, value, type_id
                params[f'searchattr{i}_id'] = attr.get('attributeid')
                params[f'searchattr{i}_value'] = attr.get('value')
                if attr.get('type_id') is not None:
                    # Guard: the original called int(None) when 'type_id' was absent.
                    params[f'searchattr{i}_type'] = int(attr['type_id'])
    try:
        logger.info(f"Searching metadata with params: {params}")
        response = self.session.post(
            f"{self.server_url}{search_endpoint}",
            data=params,
            cookies=self.session.cookies
        )
        logger.info("response: %s", response.text)
        result = self._parse_response(response)
        # Extract paths from search results if successful and XML response
        if result.get('success', False) and hasattr(response, 'text') and response.text.strip().startswith('<'):
            result['paths'] = self._extract_paths_from_xml(response.text)
        return result
    except Exception as e:
        return {"success": False, "message": f"Failed to search metadata: {str(e)}"}
# Also keep the old method signature for backward compatibility
def search_metadata_single(self, search_string: str = "**", search_scope: str = '3',
                           attribute_id: Optional[str] = None, attribute_type: Optional[int] = None,
                           attribute_value: Optional[str] = None) -> Dict:
    """
    Search for files based on a single metadata attribute.

    Kept for backward compatibility; search_metadata() is preferred.

    Args:
        search_string: String to search for (use '**' for all files)
        search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
        attribute_id: Metadata attribute ID to search for
        attribute_type: Metadata attribute type (1=Text, 2=Integer, etc.)
        attribute_value: Metadata attribute value to match

    Returns:
        Dict: Search results with paths
    """
    if not (attribute_id and attribute_value):
        return self.search_metadata(search_string, search_scope)
    criterion = {"id": attribute_id, "value": attribute_value}
    if attribute_type:
        criterion["type"] = attribute_type
    return self.search_metadata(search_string, search_scope, [criterion])
def get_metadata_values(self, file_path: str) -> Dict:
    """
    Get metadata values for a specific file.

    Args:
        file_path: Path to the file

    Returns:
        Dict: Metadata values for the file with support for multiple sets
    """
    self._ensure_authenticated()
    metadata_endpoint = '/core/getmetadatavalues'
    try:
        response = self.session.post(
            f"{self.server_url}{metadata_endpoint}",
            params={'fullpath': file_path},
            cookies=self.session.cookies
        )
        outcome = self._parse_response(response)
        # Log the outcome either way to aid debugging of metadata lookups.
        if outcome.get('success'):
            logger.debug(f"Successfully retrieved metadata values for {file_path}")
        else:
            logger.warning(f"Failed to retrieve metadata values: {outcome.get('message', 'Unknown error')}")
        return outcome
    except Exception as e:
        logger.error(f"Error in get_metadata_values: {str(e)}")
        return {"success": False, "message": f"Failed to get metadata values: {str(e)}"}
def save_attribute_values(self, file_path: str, set_id: str, attribute_values: Dict[str, str]) -> Dict:
    """
    Save metadata attribute values for a file or folder.

    Args:
        file_path: Full path to the file or folder
        set_id: ID of the metadata set
        attribute_values: Dictionary of attribute IDs and values

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    save_endpoint = '/core/saveattributevalues'
    form = {
        'fullpath': file_path,
        'setid': set_id,
        'attributes_total': len(attribute_values)
    }
    # Flatten each (id, value) pair into the indexed form fields the API expects.
    for index, (attr_id, attr_value) in enumerate(attribute_values.items()):
        form[f'attribute{index}_attributeid'] = attr_id
        form[f'attribute{index}_value'] = attr_value
    try:
        response = self.session.post(
            f"{self.server_url}{save_endpoint}",
            data=form,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to save attributes: {str(e)}"}
def get_file_info(self, file_path: str) -> Dict:
    """
    Get information about a file.

    Args:
        file_path: Full path to the file

    Returns:
        Dict: File information
    """
    self._ensure_authenticated()
    info_endpoint = '/core/fileinfo'
    try:
        response = self.session.post(
            f"{self.server_url}{info_endpoint}",
            params={'file': file_path},
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to get file info: {str(e)}"}
# Advanced Directory Operations
def create_directory_tree(self, base_path: str, target_path: str) -> Dict:
    """
    Create a directory tree, creating all necessary parent directories.

    Walks target_path one component at a time; as soon as a component is
    missing on the server, asks the server to create the whole remaining
    sub-path in a single createfolder call.

    Args:
        base_path: Base path where to create the directories
        target_path: Path structure to create (relative to base_path)

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    path_elements = target_path.strip('/').split('/')
    cumul_path = ""
    # Process each path element. (The original kept a 'walking' flag whose
    # False assignment was unreachable past the return — removed as dead code.)
    for i, p in enumerate(path_elements):
        if not p:  # Skip empty elements
            continue
        # Check if this path element exists
        info_endpoint = '/core/getfilelist'
        params = {
            'path': f"{base_path}{cumul_path}"
        }
        try:
            response = self.session.post(
                f"{self.server_url}{info_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            parsed = xmltodict.parse(response.text)
            found = False
            # Check if the directory exists
            try:
                # NOTE(review): assumes the getfilelist XML has a top-level
                # <entries><entry> structure — confirm against the API response.
                entries = parsed.get('entries', {}).get('entry', [])
                if not isinstance(entries, list):
                    entries = [entries]
                for e in entries:
                    if e.get('name') == p and e.get('type') == 'dir':
                        found = True
                        break
            except Exception:
                # Unexpected listing shape: treat the directory as missing.
                pass
            if found:
                # Directory exists, move to next level
                cumul_path = f"{cumul_path}/{p}"
            else:
                # Create this directory and all remaining subdirectories at once
                create_endpoint = '/core/createfolder'
                remaining_path = "/".join(path_elements[i:])
                create_params = {
                    'path': f"{base_path}{cumul_path}",
                    'subpath': remaining_path
                }
                create_response = self.session.post(
                    f"{self.server_url}{create_endpoint}",
                    params=create_params,
                    cookies=self.session.cookies
                )
                return self._parse_response(create_response)
        except Exception as e:
            return {"success": False, "message": f"Failed to create directory tree: {str(e)}"}
    # If we walked through the entire path, all directories exist
    return {"success": True, "message": "All directories already exist"}
# ACL Management Operations
def add_acl_entry(self, path: str, entry_type: str, value: str,
                  permissions: str, flag: Optional[str] = None) -> Dict:
    """
    Add an Access Control List (ACL) entry to a file or folder.

    Args:
        path: Path to the file or folder
        entry_type: Type of entry ('user' or 'group')
        value: Username or group name
        permissions: Permission string (e.g., 'R', 'W', 'D', 'S')
        flag: Optional flag parameter - 'allow' or 'deny' (defaults to 'allow')

    Returns:
        Dict: Response from the server
    """
    self._ensure_authenticated()
    endpoint = '/core/addaclentry'
    params = {
        'path': path,
        'type': entry_type,
        'value': value,
        'perm': permissions,
        # Fix: honor the caller's flag — the original ignored the parameter
        # and always sent 'allow', making 'deny' entries impossible.
        'flag': flag if flag else 'allow'
    }
    try:
        response = self.session.post(
            f"{self.server_url}{endpoint}",
            params=params,
            cookies=self.session.cookies
        )
        return self._parse_response(response)
    except Exception as e:
        return {"success": False, "message": f"Failed to add ACL entry: {str(e)}"}
def delete_acl_entry(self, path: str, entry_type: str, value: str) -> Dict:
    """
    Remove a single ACL entry (user or group) from a file or folder.

    Args:
        path: Path to the file or folder
        entry_type: Type of entry ('user' or 'group')
        value: Username or group name

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/deleteaclentry"
    query = {'path': path, 'type': entry_type, 'value': value}
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete ACL entry: {str(e)}"}
def get_acl(self, path: str, filter_term: Optional[str] = None,
            list_inherited: bool = False) -> Dict:
    """
    Fetch the Access Control List (ACL) for a path.

    Args:
        path: Path to the file or folder
        filter_term: Optional filter applied server-side to the results
        list_inherited: When True, inherited permissions are included

    Returns:
        Dict: Parsed ACL information, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getacl"
    query = {'path': path}
    if filter_term:
        query['filter'] = filter_term
    if list_inherited:
        query['listInherited'] = 1
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get ACL: {str(e)}"}
def get_effective_acl(self, path: str, emailid: Optional[str] = None) -> Dict:
    """
    Query the effective ACL permissions a user holds on a shared path
    (read, write, delete, share or manage).

    Args:
        path: Path to the file or folder
        emailid: Optional username (emailid); defaults to the current user

    Returns:
        Dict: Effective permission information, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/geteffectiveacl"
    query = {'path': path}
    if emailid:
        query['emailid'] = emailid
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        logger.info("response: %s", resp.text)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get effective ACL: {str(e)}"}
def get_acls(self, filter_text: Optional[str] = None,
             start: int = 0, limit: int = 100) -> Dict:
    """
    List all Access Control Lists (ACLs) in the system, paginated.

    Args:
        filter_text: Optional text filter for the results
        start: Zero-based start index for pagination
        limit: Maximum number of results per page

    Returns:
        Dict: Parsed list of ACLs, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getacls"
    query = {'start': start, 'limit': limit}
    if filter_text:
        query['filtertext'] = filter_text
    try:
        # This endpoint is queried with GET, unlike most write operations.
        resp = self.session.get(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get ACLs: {str(e)}"}
def get_all_acls_for_path(self, path: str) -> Dict:
    """
    Fetch every ACL attached to a specific path.

    Args:
        path: Path to the file or folder

    Returns:
        Dict: All ACL information for the path, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getallaclsforpath"
    try:
        resp = self.session.get(url, params={'path': path},
                                cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get all ACLs for path: {str(e)}"}
# ...existing code...
# Share Management Operations
def add_user_to_share(self, user_id: str, share_id: str) -> Dict:
    """
    Grant a single user access to a private share.

    Args:
        user_id: ID or username of the user to add
        share_id: ID of the share

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/addusertoshare"
    query = {'userid': user_id, 'shareid': share_id}
    logger.info(f"Adding user {user_id} to share {share_id}")
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to add user to share: {str(e)}"}
def add_users_to_share(self, share_id: str, users: List[str],
                       send_email: bool = True, custom_subject: Optional[str] = None,
                       custom_message: Optional[str] = None, send_copy: bool = False) -> Dict:
    """
    Grant several users access to a share in one call.

    Args:
        share_id: ID of the share
        users: Usernames or email addresses to add (joined comma-separated)
        send_email: Whether the server should email the added users
        custom_subject: Optional subject line for the notification email
        custom_message: Optional body for the notification email
        send_copy: Whether the sharer receives a copy of the notification

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/adduserstoshare"
    query = {
        'shareid': share_id,
        'users': ','.join(users),
        'sendemail': 1 if send_email else 0,
        'sendcopy': 1 if send_copy else 0,
    }
    if custom_subject:
        query['custom_subject'] = custom_subject
    if custom_message:
        query['custom_message'] = custom_message
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to add users to share: {str(e)}"}
def add_group_to_share(self, group_id: str, share_id: str) -> Dict:
    """
    Grant a group access to a private share.

    Args:
        group_id: ID of the group to add
        share_id: ID of the share

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/addgrouptoshare"
    query = {'groupid': group_id, 'shareid': share_id}
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to add group to share: {str(e)}"}
def delete_user_from_share(self, user_id: str, share_id: str) -> Dict:
    """
    Revoke a user's access to a private share.

    Args:
        user_id: ID or username of the user to remove
        share_id: ID of the share

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/deleteuserfromshare"
    query = {'userid': user_id, 'shareid': share_id}
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete user from share: {str(e)}"}
def delete_group_from_share(self, group_id: str, share_id: str) -> Dict:
    """
    Revoke a group's access to a private share.

    Args:
        group_id: ID of the group to remove
        share_id: ID of the share

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/deletegroupfromshare"
    query = {'groupid': group_id, 'shareid': share_id}
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete group from share: {str(e)}"}
def get_share_for_path(self, path: str, share_id: Optional[str] = None) -> Dict:
    """
    Look up share information for a given path.

    Args:
        path: Path to the shared file or folder
        share_id: Optional share ID to narrow the lookup

    Returns:
        Dict: Parsed share information, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getshareforpath"
    query = {'path': path}
    if share_id:
        query['shareid'] = share_id
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get share for path: {str(e)}"}
def get_share_for_id(self, share_id: str) -> Dict:
    """
    Look up share information by share ID.

    Args:
        share_id: ID of the share

    Returns:
        Dict: Parsed share information, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getshareforid"
    try:
        resp = self.session.post(url, params={'shareid': share_id},
                                 cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get share for ID: {str(e)}"}
def update_share(self, share_id: str, **kwargs) -> Dict:
    """
    Update settings of an existing share.

    Args:
        share_id: ID of the share to update
        **kwargs: Any updatable share fields, passed through verbatim,
            e.g. sharename, sharelocation, validityperiod, expirytimestamp,
            sharesizelimit, maxdownloads, hidenotification, sharepassword,
            allowpublicaccess, allowpublicupload, allowpublicviewonly,
            allowpublicuploadonly, newshareowner, defaultfile

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/updateshare"
    # Merge the share id with whatever fields the caller wants changed.
    query = {'shareid': share_id, **kwargs}
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to update share: {str(e)}"}
def add_share(self, share_location: str, share_name: str, share_size_limit: Optional[int] = None,
              max_downloads: Optional[int] = None, validity_period: Optional[int] = None,
              expiry_timestamp: Optional[int] = None) -> Dict:
    """
    Create a new share for a file or folder.

    Args:
        share_location: Path to the file/folder to share
        share_name: Display name for the share
        share_size_limit: Optional size limit in bytes
        max_downloads: Optional maximum number of downloads
        validity_period: Optional validity period in days
        expiry_timestamp: Optional expiry timestamp

    Returns:
        Dict: Parsed server response (includes the new share ID), or an
        error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/addshare"
    query = {'sharelocation': share_location, 'sharename': share_name}
    # Only forward limits the caller actually specified.
    optional = {
        'sharesizelimit': share_size_limit,
        'maxdownloads': max_downloads,
        'validityperiod': validity_period,
        'expirytimestamp': expiry_timestamp,
    }
    for key, val in optional.items():
        if val is not None:
            query[key] = val
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to add share: {str(e)}"}
def update_share_link(self, share_id: str, old_share_link: str, new_share_link: str,
                      notify_users: bool = False) -> Dict:
    """
    Change the URL of a share.

    Args:
        share_id: ID of the share
        old_share_link: Current share link
        new_share_link: New share link to set
        notify_users: Whether to notify users about the link change

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/updatesharelink"
    # NOTE(review): both links are prefixed with the literal 'url' — this
    # looks intentional for the FileCloud endpoint, but confirm against
    # the API docs before changing.
    query = {
        'shareid': share_id,
        'oldsharelink': 'url' + old_share_link,
        'newsharelink': 'url' + new_share_link,
        'notifyusers': 1 if notify_users else 0,
    }
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to update share link: {str(e)}"}
def get_shared_link(self, share_location: str) -> Dict:
    """
    Retrieve the share link for a shared location.

    Args:
        share_location: Path to the shared file or folder

    Returns:
        Dict: Parsed share link information, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getsharedlink"
    try:
        resp = self.session.get(url, params={'sharelocation': share_location},
                                cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get shared link: {str(e)}"}
def set_user_access_for_share(self, share_id: str, user_id: str,
                              download: Optional[bool] = None,
                              write: Optional[bool] = None,
                              share: Optional[bool] = None,
                              sync: Optional[bool] = None,
                              allow_manage: Optional[bool] = None,
                              disallow_delete: Optional[bool] = None) -> Dict:
    """
    Set a user's per-share permissions.

    Args:
        share_id: ID of the share
        user_id: ID or username of the user
        download: Whether the user can download
        write: Whether the user can write/edit
        share: Whether the user can re-share
        sync: Whether the user can sync
        allow_manage: Whether the user can manage the share
        disallow_delete: Whether deletion is disallowed

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/setuseraccessforshare"
    query = {'shareid': share_id, 'userid': user_id}
    # Only flags explicitly set by the caller are sent; each becomes the
    # string 'true'/'false' expected by the endpoint.
    flags = {
        'download': download,
        'write': write,
        'share': share,
        'sync': sync,
        'allowmanage': allow_manage,
        'disallowdelete': disallow_delete,
    }
    for name, val in flags.items():
        if val is not None:
            query[name] = 'true' if val else 'false'
    try:
        # NOTE(review): this endpoint is invoked with GET in the original
        # implementation even though it mutates state — confirm intended.
        resp = self.session.get(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to set user access for share: {str(e)}"}
def set_group_access_for_share(self, share_id: str, group_id: str,
                               download: Optional[bool] = None,
                               write: Optional[bool] = None,
                               share: Optional[bool] = None,
                               sync: Optional[bool] = None,
                               allow_manage: Optional[bool] = None,
                               disallow_delete: Optional[bool] = None) -> Dict:
    """
    Set a group's per-share permissions.

    Args:
        share_id: ID of the share
        group_id: ID of the group
        download: Whether the group can download
        write: Whether the group can write/edit
        share: Whether the group can re-share
        sync: Whether the group can sync
        allow_manage: Whether the group can manage the share
        disallow_delete: Whether deletion is disallowed

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/setgroupaccessforshare"
    query = {'shareid': share_id, 'groupid': group_id}
    # Only forward the permission flags the caller explicitly set.
    flags = {
        'download': download,
        'write': write,
        'share': share,
        'sync': sync,
        'allowmanage': allow_manage,
        'disallowdelete': disallow_delete,
    }
    for name, val in flags.items():
        if val is not None:
            query[name] = 'true' if val else 'false'
    try:
        # NOTE(review): GET is used here (mirrors set_user_access_for_share)
        # even though the call mutates state — confirm intended.
        resp = self.session.get(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to set group access for share: {str(e)}"}
# Add this method to the FileCloudAPI class
def delete_share(self, share_id: str) -> Dict:
    """
    Delete an existing share.

    Args:
        share_id: ID of the share to delete

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/deleteshare"
    try:
        resp = self.session.post(url, params={'shareid': share_id},
                                 cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete share: {str(e)}"}
def get_users_for_share(self, share_id: str) -> Dict:
    """
    List the users explicitly added to a share.

    Args:
        share_id: ID of the share

    Returns:
        Dict: Parsed response with users and their access permissions,
        or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getusersforshare"
    try:
        resp = self.session.post(url, params={'shareid': share_id},
                                 cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get users for share: {str(e)}"}
# Automation Workflow Operations
def get_automation_workflows(self, include_disabled: bool = False) -> Dict:
    """
    List existing automation workflows.

    Args:
        include_disabled: When True, disabled workflows are included

    Returns:
        Dict: Parsed list of workflows, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getautomationworkflows"
    query = {'include_disabled': 1} if include_disabled else {}
    try:
        # Listing endpoint: GET is required here.
        resp = self.session.get(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get automation workflows: {str(e)}"}
def get_automation_workflow_details(self, workflow_id: str) -> Dict:
    """
    Fetch the configuration of one automation workflow.

    Args:
        workflow_id: ID of the workflow to retrieve

    Returns:
        Dict: Parsed workflow details, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getautomationworkflow"
    try:
        resp = self.session.get(url, params={'id': workflow_id},
                                cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get workflow details: {str(e)}"}
def update_automation_workflow(self, workflow_id: str, workflow_data: Dict) -> Dict:
    """
    Update an existing automation workflow.

    Args:
        workflow_id: ID of the workflow to update
        workflow_data: Nested workflow configuration; flattened via
            _prepare_workflow_data before submission

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/updateautomationworkflow"
    # Flatten the nested configuration and attach the workflow id.
    payload = {'id': workflow_id, **self._prepare_workflow_data(workflow_data)}
    try:
        # Updates go in the POST body (data=), not the query string.
        resp = self.session.post(url, data=payload, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to update workflow: {str(e)}"}
def create_automation_workflow(self, workflow_data: Dict) -> Dict:
    """
    Create a new automation workflow.

    Args:
        workflow_data: Nested workflow configuration; flattened via
            _prepare_workflow_data before submission

    Returns:
        Dict: Parsed server response (includes the new workflow ID),
        or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/createautomationworkflow"
    payload = self._prepare_workflow_data(workflow_data)
    try:
        # Creation goes in the POST body (data=), not the query string.
        resp = self.session.post(url, data=payload, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to create workflow: {str(e)}"}
def delete_automation_workflow(self, workflow_id: str) -> Dict:
    """
    Delete an automation workflow.

    Args:
        workflow_id: ID of the workflow to delete

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/deleteautomationworkflow"
    try:
        resp = self.session.post(url, params={'id': workflow_id},
                                 cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to delete workflow: {str(e)}"}
def start_automation_workflow(self, workflow_id: str, path: str, file_names: Optional[List[str]] = None) -> Dict:
    """
    Trigger an automation workflow on files under a path.

    Args:
        workflow_id: ID of the workflow to start
        path: Path where the files are located
        file_names: Optional explicit file names to process; when omitted,
            the workflow covers all files in the path

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/startautomationworkflow"
    query = {'id': workflow_id, 'path': path}
    if file_names:
        # The API expects a count plus 1-indexed fn1..fnN entries.
        query['count'] = len(file_names)
        for idx, fname in enumerate(file_names, 1):
            query[f'fn{idx}'] = fname
    try:
        resp = self.session.post(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to start workflow: {str(e)}"}
def get_automation_workflow_runs(self, workflow_id: Optional[str] = None,
                                 start: int = 0, limit: int = 100) -> Dict:
    """
    Fetch the history of automation workflow runs, paginated.

    Args:
        workflow_id: Optional ID to restrict results to one workflow
        start: Zero-based start index for pagination
        limit: Maximum number of results per page

    Returns:
        Dict: Parsed list of workflow runs, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getautomationworkflowruns"
    query = {'start': start, 'limit': limit}
    if workflow_id:
        query['id'] = workflow_id
    try:
        resp = self.session.get(url, params=query, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get workflow runs: {str(e)}"}
def get_automation_workflow_run_details(self, run_id: str) -> Dict:
    """
    Fetch detailed information about one workflow run.

    Args:
        run_id: ID of the workflow run

    Returns:
        Dict: Parsed run details, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getautomationworkflowrundetails"
    try:
        resp = self.session.get(url, params={'id': run_id},
                                cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to get workflow run details: {str(e)}"}
def _prepare_workflow_data(self, workflow_data: Dict) -> Dict:
"""
Helper method to prepare workflow data for API submission.
Args:
workflow_data: Dictionary containing workflow configuration
Returns:
Dict: Formatted parameters for API request
"""
params = {}
# Map common workflow properties
for key in ['name', 'description', 'enabled', 'trigger_type', 'path_filter', 'user_filter']:
if key in workflow_data:
params[key] = workflow_data[key]
# Handle conditions (if present)
if 'conditions' in workflow_data:
conditions = workflow_data['conditions']
params['conditions_total'] = len(conditions)
for i, condition in enumerate(conditions, 1): # Start index at 1
for condition_key, value in condition.items():
params[f'condition{i}_{condition_key}'] = value
# Handle actions (if present)
if 'actions' in workflow_data:
actions = workflow_data['actions']
params['actions_total'] = len(actions)
for i, action in enumerate(actions, 1): # Start index at 1
for action_key, value in action.items():
# Special handling for action parameters which may be nested
if action_key == 'parameters' and isinstance(value, dict):
for param_key, param_value in value.items():
# Special handling for attributes array
if param_key == 'attributes' and isinstance(param_value, list):
params[f'action{i}_parameter_{param_key}_total'] = len(param_value)
for j, attr in enumerate(param_value, 1): # Start index at 1
for attr_key, attr_val in attr.items():
params[f'action{i}_parameter_{param_key}{j}_{attr_key}'] = attr_val
# Special handling for users array
elif param_key == 'users' and isinstance(param_value, list):
params[f'action{i}_parameter_{param_key}'] = ','.join(param_value)
else:
params[f'action{i}_parameter_{param_key}'] = param_value
else:
params[f'action{i}_{action_key}'] = value
# Handle schedule (if present)
if 'schedule' in workflow_data:
schedule = workflow_data['schedule']
for schedule_key, value in schedule.items():
params[f'schedule_{schedule_key}'] = value
return params
def get_metadata_sets(self) -> Dict:
    """
    Fetch all available metadata set definitions (admin API).

    Returns:
        Dict: Parsed metadata set information, or an error dict on failure
    """
    self._ensure_authenticated_admin()
    url = f"{self.server_url}/admin/getmetadatasetdefinitions"
    try:
        # 'end' caps the number of definitions returned by the admin API.
        resp = self.session.post(url, data={'end': '100'},
                                 cookies=self.session.cookies)
        outcome = self._parse_response(resp)
        if outcome.get('success'):
            logger.debug(f"Successfully retrieved metadata sets")
        else:
            logger.warning(f"Failed to retrieve metadata sets: {outcome.get('message')}")
        return outcome
    except Exception as e:
        logger.error(f"Error in get_metadata_sets: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get metadata sets: {str(e)}"
        }
def get_metadata_attributes(self, set_id: str) -> Dict:
    """
    Fetch the attribute definitions of one metadata set (admin API).

    Args:
        set_id: ID of the metadata set

    Returns:
        Dict: Parsed attribute information, or an error dict on failure
    """
    self._ensure_authenticated_admin()
    url = f"{self.server_url}/admin/getmetadataset"
    try:
        resp = self.session.post(url, data={'setId': set_id},
                                 cookies=self.session.cookies)
        outcome = self._parse_response(resp)
        if outcome.get('success'):
            logger.debug(f"Successfully retrieved metadata attributes for set {set_id}")
        else:
            logger.warning(f"Failed to retrieve metadata attributes: {outcome.get('message')}")
        return outcome
    except Exception as e:
        logger.error(f"Error in get_metadata_attributes: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get metadata attributes: {str(e)}"
        }
def get_metadata_values(self, file_path: str) -> Dict:
    """
    Fetch metadata values attached to a file.

    Args:
        file_path: Full path to the file

    Returns:
        Dict: Parsed metadata values (may span multiple sets), or an
        error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/getmetadatavalues"
    try:
        resp = self.session.post(url, params={'fullpath': file_path},
                                 cookies=self.session.cookies)
        outcome = self._parse_response(resp)
        if outcome.get('success'):
            logger.debug(f"Successfully retrieved metadata values for {file_path}")
        else:
            logger.warning(f"Failed to retrieve metadata values: {outcome.get('message', 'Unknown error')}")
        return outcome
    except Exception as e:
        logger.error(f"Error in get_metadata_values: {str(e)}")
        return {"success": False, "message": f"Failed to get metadata values: {str(e)}"}
def save_attribute_values(self, file_path: str, set_id: str, attribute_values: Dict[str, str]) -> Dict:
    """
    Save metadata attribute values onto a file or folder.

    Args:
        file_path: Full path to the file or folder
        set_id: ID of the metadata set
        attribute_values: Mapping of attribute IDs to values

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/saveattributevalues"
    payload = {
        'fullpath': file_path,
        'setid': set_id,
        'attributes_total': len(attribute_values),
    }
    # NOTE(review): attribute indices here are 0-based, unlike the
    # 1-based workflow/condition indices elsewhere — confirm against
    # the FileCloud API before changing.
    for idx, (attr_id, attr_val) in enumerate(attribute_values.items()):
        payload[f'attribute{idx}_attributeid'] = attr_id
        payload[f'attribute{idx}_value'] = attr_val
    try:
        resp = self.session.post(url, data=payload, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to save attributes: {str(e)}"}
def add_set_to_file_object(self, file_path: str, set_id: str) -> Dict:
    """
    Attach a metadata set to a file or folder.

    Args:
        file_path: Full path to the file or folder
        set_id: ID of the metadata set to attach

    Returns:
        Dict: Parsed server response, or an error dict on failure
    """
    self._ensure_authenticated()
    url = f"{self.server_url}/core/addsettofileobject"
    payload = {'fullpath': file_path, 'setid': set_id}
    try:
        resp = self.session.post(url, data=payload, cookies=self.session.cookies)
        return self._parse_response(resp)
    except Exception as e:
        return {"success": False, "message": f"Failed to add metadata set to file object: {str(e)}"}
# ==========================================
# Controlled Document System Extensions
# ==========================================
def create_document_folder(self, folder_path: str, create_parents: bool = True) -> Dict[str, Any]:
    """
    Create a folder for document storage, optionally building the whole
    parent chain.

    Args:
        folder_path: Folder path to create
        create_parents: When True, create each missing ancestor in turn

    Returns:
        Dict: Status dictionary ({"success", "path"}) when parents are
        created level by level, or the create_folder response otherwise
    """
    if not create_parents:
        # Single-level creation only.
        return self.create_folder(folder_path)
    # Walk the path one segment at a time, creating whatever is missing.
    prefix = ""
    for segment in folder_path.strip('/').split('/'):
        if not segment:
            continue
        prefix += f"/{segment}"
        if not self.check_folder_exists(prefix):
            self.create_folder(prefix)
    return {"success": True, "path": folder_path}
def upload_controlled_document(
    self,
    file_data: Union[bytes, BinaryIO],
    folder_path: str,
    filename: str,
    metadata: Dict[str, Any] = None,
    version_comment: str = None
) -> Dict[str, Any]:
    """
    Upload a controlled document with folder creation, versioning and
    metadata attachment.

    Args:
        file_data: File content as bytes or a file-like object
        folder_path: Target folder path
        filename: Filename to use
        metadata: Optional document metadata to attach after upload
        version_comment: Comment recorded in the version history when an
            existing file is updated

    Returns:
        Dict: Upload/update response; includes 'metadata' when metadata
        was attached
    """
    # Ensure the destination folder (and its parents) exists.
    self.create_document_folder(folder_path)
    # Bug fix: the destination path previously contained a "(unknown)"
    # placeholder instead of the filename, so every upload resolved to
    # the same bogus path. Build it from the actual filename.
    full_path = f"{folder_path}/{filename}"
    # An existing file means this is a new version, not a fresh upload.
    file_exists = self.check_file_exists(full_path)
    if file_exists:
        result = self.update_file(full_path, file_data, version_comment)
    else:
        result = self.upload_file(folder_path, filename, file_data)
    # Attach metadata only after a successful upload/update.
    if metadata and result.get('success', False):
        self.set_file_metadata(full_path, metadata)
        result['metadata'] = metadata
    return result
def get_document_with_metadata(self, file_path: str) -> Dict[str, Any]:
    """
    Fetch a document's content, metadata and version history together.

    Args:
        file_path: Path to the document

    Returns:
        Dict: Combined result with keys 'success', 'file_path', 'content',
        'metadata', 'versions' and 'error'
    """
    doc_part = self.download_file(file_path)
    meta_part = self.get_file_metadata(file_path)
    ver_part = self.get_file_versions(file_path)
    # Success tracks the content fetch; errors surface from either the
    # content or the metadata call.
    return {
        "success": doc_part.get('success', False),
        "file_path": file_path,
        "content": doc_part.get('content'),
        "metadata": meta_part.get('metadata', {}),
        "versions": ver_part.get('versions', []),
        "error": doc_part.get('error') or meta_part.get('error'),
    }
def set_file_metadata(self, file_path: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Set custom metadata for a file.
Args:
file_path: Path to the file
metadata: Dictionary of metadata key-value pairs
Returns:
Response dictionary
"""
# Format metadata for FileCloud API
formatted_metadata = self._format_metadata_for_api(metadata)
# API endpoint
endpoint = "core/setCustomMetadata"
# Request data
request_data = {
"filepath": file_path,
"custommetadata": formatted_metadata
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"metadata": metadata,
"error": self._get_error_message(response)
}
def get_file_metadata(self, file_path: str) -> Dict[str, Any]:
"""
Get custom metadata for a file.
Args:
file_path: Path to the file
Returns:
Response dictionary with metadata
"""
# API endpoint
endpoint = "core/getMetadata"
# Request data
request_data = {
"filepath": file_path
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
# Extract metadata from response
metadata = {}
if self._is_success(response) and 'metadata' in response:
raw_metadata = response['metadata']
# Parse custom metadata
if 'custommetadata' in raw_metadata:
metadata = self._parse_api_metadata(raw_metadata['custommetadata'])
return {
"success": self._is_success(response),
"file_path": file_path,
"metadata": metadata,
"error": self._get_error_message(response)
}
def get_file_versions(self, file_path: str) -> Dict[str, Any]:
"""
Get version history for a file.
Args:
file_path: Path to the file
Returns:
Response dictionary with version history
"""
# API endpoint
endpoint = "core/getVersions"
# Request data
request_data = {
"filepath": file_path
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
# Extract versions
versions = []
if self._is_success(response) and 'versioninfo' in response:
versions = response['versioninfo']
return {
"success": self._is_success(response),
"file_path": file_path,
"versions": versions,
"error": self._get_error_message(response)
}
def restore_file_version(self, file_path: str, version_id: str) -> Dict[str, Any]:
"""
Restore a previous version of a file.
Args:
file_path: Path to the file
version_id: Version ID to restore
Returns:
Response dictionary
"""
# API endpoint
endpoint = "core/restoreVersion"
# Request data
request_data = {
"filepath": file_path,
"versionid": version_id
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"version_id": version_id,
"error": self._get_error_message(response)
}
def set_document_permissions(
self,
file_path: str,
users: Dict[str, List[str]] = None,
groups: Dict[str, List[str]] = None
) -> Dict[str, Any]:
"""
Set document access permissions.
Args:
file_path: Path to the document
users: Dictionary mapping usernames to permission lists
groups: Dictionary mapping groups to permission lists
Returns:
Response dictionary
"""
# Create permission structure expected by FileCloud
permissions = []
# Add user permissions
if users:
for username, perms in users.items():
permissions.append({
"type": "user",
"name": username,
"permissions": perms
})
# Add group permissions
if groups:
for groupname, perms in groups.items():
permissions.append({
"type": "group",
"name": groupname,
"permissions": perms
})
# API endpoint
endpoint = "core/setPermissions"
# Request data
request_data = {
"filepath": file_path,
"permissions": json.dumps(permissions)
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"error": self._get_error_message(response)
}
def create_document_share(
self,
file_path: str,
share_type: str = "view",
password: Optional[str] = None,
expiry_days: Optional[int] = None,
notify_emails: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Create a shareable link for a document.
Args:
file_path: Path to the document
share_type: Type of share (view, download, upload, collaborate)
password: Optional password protection
expiry_days: Number of days until link expires
notify_emails: List of emails to notify about the share
Returns:
Response dictionary with share link
"""
# API endpoint
endpoint = "core/createShare"
# Request data
request_data = {
"filepath": file_path,
"type": share_type
}
# Add optional parameters
if password:
request_data["password"] = password
if expiry_days is not None:
# Convert to seconds
expiry_time = int(time.time()) + (expiry_days * 86400)
request_data["expiry"] = expiry_time
if notify_emails:
request_data["emails"] = ",".join(notify_emails)
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
# Extract share URL
share_url = None
if self._is_success(response) and 'shareinfo' in response:
share_url = response['shareinfo'].get('url')
return {
"success": self._is_success(response),
"file_path": file_path,
"share_url": share_url,
"share_id": response.get('shareinfo', {}).get('id'),
"error": self._get_error_message(response)
}
def search_documents(
self,
search_text: str,
folder_path: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
doc_type: Optional[str] = None,
max_results: int = 100
) -> Dict[str, Any]:
"""
Search for documents using text and metadata.
Args:
search_text: Text to search for
folder_path: Optional folder path to limit search
metadata: Optional metadata criteria as key-value pairs
doc_type: Optional document type to filter by
max_results: Maximum number of results to return
Returns:
Response dictionary with search results
"""
# API endpoint
endpoint = "core/searchFiles"
# Build query
query = search_text
# Add metadata constraints if provided
if metadata:
for key, value in metadata.items():
query += f" metadata:{key}:{value}"
# Add document type constraint if provided
if doc_type:
query += f" metadata:doc_type:{doc_type}"
# Request data
request_data = {
"searchtext": query,
"maxhits": max_results
}
# Add folder path constraint if provided
if folder_path:
request_data["folderpath"] = folder_path
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
# Extract search results
results = []
if self._is_success(response) and 'searchresults' in response:
results = response['searchresults']
return {
"success": self._is_success(response),
"results": results,
"count": len(results),
"error": self._get_error_message(response)
}
def start_document_workflow(
self,
file_path: str,
workflow_name: str,
workflow_data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Start a workflow for a document.
Args:
file_path: Path to the document
workflow_name: Name of the workflow to start
workflow_data: Initial workflow data
Returns:
Response dictionary with workflow information
"""
# API endpoint
endpoint = "workflow/startWorkflow"
# Request data
request_data = {
"filepath": file_path,
"workflowname": workflow_name,
"workflowdata": json.dumps(workflow_data)
}
# Make API call
response = self._api_request(endpoint, params=request_data,method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"workflow_id": response.get('workflowid'),
"error": self._get_error_message(response)
}
def check_folder_exists(self, folder_path: str) -> bool:
"""
Check if a folder exists.
Args:
folder_path: Path to check
Returns:
True if folder exists, False otherwise
"""
try:
# Ensure client is authenticated
self._ensure_authenticated()
# API endpoint
endpoint = '/core/fileexists'
url = f"{self.server_url}{endpoint}"
# Make request directly with data parameter
response = self.session.post(
url,
data={"file": folder_path},
cookies=self.session.cookies
)
logger.debug(f"Response status: {response.status_code}")
logger.debug(f"Response text: {response.text[:500]}")
# Parse the response
if response.status_code == 200:
# Check for known success patterns
if response.text.strip() == "1":
return True
elif response.text.strip() == "0":
return False
# Try parsing as JSON if possible
try:
data = response.json()
if isinstance(data, dict):
return data.get('success', False)
except:
pass
# Try parsing XML response
if '<result>1</result>' in response.text:
return True
elif '<result>0</result>' in response.text:
return False
# Check for success/error message patterns
if 'success' in response.text.lower():
return True
elif 'not found' in response.text.lower() or 'error' in response.text.lower():
return False
# Default to false for any other case
return False
except Exception as e:
logger.error(f"Error in check_folder_exists: {e}")
return False
def check_file_exists(self, file_path: str) -> bool:
"""
Check if a file exists.
Args:
file_path: Path to check
Returns:
True if file exists, False otherwise
"""
try:
# Ensure client is authenticated
self._ensure_authenticated()
# API endpoint
endpoint = '/core/fileexists'
url = f"{self.server_url}{endpoint}"
# Make request directly with data parameter
response = self.session.post(
url,
data={"file": file_path},
cookies=self.session.cookies
)
logger.debug(f"Response status: {response.status_code}")
logger.debug(f"Response text: {response.text[:500]}")
# Parse the response
if response.status_code == 200:
# Check for known success patterns
if response.text.strip() == "1":
return True
elif response.text.strip() == "0":
return False
# Try parsing as JSON if possible
try:
data = response.json()
if isinstance(data, dict):
return data.get('success', False)
except:
pass
# Try parsing XML response
if '<result>1</result>' in response.text:
return True
elif '<result>0</result>' in response.text:
return False
# Check for success/error message patterns
if 'success' in response.text.lower():
return True
elif 'not found' in response.text.lower() or 'error' in response.text.lower():
return False
# Default to false for any other case
return False
except Exception as e:
logger.error(f"Error in check_file_exists: {e}")
return False
# ==========================================
# Helper Methods
# ==========================================
def _format_metadata_for_api(self, metadata: Dict[str, Any]) -> str:
"""
Format metadata dictionary for FileCloud API.
Args:
metadata: Dictionary of metadata
Returns:
Formatted metadata string
"""
formatted = []
for key, value in metadata.items():
# Convert non-string values to JSON strings
if not isinstance(value, str):
value = json.dumps(value)
formatted.append(f"{key}={value}")
return "|".join(formatted)
def _parse_api_metadata(self, metadata_str: str) -> Dict[str, Any]:
"""
Parse metadata string from FileCloud API.
Args:
metadata_str: Metadata string from API
Returns:
Dictionary of parsed metadata
"""
metadata = {}
if not metadata_str:
return metadata
# Split by the separator
items = metadata_str.split("|")
for item in items:
if "=" in item:
key, value = item.split("=", 1)
# Try to parse JSON values
try:
parsed_value = json.loads(value)
metadata[key] = parsed_value
except (json.JSONDecodeError, TypeError):
# Keep as string if not valid JSON
metadata[key] = value
return metadata
def _api_request(self, endpoint: str, data: dict = None, params: dict = None, files: dict = None, method: str = "POST") -> dict:
"""
Make a request to the FileCloud API.
Args:
endpoint: API endpoint to call (without server URL)
data: Dictionary of form data to send
params: Dictionary of URL parameters
files: Dictionary of files to upload
method: HTTP method (POST or GET)
Returns:
Dictionary with parsed response
"""
self._ensure_authenticated()
# Prepare URL
url = f"{self.server_url}{endpoint}"
try:
# Make the request based on method
if method.upper() == "GET":
response = self.session.get(
url,
params=params,
cookies=self.session.cookies
)
else: # Default to POST
response = self.session.post(
url,
data=data,
params=params,
files=files,
cookies=self.session.cookies
)
# Parse and return the response
print(f"API request to {url} returned status code {response.status_code}")
return self._parse_response(response)
except Exception as e:
return {
"success": False,
"message": f"API request failed: {str(e)}"
}
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Base classes of the class definition (presumably only `object` — confirm against the full source).
Return Value
Instantiating the class returns a FileCloudAPI client object.
Class Interface
Methods
__init__(self, server_url, username, password)
Purpose: Initialize the FileCloud API client. Args: server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/') username: Username for authentication (optional if authenticating later) password: Password for authentication (optional if authenticating later)
Parameters:
server_url: Type: str; username: Type: str; password: Type: str
Returns: None
login(self, username, password) -> bool
Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.
Parameters:
username: Type: strpassword: Type: str
Returns: Returns bool
adminlogin(self, username, password) -> bool
Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.
Parameters:
username: Type: strpassword: Type: str
Returns: Returns bool
authenticate(self, username, password) -> bool
Purpose: Alias for login() for backward compatibility
Parameters:
username: Type: strpassword: Type: str
Returns: Returns bool
logout(self) -> Dict
Purpose: Logout from the FileCloud server. Returns: Dict: Response from the server.
Returns: Returns Dict
_ensure_authenticated(self)
Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.
Returns: None
_ensure_authenticated_admin(self)
Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.
Returns: None
_parse_response(self, response) -> Dict
Purpose: Parse response based on content type. Args: response: The HTTP response object. Returns: Dict: Parsed response as a dictionary.
Parameters:
response: Type: requests.Response
Returns: Returns Dict
_xml_to_dict(self, element)
Purpose: Convert an XML element to a dictionary. Args: element: XML element to convert Returns: Dict or str: Converted element
Parameters:
element: Parameter
Returns: See docstring for return details
_extract_paths_from_xml(self, xml_content) -> List[str]
Purpose: Extract file paths from XML search results. Args: xml_content: XML content from search results Returns: List[str]: List of file paths
Parameters:
xml_content: Type: str
Returns: Returns List[str]
upload_file(self, local_file_path, remote_path, filename, overwrite) -> Dict
Purpose: Upload a file to the FileCloud server. Args: local_file_path: Path to the local file to upload remote_path: Directory path on the server where to upload the file filename: Optional name to use for the file on the server (default: original filename) overwrite: Whether to overwrite existing files with the same name Returns: Dict: Response from the server
Parameters:
local_file_path: Type: strremote_path: Type: strfilename: Type: Optional[str]overwrite: Type: bool
Returns: Returns Dict
download_file(self, remote_file_path, local_file_path, check_only) -> Union[Dict, bytes]
Purpose: Download a file from the FileCloud server. Args: remote_file_path: Full path to the file on the server local_file_path: Path where to save the downloaded file (if None, returns content) check_only: If True, only checks if the file is downloadable without downloading Returns: Union[Dict, bytes]: Either a response dict or the file content as bytes
Parameters:
remote_file_path: Type: strlocal_file_path: Type: Optional[str]check_only: Type: bool
Returns: Returns Union[Dict, bytes]
get_file_list(self, path, sort_by, sort_dir, start, limit, include_metadata) -> Dict
Purpose: Get a list of files and directories in the specified path. Args: path: Path to list files from sort_by: Field to sort by ("name", "date", or "size") sort_dir: Sort direction (1 for ascending, -1 for descending) start: Index to start from (for pagination) limit: Maximum number of entries to return (-1 for unlimited) include_metadata: Whether to include metadata information Returns: Dict: Response containing the file list
Parameters:
path: Type: strsort_by: Type: strsort_dir: Type: intstart: Type: intlimit: Type: intinclude_metadata: Type: bool
Returns: Returns Dict
create_folder(self, path, folder_name, subpath) -> Dict
Purpose: Create a new folder on the server. Args: path: Path where to create the folder folder_name: Name of the folder to create subpath: Alternative to folder_name, creates all missing folders in this path Returns: Dict: Response from the server
Parameters:
path: Type: strfolder_name: Type: Optional[str]subpath: Type: Optional[str]
Returns: Returns Dict
delete_file(self, path, name) -> Dict
Purpose: Delete a file or folder. Args: path: Path where the file/folder is located name: Name of the file/folder to delete Returns: Dict: Response from the server
Parameters:
path: Type: strname: Type: str
Returns: Returns Dict
rename_or_move(self, from_path, to_path, overwrite) -> Dict
Purpose: Rename or move a file or folder. Args: from_path: Full path to the source file/folder to_path: Full path to the destination overwrite: Whether to overwrite if destination exists Returns: Dict: Response from the server
Parameters:
from_path: Type: strto_path: Type: stroverwrite: Type: bool
Returns: Returns Dict
copy_file(self, path, name, copy_to, overwrite) -> Dict
Purpose: Copy a file to a new location. Args: path: Path where the file is located name: Name of the file to copy copy_to: Full path to the destination overwrite: Whether to overwrite if destination exists Returns: Dict: Response from the server
Parameters:
path: Type: strname: Type: strcopy_to: Type: stroverwrite: Type: bool
Returns: Returns Dict
search(self, search_string, search_scope, search_location, min_size, max_size, start, limit) -> Dict
Purpose: Search for files and folders. Args: search_string: String to search for search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) search_location: Path to search within (None for all accessible locations) min_size: Minimum file size in KB max_size: Maximum file size in KB start: Start index for pagination limit: Maximum number of results to return Returns: Dict: Search results
Parameters:
search_string: Type: strsearch_scope: Type: strsearch_location: Type: Optional[str]min_size: Type: Optional[int]max_size: Type: Optional[int]start: Type: intlimit: Type: str
Returns: Returns Dict
search_metadata(self, search_string, search_scope, attributes, search_location) -> Dict
Purpose: Search for files based on metadata attributes. Args: search_string: String to search for (use '**' for all files) search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) attributes: List of attribute dictionaries, each containing one of these formats: - Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int} - Format 2: {'setid': 'set_id', 'attributeid': 'attr_id', 'operator': 'equals', 'value': 'attr_value'} search_location: Optional path to limit the search to Returns: Dict: Search results with paths Example: # Search for files with multiple metadata criteria client.search_metadata( attributes=[ {"id": "abc123", "value": "Important", "type": 1}, {"id": "def456", "value": "2023", "type": 2} ], search_location="/user/documents" )
Parameters:
search_string: Type: strsearch_scope: Type: strattributes: Type: Optional[List[Dict[str, Any]]]search_location: Type: Optional[str]
Returns: Returns Dict
search_metadata_single(self, search_string, search_scope, attribute_id, attribute_type, attribute_value) -> Dict
Purpose: Search for files based on a single metadata attribute. This method is kept for backward compatibility. The search_metadata method is preferred. Args: search_string: String to search for (use '**' for all files) search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) attribute_id: Metadata attribute ID to search for attribute_type: Metadata attribute type (1=Text, 2=Integer, etc.) attribute_value: Metadata attribute value to match Returns: Dict: Search results with paths
Parameters:
search_string: Type: strsearch_scope: Type: strattribute_id: Type: Optional[str]attribute_type: Type: Optional[int]attribute_value: Type: Optional[str]
Returns: Returns Dict
get_metadata_values(self, file_path) -> Dict
Purpose: Get metadata values for a specific file. Args: file_path: Path to the file Returns: Dict: Metadata values for the file with support for multiple sets
Parameters:
file_path: Type: str
Returns: Returns Dict
save_attribute_values(self, file_path, set_id, attribute_values) -> Dict
Purpose: Save metadata attribute values for a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set attribute_values: Dictionary of attribute IDs and values Returns: Dict: Response from the server
Parameters:
file_path: Type: strset_id: Type: strattribute_values: Type: Dict[str, str]
Returns: Returns Dict
get_file_info(self, file_path) -> Dict
Purpose: Get information about a file. Args: file_path: Full path to the file Returns: Dict: File information
Parameters:
file_path: Type: str
Returns: Returns Dict
create_directory_tree(self, base_path, target_path) -> Dict
Purpose: Create a directory tree, creating all necessary parent directories. Args: base_path: Base path where to create the directories target_path: Path structure to create (relative to base_path) Returns: Dict: Response from the server
Parameters:
base_path: Type: strtarget_path: Type: str
Returns: Returns Dict
add_acl_entry(self, path, entry_type, value, permissions, flag) -> Dict
Purpose: Add an Access Control List (ACL) entry to a file or folder. Args: path: Path to the file or folder entry_type: Type of entry ('user' or 'group') value: Username or group name permissions: Permission string (e.g., 'R', 'W', 'D', 'S') flag: Optional flag parameter - 'allow', 'deny' Returns: Dict: Response from the server
Parameters:
path: Type: strentry_type: Type: strvalue: Type: strpermissions: Type: strflag: Type: Optional[str]
Returns: Returns Dict
delete_acl_entry(self, path, entry_type, value) -> Dict
Purpose: Delete an Access Control List (ACL) entry from a file or folder. Args: path: Path to the file or folder entry_type: Type of entry ('user' or 'group') value: Username or group name Returns: Dict: Response from the server
Parameters:
path: Type: strentry_type: Type: strvalue: Type: str
Returns: Returns Dict
get_acl(self, path, filter_term, list_inherited) -> Dict
Purpose: Get Access Control List (ACL) for a specific path. Args: path: Path to the file or folder filter_term: Optional filter to apply to the results list_inherited: Whether to include inherited permissions Returns: Dict: ACL information
Parameters:
path: Type: strfilter_term: Type: Optional[str]list_inherited: Type: bool
Returns: Returns Dict
get_effective_acl(self, path, emailid) -> Dict
Purpose: Get effective ACL permissions for a user on a specific path. Used to check for effective permissions for a user for a particular shared path such as read, write, delete, share or manage permissions. Args: path: Path to the file or folder emailid: Optional username (emailid) to check permissions for (if None, checks for current user) Returns: Dict: Effective ACL information showing what permissions the user has
Parameters:
path: Type: stremailid: Type: Optional[str]
Returns: Returns Dict
get_acls(self, filter_text, start, limit) -> Dict
Purpose: Get all Access Control Lists (ACLs) in the system. Args: filter_text: Optional text to filter results start: Start index for pagination limit: Maximum number of results to return Returns: Dict: List of ACLs
Parameters:
filter_text: Type: Optional[str]start: Type: intlimit: Type: int
Returns: Returns Dict
get_all_acls_for_path(self, path) -> Dict
Purpose: Get all Access Control Lists (ACLs) for a specific path. Args: path: Path to the file or folder Returns: Dict: All ACL information for the path
Parameters:
path: Type: str
Returns: Returns Dict
add_user_to_share(self, user_id, share_id) -> Dict
Purpose: Add a user to a private share. Args: user_id: ID or username of the user to add share_id: ID of the share Returns: Dict: Response from the server
Parameters:
user_id: Type: strshare_id: Type: str
Returns: Returns Dict
add_users_to_share(self, share_id, users, send_email, custom_subject, custom_message, send_copy) -> Dict
Purpose: Add multiple users to a share. Args: share_id: ID of the share users: List of usernames or email addresses to add to the share send_email: Whether to send notification emails to the users custom_subject: Custom subject for the notification email custom_message: Custom message for the notification email send_copy: Whether to send a copy of the notification to the sharer Returns: Dict: Response from the server
Parameters:
share_id: Type: strusers: Type: List[str]send_email: Type: boolcustom_subject: Type: Optional[str]custom_message: Type: Optional[str]send_copy: Type: bool
Returns: Returns Dict
add_group_to_share(self, group_id, share_id) -> Dict
Purpose: Add a group to a private share. Args: group_id: ID of the group to add share_id: ID of the share Returns: Dict: Response from the server
Parameters:
group_id: Type: strshare_id: Type: str
Returns: Returns Dict
delete_user_from_share(self, user_id, share_id) -> Dict
Purpose: Remove a user from a private share. Args: user_id: ID or username of the user to remove share_id: ID of the share Returns: Dict: Response from the server
Parameters:
user_id: Type: strshare_id: Type: str
Returns: Returns Dict
delete_group_from_share(self, group_id, share_id) -> Dict
Purpose: Remove a group from a private share. Args: group_id: ID of the group to remove share_id: ID of the share Returns: Dict: Response from the server
Parameters:
group_id: Type: strshare_id: Type: str
Returns: Returns Dict
get_share_for_path(self, path, share_id) -> Dict
Purpose: Get information about a share for a specific path. Args: path: Path to the shared file or folder share_id: Optional share ID to filter by Returns: Dict: Share information
Parameters:
path: Type: strshare_id: Type: Optional[str]
Returns: Returns Dict
get_share_for_id(self, share_id) -> Dict
Purpose: Get information about a share using its ID. Args: share_id: ID of the share Returns: Dict: Share information
Parameters:
share_id: Type: str
Returns: Returns Dict
update_share(self, share_id) -> Dict
Purpose: Update share information and settings. Args: share_id: ID of the share to update **kwargs: Parameters to update, which can include: - sharename: Name of the share - sharelocation: Location of the shared file/folder - validityperiod: Validity period in days - expirytimestamp: Expiry timestamp - sharesizelimit: Share size limit - maxdownloads: Maximum number of downloads - hidenotification: Hide notification (1 or 0) - sharepassword: Password for protected share - allowpublicaccess: Allow public access (1 or 0) - allowpublicupload: Allow public upload (1 or 0) - allowpublicviewonly: Allow public view only (1 or 0) - allowpublicuploadonly: Allow public upload only (1 or 0) - newshareowner: New owner of the share - defaultfile: Default file to open when accessing the share Returns: Dict: Response from the server
Parameters:
share_id: Type: str
Returns: Returns Dict
add_share(self, share_location, share_name, share_size_limit, max_downloads, validity_period, expiry_timestamp) -> Dict
Purpose: Create a new share. Args: share_location: Path to the file/folder to share share_name: Name for the share share_size_limit: Size limit for the share in bytes max_downloads: Maximum number of downloads allowed validity_period: Validity period in days expiry_timestamp: Expiry timestamp Returns: Dict: Response from the server with the new share ID
Parameters:
share_location: Type: strshare_name: Type: strshare_size_limit: Type: Optional[int]max_downloads: Type: Optional[int]validity_period: Type: Optional[int]expiry_timestamp: Type: Optional[int]
Returns: Returns Dict
update_share_link(self, share_id, old_share_link, new_share_link, notify_users) -> Dict
Purpose: Update the URL of a share. Args: share_id: ID of the share old_share_link: Current share link new_share_link: New share link to set notify_users: Whether to notify users about the link change Returns: Dict: Response from the server
Parameters:
share_id: Type: strold_share_link: Type: strnew_share_link: Type: strnotify_users: Type: bool
Returns: Returns Dict
get_shared_link(self, share_location) -> Dict
Purpose: Get a share link for a specific location. Args: share_location: Path to the shared file or folder Returns: Dict: Share link information
Parameters:
share_location: Type: str
Returns: Returns Dict
set_user_access_for_share(self, share_id, user_id, download, write, share, sync, allow_manage, disallow_delete) -> Dict
Purpose: Set permissions for a user on a shared file/folder. Args: share_id: ID of the share user_id: ID or username of the user download: Whether the user can download write: Whether the user can write/edit share: Whether the user can share sync: Whether the user can sync allow_manage: Whether the user can manage the share disallow_delete: Whether to disallow deletion Returns: Dict: Response from the server
Parameters:
share_id: Type: struser_id: Type: strdownload: Type: Optional[bool]write: Type: Optional[bool]share: Type: Optional[bool]sync: Type: Optional[bool]allow_manage: Type: Optional[bool]disallow_delete: Type: Optional[bool]
Returns: Returns Dict
set_group_access_for_share(self, share_id, group_id, download, write, share, sync, allow_manage, disallow_delete) -> Dict
Purpose: Set permissions for a group on a shared file/folder. Args: share_id: ID of the share group_id: ID of the group download: Whether the group can download write: Whether the group can write/edit share: Whether the group can share sync: Whether the group can sync allow_manage: Whether the group can manage the share disallow_delete: Whether to disallow deletion Returns: Dict: Response from the server
Parameters:
share_id: Type: strgroup_id: Type: strdownload: Type: Optional[bool]write: Type: Optional[bool]share: Type: Optional[bool]sync: Type: Optional[bool]allow_manage: Type: Optional[bool]disallow_delete: Type: Optional[bool]
Returns: Returns Dict
delete_share(self, share_id) -> Dict
Purpose: Delete a share. Args: share_id: ID of the share to delete Returns: Dict: Response from the server
Parameters:
share_id: Type: str
Returns: Returns Dict
get_users_for_share(self, share_id) -> Dict
Purpose: Get a list of users that are added explicitly to a share. Args: share_id: ID of the share Returns: Dict: Response containing the list of users with their access permissions
Parameters:
share_id: Type: str
Returns: Returns Dict
get_automation_workflows(self, include_disabled) -> Dict
Purpose: Retrieve existing automation workflows. Args: include_disabled: Whether to include disabled workflows in the results Returns: Dict: List of available workflows
Parameters:
include_disabled: Type: bool
Returns: Returns Dict
get_automation_workflow_details(self, workflow_id) -> Dict
Purpose: Retrieve details of a specific automation workflow. Args: workflow_id: ID of the workflow to retrieve Returns: Dict: Workflow details
Parameters:
workflow_id: Type: str
Returns: Returns Dict
update_automation_workflow(self, workflow_id, workflow_data) -> Dict
Purpose: Update an existing automation workflow. Args: workflow_id: ID of the workflow to update workflow_data: Dictionary containing the updated workflow configuration Returns: Dict: Response from the server
Parameters:
workflow_id: Type: str
workflow_data: Type: Dict
Returns: Returns Dict
create_automation_workflow(self, workflow_data) -> Dict
Purpose: Create a new automation workflow. Args: workflow_data: Dictionary containing the workflow configuration Returns: Dict: Response from the server with the new workflow ID
Parameters:
workflow_data: Type: Dict
Returns: Returns Dict
delete_automation_workflow(self, workflow_id) -> Dict
Purpose: Delete an automation workflow. Args: workflow_id: ID of the workflow to delete Returns: Dict: Response from the server
Parameters:
workflow_id: Type: str
Returns: Returns Dict
start_automation_workflow(self, workflow_id, path, file_names) -> Dict
Purpose: Start an automation workflow on specific files. Args: workflow_id: ID of the workflow to start path: Path where the files are located file_names: List of file names to process (if None, processes all files in the path) Returns: Dict: Response from the server
Parameters:
workflow_id: Type: str
path: Type: str
file_names: Type: Optional[List[str]]
Returns: Returns Dict
get_automation_workflow_runs(self, workflow_id, start, limit) -> Dict
Purpose: Get the history of automation workflow runs. Args: workflow_id: Optional ID to filter by specific workflow start: Starting index for pagination limit: Maximum number of results to return Returns: Dict: List of workflow runs
Parameters:
workflow_id: Type: Optional[str]
start: Type: int
limit: Type: int
Returns: Returns Dict
get_automation_workflow_run_details(self, run_id) -> Dict
Purpose: Get details about a specific workflow run. Args: run_id: ID of the workflow run to get details for Returns: Dict: Detailed information about the workflow run
Parameters:
run_id: Type: str
Returns: Returns Dict
_prepare_workflow_data(self, workflow_data) -> Dict
Purpose: Helper method to prepare workflow data for API submission. Args: workflow_data: Dictionary containing workflow configuration Returns: Dict: Formatted parameters for API request
Parameters:
workflow_data: Type: Dict
Returns: Returns Dict
get_metadata_sets(self) -> Dict
Purpose: Get all available metadata sets. Returns: Dict: Raw response containing metadata sets information
Returns: Returns Dict
get_metadata_attributes(self, set_id) -> Dict
Purpose: Get attributes for a specific metadata set. Args: set_id: ID of the metadata set Returns: Dict: Raw response containing metadata attributes
Parameters:
set_id: Type: str
Returns: Returns Dict
get_metadata_values(self, file_path) -> Dict
Purpose: Get metadata values for a specific file. Args: file_path: Path to the file Returns: Dict: Metadata values for the file with support for multiple sets
Parameters:
file_path: Type: str
Returns: Returns Dict
save_attribute_values(self, file_path, set_id, attribute_values) -> Dict
Purpose: Save metadata attribute values for a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set attribute_values: Dictionary of attribute IDs and values Returns: Dict: Response from the server
Parameters:
file_path: Type: str
set_id: Type: str
attribute_values: Type: Dict[str, str]
Returns: Returns Dict
add_set_to_file_object(self, file_path, set_id) -> Dict
Purpose: Add a metadata set to a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set to add Returns: Dict: Response from the server
Parameters:
file_path: Type: str
set_id: Type: str
Returns: Returns Dict
create_document_folder(self, folder_path, create_parents) -> Dict[str, Any]
Purpose: Create a folder for document storage. Args: folder_path: Folder path to create create_parents: Whether to create parent folders if they don't exist Returns: Response dictionary with status information
Parameters:
folder_path: Type: str
create_parents: Type: bool
Returns: Returns Dict[str, Any]
upload_controlled_document(self, file_data, folder_path, filename, metadata, version_comment) -> Dict[str, Any]
Purpose: Upload a controlled document with special handling. Args: file_data: File content as bytes or file-like object folder_path: Target folder path filename: Filename to use metadata: Document metadata to attach version_comment: Comment for version history Returns: Response dictionary with uploaded file information
Parameters:
file_data: Type: Union[bytes, BinaryIO]
folder_path: Type: str
filename: Type: str
metadata: Type: Dict[str, Any]
version_comment: Type: str
Returns: Returns Dict[str, Any]
get_document_with_metadata(self, file_path) -> Dict[str, Any]
Purpose: Get document content and metadata in a single call. Args: file_path: Path to the document Returns: Dictionary with file content and metadata
Parameters:
file_path: Type: str
Returns: Returns Dict[str, Any]
set_file_metadata(self, file_path, metadata) -> Dict[str, Any]
Purpose: Set custom metadata for a file. Args: file_path: Path to the file metadata: Dictionary of metadata key-value pairs Returns: Response dictionary
Parameters:
file_path: Type: str
metadata: Type: Dict[str, Any]
Returns: Returns Dict[str, Any]
get_file_metadata(self, file_path) -> Dict[str, Any]
Purpose: Get custom metadata for a file. Args: file_path: Path to the file Returns: Response dictionary with metadata
Parameters:
file_path: Type: str
Returns: Returns Dict[str, Any]
get_file_versions(self, file_path) -> Dict[str, Any]
Purpose: Get version history for a file. Args: file_path: Path to the file Returns: Response dictionary with version history
Parameters:
file_path: Type: str
Returns: Returns Dict[str, Any]
restore_file_version(self, file_path, version_id) -> Dict[str, Any]
Purpose: Restore a previous version of a file. Args: file_path: Path to the file version_id: Version ID to restore Returns: Response dictionary
Parameters:
file_path: Type: str
version_id: Type: str
Returns: Returns Dict[str, Any]
set_document_permissions(self, file_path, users, groups) -> Dict[str, Any]
Purpose: Set document access permissions. Args: file_path: Path to the document users: Dictionary mapping usernames to permission lists groups: Dictionary mapping groups to permission lists Returns: Response dictionary
Parameters:
file_path: Type: str
users: Type: Dict[str, List[str]]
groups: Type: Dict[str, List[str]]
Returns: Returns Dict[str, Any]
create_document_share(self, file_path, share_type, password, expiry_days, notify_emails) -> Dict[str, Any]
Purpose: Create a shareable link for a document. Args: file_path: Path to the document share_type: Type of share (view, download, upload, collaborate) password: Optional password protection expiry_days: Number of days until link expires notify_emails: List of emails to notify about the share Returns: Response dictionary with share link
Parameters:
file_path: Type: str
share_type: Type: str
password: Type: Optional[str]
expiry_days: Type: Optional[int]
notify_emails: Type: Optional[List[str]]
Returns: Returns Dict[str, Any]
search_documents(self, search_text, folder_path, metadata, doc_type, max_results) -> Dict[str, Any]
Purpose: Search for documents using text and metadata. Args: search_text: Text to search for folder_path: Optional folder path to limit search metadata: Optional metadata criteria as key-value pairs doc_type: Optional document type to filter by max_results: Maximum number of results to return Returns: Response dictionary with search results
Parameters:
search_text: Type: str
folder_path: Type: Optional[str]
metadata: Type: Optional[Dict[str, str]]
doc_type: Type: Optional[str]
max_results: Type: int
Returns: Returns Dict[str, Any]
start_document_workflow(self, file_path, workflow_name, workflow_data) -> Dict[str, Any]
Purpose: Start a workflow for a document. Args: file_path: Path to the document workflow_name: Name of the workflow to start workflow_data: Initial workflow data Returns: Response dictionary with workflow information
Parameters:
file_path: Type: str
workflow_name: Type: str
workflow_data: Type: Dict[str, Any]
Returns: Returns Dict[str, Any]
check_folder_exists(self, folder_path) -> bool
Purpose: Check if a folder exists. Args: folder_path: Path to check Returns: True if folder exists, False otherwise
Parameters:
folder_path: Type: str
Returns: Returns bool
check_file_exists(self, file_path) -> bool
Purpose: Check if a file exists. Args: file_path: Path to check Returns: True if file exists, False otherwise
Parameters:
file_path: Type: str
Returns: Returns bool
_format_metadata_for_api(self, metadata) -> str
Purpose: Format metadata dictionary for FileCloud API. Args: metadata: Dictionary of metadata Returns: Formatted metadata string
Parameters:
metadata: Type: Dict[str, Any]
Returns: Returns str
_parse_api_metadata(self, metadata_str) -> Dict[str, Any]
Purpose: Parse metadata string from FileCloud API. Args: metadata_str: Metadata string from API Returns: Dictionary of parsed metadata
Parameters:
metadata_str: Type: str
Returns: Returns Dict[str, Any]
_api_request(self, endpoint, data, params, files, method) -> dict
Purpose: Make a request to the FileCloud API. Args: endpoint: API endpoint to call (without server URL) data: Dictionary of form data to send params: Dictionary of URL parameters files: Dictionary of files to upload method: HTTP method (POST or GET) Returns: Dictionary with parsed response
Parameters:
endpoint: Type: str
data: Type: dict
params: Type: dict
files: Type: dict
method: Type: str
Returns: Returns dict
Required Imports
import requests
import os
import xmltodict
import time
import io
Usage Example
# Example usage:
# client = FileCloudAPI("https://filecloud.example.com", username="user", password="pass")
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class FileCloudAPI 98.7% similar
-
class FileCloudClient_v1 71.8% similar
-
class FileCloudClient 70.0% similar
-
function get_filecloud_client 54.7% similar
-
class FileCloudIntegration 54.0% similar