class FileCloudAPI
Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.
Source: /tf/active/vicechatdev/FC_api copy.py (lines 16-2165)
Complexity: moderate
Purpose
Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.
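A minimal usage sketch (the server URL, credentials, and paths below are illustrative):

api = FileCloudAPI('https://filecloud.example.com', username='alice', password='secret')
if api.login():
    api.upload_file('/tmp/report.pdf', '/alice/reports')
    listing = api.get_file_list('/alice/reports')
    data = api.download_file('/alice/reports/report.pdf')  # bytes on success
    api.logout()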
Source Code
import json
import os
import time
import xml.etree.ElementTree as ET
from typing import Any, BinaryIO, Dict, List, Optional, Union

import requests
import xmltodict  # third-party; used by create_directory_tree

class FileCloudAPI:
"""
Python wrapper for the FileCloud REST API.
This class provides methods to interact with FileCloud server APIs,
handling authentication, session management, and various file operations.
"""
def __init__(self, server_url: str, username: str = None, password: str = None):
"""
Initialize the FileCloud API client.
Args:
server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/')
username: Username for authentication (optional if authenticating later)
password: Password for authentication (optional if authenticating later)
"""
self.server_url = server_url.rstrip('/')
self.username = username
self.password = password
self.session = requests.Session()
self.authenticated = False
self.authenticated_admin = False
self.headers = {'Accept': 'application/json'}
def login(self, username: str = None, password: str = None) -> bool:
"""
Authenticate with the FileCloud server.
Args:
username: Override the username provided during initialization
password: Override the password provided during initialization
Returns:
bool: True if authentication is successful, False otherwise.
"""
if username:
self.username = username
if password:
self.password = password
if not self.username or not self.password:
raise ValueError("Username and password are required for authentication")
login_endpoint = '/core/loginguest'
credentials = {'userid': self.username, 'password': self.password}
try:
response = self.session.post(
f"{self.server_url}{login_endpoint}",
data=credentials,
headers=self.headers
)
login_call = response.json()
if login_call['command'][0]['result'] == 1:
self.authenticated = True
self.authenticated_admin = False
return True
else:
self.authenticated = False
error_message = login_call['command'][0].get('message', 'Unknown error')
print(f"Login failed: {error_message}")
return False
except Exception as e:
print(f"Login error: {str(e)}")
self.authenticated = False
return False
def adminlogin(self, username: str = None, password: str = None) -> bool:
"""
Authenticate with the FileCloud server as an administrator.
Args:
username: Override the username provided during initialization
password: Override the password provided during initialization
Returns:
bool: True if authentication is successful, False otherwise.
"""
if username:
self.username = username
if password:
self.password = password
if not self.username or not self.password:
raise ValueError("Username and password are required for authentication")
login_endpoint = '/admin/adminlogin'
credentials = {'adminuser': self.username, 'adminpassword': self.password}
try:
response = self.session.post(
f"{self.server_url}{login_endpoint}",
data=credentials,
headers=self.headers
)
login_call = response.json()
if login_call['command'][0]['result'] == 1:
self.authenticated_admin = True
self.authenticated = False
return True
else:
self.authenticated_admin = False
error_message = login_call['command'][0].get('message', 'Unknown error')
print(f"Login failed: {error_message}")
return False
except Exception as e:
print(f"Login error: {str(e)}")
self.authenticated_admin = False
return False
def authenticate(self, username: str = None, password: str = None) -> bool:
"""Alias for login() for backward compatibility"""
return self.login(username, password)
def logout(self) -> Dict:
"""
Logout from the FileCloud server.
Returns:
Dict: Response from the server.
"""
if not self.authenticated:
return {"success": True, "message": "Not logged in"}
logout_endpoint = '/core/locksession'
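# NOTE: '/core/locksession' looks like a session-lock call; verify it is the intended logout endpoint for your FileCloud version.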
response = self.session.post(
f"{self.server_url}{logout_endpoint}",
cookies=self.session.cookies
)
self.authenticated = False
self.session.cookies.clear()
return self._parse_response(response)
def _ensure_authenticated(self):
"""
Ensure that the client is authenticated with the server.
Attempts to log in if not already authenticated.
Raises:
Exception: If authentication fails.
"""
if not self.authenticated:
if not self.login():
raise Exception("Authentication failed. Please check your credentials.")
def _ensure_authenticated_admin(self):
"""
Ensure that the client is authenticated with the server as an administrator.
Attempts an admin login if not already authenticated.
Raises:
Exception: If authentication fails.
"""
if not self.authenticated_admin:
if not self.adminlogin():
raise Exception("Authentication failed. Please check your credentials.")
def _parse_response(self, response: requests.Response) -> Dict:
"""
Parse response based on content type.
Args:
response: The HTTP response object.
Returns:
Dict: Parsed response as a dictionary.
"""
if response.status_code != 200:
return {
"success": False,
"message": f"HTTP error {response.status_code}: {response.reason}"
}
if response.text == 'OK':
return {"success": True, "message": "Operation completed successfully"}
content_type = response.headers.get('Content-Type', '')
try:
# Handle JSON response
if 'application/json' in content_type:
return response.json()
# Handle XML response - needed for FileCloud APIs that return XML
elif 'text/xml' in content_type or 'application/xml' in content_type or response.text.strip().startswith('<?xml') or '<' in response.text:
try:
# Parse XML
root = ET.fromstring(response.text)
# Check for error response
if root.tag == 'error':
return {
"success": False,
"message": root.text if root.text else "Unknown error"
}
# Handle workflow list response format
if root.tag == 'workflows':
meta = root.find('meta')
total = int(meta.find('total').text) if meta is not None and meta.find('total') is not None else 0
workflow_elements = root.findall('workflow')
workflows = []
for workflow_elem in workflow_elements:
workflow = {}
for child in workflow_elem:
# Remove leading underscore from tag names (like _id)
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
workflow[tag_name] = child.text
workflows.append(workflow)
return {
"success": True,
"data": {
"workflows": {
"workflow": workflows
},
"meta": {
"total": total,
"count": len(workflows)
}
}
}
# Handle workflow detail response
elif root.tag == 'workflow':
workflow = {}
for child in root:
# Remove leading underscore from tag names (like _id)
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
workflow[tag_name] = child.text
return {
"success": True,
"data": {
"workflow": workflow
}
}
# Handle metadata sets response
elif root.tag == 'sets':
sets_list = []
for set_elem in root.findall('set'):
set_dict = {}
for child in set_elem:
# Remove leading underscore from tag names
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
set_dict[tag_name] = child.text
sets_list.append(set_dict)
return {
"success": True,
"data": {
"sets": sets_list
}
}
# Handle metadata attributes response
elif root.tag == 'attributes':
attrs_list = []
for attr_elem in root.findall('attribute'):
attr_dict = {}
for child in attr_elem:
# Remove leading underscore from tag names
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
# Handle options list
if tag_name == 'options' and len(child) > 0:
options = []
for option in child.findall('option'):
options.append(option.text)
attr_dict[tag_name] = options
else:
attr_dict[tag_name] = child.text
attrs_list.append(attr_dict)
return {
"success": True,
"data": {
"attributes": attrs_list
}
}
# Handle workflow run list response
elif root.tag == 'runs':
meta = root.find('meta')
total = int(meta.find('total').text) if meta is not None and meta.find('total') is not None else 0
run_elements = root.findall('run')
runs = []
for run_elem in run_elements:
run = {}
for child in run_elem:
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
run[tag_name] = child.text
runs.append(run)
return {
"success": True,
"data": {
"runs": runs,
"meta": {
"total": total,
"count": len(runs)
}
}
}
# Handle workflow run detail response
elif root.tag == 'run':
run = {}
for child in root:
tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
run[tag_name] = child.text
return {
"success": True,
"data": {
"run": run
}
}
# Check for command structure (common in FileCloud API responses)
command_elem = root.find('command')
if command_elem is not None:
cmd_type = command_elem.find('type')
cmd_result = command_elem.find('result')
cmd_message = command_elem.find('message')
result = {
"success": cmd_result is not None and cmd_result.text == "1",
"command_type": cmd_type.text if cmd_type is not None else None,
"message": cmd_message.text if cmd_message is not None else None,
"data": {}
}
return result
# Generic XML to dict conversion as fallback
return {
"success": True,
"xml_root_tag": root.tag,
"data": {root.tag: self._xml_to_dict(root)}
}
except ET.ParseError as xml_err:
return {
"success": False,
"message": f"Failed to parse XML response: {str(xml_err)}",
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
# Try to parse as JSON regardless of content type
elif response.text.strip().startswith('{') or response.text.strip().startswith('['):
return response.json()
# Return raw text for unrecognized formats
else:
return {
"success": False,
"message": "Unrecognized response format",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
except json.JSONDecodeError as e:
return {
"success": False,
"message": f"Failed to parse response as JSON: {str(e)}",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
except Exception as e:
return {
"success": False,
"message": f"Error processing response: {str(e)}",
"content_type": content_type,
"raw_content": response.text[:500] # Include first 500 chars for debugging
}
def _xml_to_dict(self, element):
"""
Convert an XML element to a dictionary.
Args:
element: XML element to convert
Returns:
Dict or str: Converted element
"""
result = {}
# Handle attributes
for key, value in element.attrib.items():
result[f"@{key}"] = value
# Handle children
for child in element:
child_tag = child.tag
# Remove leading underscore from tag names (like _id)
if child_tag.startswith('_'):
child_tag = child_tag[1:]
if len(child) == 0 and not child.attrib:
# Simple text node
if child.text is None:
result[child_tag] = ''
else:
result[child_tag] = child.text.strip()
else:
# Complex node
child_dict = self._xml_to_dict(child)
if child_tag in result:
# If key already exists, convert to list if not already
if not isinstance(result[child_tag], list):
result[child_tag] = [result[child_tag]]
result[child_tag].append(child_dict)
else:
result[child_tag] = child_dict
# Handle element text
if element.text and element.text.strip() and not result:
return element.text.strip()
return result
def _extract_paths_from_xml(self, xml_content: str) -> List[str]:
"""
Extract file paths from XML search results.
Args:
xml_content: XML content from search results
Returns:
List[str]: List of file paths
"""
paths = []
try:
root = ET.fromstring(xml_content)
for entry in root.findall('.//entry'):
path_elem = entry.find('path')
if path_elem is not None and path_elem.text:
paths.append(path_elem.text)
except Exception as e:
print(f"Error parsing XML: {str(e)}")
return paths
# File Operations
def upload_file(self, local_file_path: str, remote_path: str,
filename: Optional[str] = None, overwrite: bool = False) -> Dict:
"""
Upload a file to the FileCloud server.
Args:
local_file_path: Path to the local file to upload
remote_path: Directory path on the server where to upload the file
filename: Optional name to use for the file on the server (default: original filename)
overwrite: Whether to overwrite existing files with the same name
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
upload_endpoint = '/core/upload'
if not filename:
filename = os.path.basename(local_file_path)
# Prepare upload parameters
upload_params = {
'appname': 'explorer',
'path': remote_path,
'offset': 0,
'complete': 1,
'filename': filename
}
if overwrite:
upload_params['overwrite'] = 1
try:
with open(local_file_path, 'rb') as file_obj:
files = {'file': (filename, file_obj)}
response = self.session.post(
f"{self.server_url}{upload_endpoint}",
params=upload_params,
files=files,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Upload failed: {str(e)}"}
def download_file(self, remote_file_path: str, local_file_path: Optional[str] = None,
check_only: bool = False) -> Union[Dict, bytes]:
"""
Download a file from the FileCloud server.
Args:
remote_file_path: Full path to the file on the server
local_file_path: Path where to save the downloaded file (if None, returns content)
check_only: If True, only checks if the file is downloadable without downloading
Returns:
Union[Dict, bytes]: Either a response dict or the file content as bytes
"""
self._ensure_authenticated()
download_endpoint = '/core/downloadfile'
# Extract filename from the path
filename = os.path.basename(remote_file_path)
download_params = {
'filepath': remote_file_path,
'filename': filename
}
if check_only:
download_params['checkonly'] = 1
try:
response = self.session.get(
f"{self.server_url}{download_endpoint}",
params=download_params,
cookies=self.session.cookies,
stream=True # Use streaming for large files
)
if response.status_code != 200:
return {"success": False, "message": f"Download failed with status {response.status_code}"}
# If check_only, just return success
if check_only:
if response.text == 'OK':
return {"success": True, "message": "File can be downloaded"}
return {"success": False, "message": response.text}
# If local_file_path is provided, save the file
if local_file_path:
with open(local_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return {"success": True, "message": f"File saved to {local_file_path}"}
# Otherwise, return the content
return response.content
except Exception as e:
return {"success": False, "message": f"Download failed: {str(e)}"}
def get_file_list(self, path: str, sort_by: str = "name", sort_dir: int = 1,
start: int = 0, limit: int = -1, include_metadata: bool = False) -> Dict:
"""
Get a list of files and directories in the specified path.
Args:
path: Path to list files from
sort_by: Field to sort by ("name", "date", or "size")
sort_dir: Sort direction (1 for ascending, -1 for descending)
start: Index to start from (for pagination)
limit: Maximum number of entries to return (-1 for unlimited)
include_metadata: Whether to include metadata information
Returns:
Dict: Response containing the file list
"""
self._ensure_authenticated()
list_endpoint = '/core/getfilelist'
params = {
'path': path,
'sortby': sort_by,
'sortdir': sort_dir,
'start': start,
'limit': limit
}
if include_metadata:
params['sendmetadatasetinfo'] = 1
try:
response = self.session.post(
f"{self.server_url}{list_endpoint}",
params=params,
cookies=self.session.cookies
)
result = self._parse_response(response)
if result.get("success") and response.text.strip().startswith('<'):
result["paths"] = self._extract_paths_from_xml(response.text)
return result
except Exception as e:
return {"success": False, "message": f"Failed to get file list: {str(e)}"}
def create_folder(self, path: str, folder_name: Optional[str] = None,
subpath: Optional[str] = None) -> Dict:
"""
Create a new folder on the server.
Args:
path: Path where to create the folder
folder_name: Name of the folder to create
subpath: Alternative to folder_name, creates all missing folders in this path
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
create_folder_endpoint = '/core/createfolder'
params = {'path': path}
if folder_name:
params['name'] = folder_name
if subpath:
params['subpath'] = subpath
try:
response = self.session.post(
f"{self.server_url}{create_folder_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to create folder: {str(e)}"}
def delete_file(self, path: str, name: str) -> Dict:
"""
Delete a file or folder.
Args:
path: Path where the file/folder is located
name: Name of the file/folder to delete
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
delete_endpoint = '/core/deletefile'
params = {
'path': path,
'name': name
}
try:
response = self.session.post(
f"{self.server_url}{delete_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to delete file: {str(e)}"}
def rename_or_move(self, from_path: str, to_path: str, overwrite: bool = False) -> Dict:
"""
Rename or move a file or folder.
Args:
from_path: Full path to the source file/folder
to_path: Full path to the destination
overwrite: Whether to overwrite if destination exists
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
move_endpoint = '/core/renameormove'
params = {
'fromname': from_path,
'toname': to_path
}
if overwrite:
params['overwrite'] = 1
try:
response = self.session.post(
f"{self.server_url}{move_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to rename or move: {str(e)}"}
def copy_file(self, path: str, name: str, copy_to: str, overwrite: bool = False) -> Dict:
"""
Copy a file to a new location.
Args:
path: Path where the file is located
name: Name of the file to copy
copy_to: Full path to the destination
overwrite: Whether to overwrite if destination exists
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
copy_endpoint = '/core/copyfile'
params = {
'path': path,
'name': name,
'copyto': copy_to
}
if overwrite:
params['overwrite'] = 1
try:
response = self.session.post(
f"{self.server_url}{copy_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to copy file: {str(e)}"}
# Search and Metadata Operations
def search(self, search_string: str, search_scope: str = '0', search_location: Optional[str] = None,
min_size: Optional[int] = None, max_size: Optional[int] = None,
start: int = 0, limit: int = 10) -> Dict:
"""
Search for files and folders.
Args:
search_string: String to search for
search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
search_location: Path to search within (None for all accessible locations)
min_size: Minimum file size in KB
max_size: Maximum file size in KB
start: Start index for pagination
limit: Maximum number of results to return
Returns:
Dict: Search results
"""
self._ensure_authenticated()
search_endpoint = '/core/dosearch'
params = {
'searchstring': search_string,
'searchscope': search_scope,
'start': start,
'limit': limit
}
if search_location:
params['searchloc'] = search_location
if min_size is not None:
params['minsize'] = min_size
if max_size is not None:
params['maxsize'] = max_size
try:
response = self.session.post(
f"{self.server_url}{search_endpoint}",
params=params,
cookies=self.session.cookies
)
result = self._parse_response(response)
# Extract paths if it's a successful XML response
if result.get("success") and response.text.strip().startswith('<'):
result["paths"] = self._extract_paths_from_xml(response.text)
return result
except Exception as e:
return {"success": False, "message": f"Search failed: {str(e)}"}
def search_metadata(self, search_string: str = "**", search_scope: str = '3',
attributes: Optional[List[Dict[str, Any]]] = None,
search_location: Optional[str] = None) -> Dict:
"""
Search for files based on metadata attributes.
Args:
search_string: String to search for (use '**' for all files)
search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
attributes: List of attribute dictionaries, each containing one of these formats:
- Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int}
- Format 2: {'setid': 'set_id', 'attributeid': 'attr_id', 'operator': 'equals', 'value': 'attr_value'}
search_location: Optional path to limit the search to
Returns:
Dict: Search results with paths
Example:
# Search for files with multiple metadata criteria
client.search_metadata(
attributes=[
{"id": "abc123", "value": "Important", "type": 1},
{"id": "def456", "value": "2023", "type": 2}
],
search_location="/user/documents"
)
"""
self._ensure_authenticated()
search_endpoint = '/core/search' # Using the correct endpoint according to API spec
params = {
'search': search_string,
'scope': search_scope
}
if search_location:
params['location'] = search_location
# Handle multiple metadata attributes
if attributes and len(attributes) > 0:
params['searchattributes'] = 1
params['attributescount'] = len(attributes)
for i, attr in enumerate(attributes, 1):
# Check which format the attribute is in
if 'id' in attr:
# Format 1: id, value, type
params[f'attributeid{i}'] = attr['id']
params[f'attributevalue{i}'] = attr['value']
if 'type' in attr:
params[f'attributetype{i}'] = attr['type']
elif 'attributeid' in attr:
# Format 2: setid, attributeid, operator, value
params[f'attributesetid{i}'] = attr.get('setid')
params[f'attributeid{i}'] = attr.get('attributeid')
params[f'attributeoperator{i}'] = attr.get('operator', 'equals')
params[f'attributevalue{i}'] = attr.get('value')
try:
response = self.session.post(
f"{self.server_url}{search_endpoint}",
data=params,
cookies=self.session.cookies
)
result = self._parse_response(response)
# Extract paths from search results if successful and XML response
if result.get('success', False) and hasattr(response, 'text') and response.text.strip().startswith('<'):
paths = self._extract_paths_from_xml(response.text)
result['paths'] = paths
return result
except Exception as e:
return {"success": False, "message": f"Failed to search metadata: {str(e)}"}
# Also keep the old method signature for backward compatibility
def search_metadata_single(self, search_string: str = "**", search_scope: str = '3',
attribute_id: Optional[str] = None, attribute_type: Optional[int] = None,
attribute_value: Optional[str] = None) -> Dict:
"""
Search for files based on a single metadata attribute.
This method is kept for backward compatibility. The search_metadata method is preferred.
Args:
search_string: String to search for (use '**' for all files)
search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
attribute_id: Metadata attribute ID to search for
attribute_type: Metadata attribute type (1=Text, 2=Integer, etc.)
attribute_value: Metadata attribute value to match
Returns:
Dict: Search results with paths
"""
if attribute_id and attribute_value:
attr = {"id": attribute_id, "value": attribute_value}
if attribute_type:
attr["type"] = attribute_type
return self.search_metadata(search_string, search_scope, [attr])
else:
return self.search_metadata(search_string, search_scope)
def get_metadata_values(self, file_path: str) -> Dict:
"""
Get metadata values for a file or folder.
Args:
file_path: Full path to the file or folder
Returns:
Dict: Metadata values
"""
self._ensure_authenticated_admin()
metadata_endpoint = '/core/getmetadatavalues'
params = {
'fullpath': file_path
}
try:
response = self.session.post(
f"{self.server_url}{metadata_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get metadata: {str(e)}"}
def save_attribute_values(self, file_path: str, set_id: str,
attribute_values: Dict[str, str]) -> Dict:
"""
Save metadata attribute values for a file or folder.
Args:
file_path: Full path to the file or folder
set_id: ID of the metadata set
attribute_values: Dictionary of attribute IDs and values
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
save_endpoint = '/core/saveattributevalues'
params = {
'fullpath': file_path,
'setid': set_id,
'attributes_total': len(attribute_values)
}
# Add attribute values to params
for i, (attr_id, value) in enumerate(attribute_values.items()):
params[f'attribute{i}_attributeid'] = attr_id
params[f'attribute{i}_value'] = value
try:
response = self.session.post(
f"{self.server_url}{save_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to save attributes: {str(e)}"}
def get_file_info(self, file_path: str) -> Dict:
"""
Get information about a file.
Args:
file_path: Full path to the file
Returns:
Dict: File information
"""
self._ensure_authenticated()
info_endpoint = '/core/fileinfo'
params = {
'file': file_path
}
try:
response = self.session.post(
f"{self.server_url}{info_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get file info: {str(e)}"}
# Advanced Directory Operations
def create_directory_tree(self, base_path: str, target_path: str) -> Dict:
"""
Create a directory tree, creating all necessary parent directories.
Args:
base_path: Base path where to create the directories
target_path: Path structure to create (relative to base_path)
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
path_elements = target_path.strip('/').split('/')
walking = True
cumul_path = ""
# Process each path element
for i, p in enumerate(path_elements):
if not p: # Skip empty elements
continue
if walking:
# Check if this path element exists
info_endpoint = '/core/getfilelist'
params = {
'path': f"{base_path}{cumul_path}"
}
try:
response = self.session.post(
f"{self.server_url}{info_endpoint}",
params=params,
cookies=self.session.cookies
)
parsed = xmltodict.parse(response.text)
found = False
# Check if the directory exists
try:
entries = parsed.get('entries', {}).get('entry', [])
if not isinstance(entries, list):
entries = [entries]
for e in entries:
if e.get('name') == p and e.get('type') == 'dir':
found = True
break
except Exception:
pass
if found:
# Directory exists, move to next level
cumul_path = f"{cumul_path}/{p}"
else:
# Need to create this directory and possibly all subdirectories
create_endpoint = '/core/createfolder'
remaining_path = "/".join(path_elements[i:])
create_params = {
'path': f"{base_path}{cumul_path}",
'subpath': remaining_path
}
create_response = self.session.post(
f"{self.server_url}{create_endpoint}",
params=create_params,
cookies=self.session.cookies
)
walking = False # Stop walking since we've created all directories
return self._parse_response(create_response)
except Exception as e:
return {"success": False, "message": f"Failed to create directory tree: {str(e)}"}
# If we walked through the entire path, all directories exist
return {"success": True, "message": "All directories already exist"}
# Automation Workflow Operations
def get_automation_workflows(self, include_disabled: bool = False) -> Dict:
"""
Retrieve existing automation workflows.
Args:
include_disabled: Whether to include disabled workflows in the results
Returns:
Dict: List of available workflows
"""
self._ensure_authenticated()
workflows_endpoint = '/core/getautomationworkflows'
params = {}
if include_disabled:
params['include_disabled'] = 1
try:
# Make sure to use GET method
response = self.session.get(
f"{self.server_url}{workflows_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get automation workflows: {str(e)}"}
def get_automation_workflow_details(self, workflow_id: str) -> Dict:
"""
Retrieve details of a specific automation workflow.
Args:
workflow_id: ID of the workflow to retrieve
Returns:
Dict: Workflow details
"""
self._ensure_authenticated()
workflow_endpoint = '/core/getautomationworkflow'
params = {
'id': workflow_id
}
try:
# Use GET method
response = self.session.get(
f"{self.server_url}{workflow_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get workflow details: {str(e)}"}
def update_automation_workflow(self, workflow_id: str, workflow_data: Dict) -> Dict:
"""
Update an existing automation workflow.
Args:
workflow_id: ID of the workflow to update
workflow_data: Dictionary containing the updated workflow configuration
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
update_endpoint = '/core/updateautomationworkflow'
# Prepare the workflow data for submission
params = {
'id': workflow_id,
**self._prepare_workflow_data(workflow_data)
}
try:
# Keep using POST for update operations
response = self.session.post(
f"{self.server_url}{update_endpoint}",
data=params, # Use data instead of params for POST
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to update workflow: {str(e)}"}
def create_automation_workflow(self, workflow_data: Dict) -> Dict:
"""
Create a new automation workflow.
Args:
workflow_data: Dictionary containing the workflow configuration
Returns:
Dict: Response from the server with the new workflow ID
"""
self._ensure_authenticated()
create_endpoint = '/core/createautomationworkflow'
# Prepare the workflow data for submission
params = self._prepare_workflow_data(workflow_data)
try:
# Use POST for create operations
response = self.session.post(
f"{self.server_url}{create_endpoint}",
data=params, # Use data for POST requests
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to create workflow: {str(e)}"}
def delete_automation_workflow(self, workflow_id: str) -> Dict:
"""
Delete an automation workflow.
Args:
workflow_id: ID of the workflow to delete
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
delete_endpoint = '/core/deleteautomationworkflow'
params = {
'id': workflow_id
}
try:
response = self.session.post(
f"{self.server_url}{delete_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to delete workflow: {str(e)}"}
def start_automation_workflow(self, workflow_id: str, path: str, file_names: Optional[List[str]] = None) -> Dict:
"""
Start an automation workflow on specific files.
Args:
workflow_id: ID of the workflow to start
path: Path where the files are located
file_names: List of file names to process (if None, processes all files in the path)
Returns:
Dict: Response from the server
"""
self._ensure_authenticated()
start_endpoint = '/core/startautomationworkflow'
params = {
'id': workflow_id,
'path': path
}
# If specific files are provided, add them to the request
if file_names and len(file_names) > 0:
params['count'] = len(file_names)
for i, name in enumerate(file_names, 1):
params[f'fn{i}'] = name
try:
response = self.session.post(
f"{self.server_url}{start_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to start workflow: {str(e)}"}
def get_automation_workflow_runs(self, workflow_id: Optional[str] = None,
start: int = 0, limit: int = 100) -> Dict:
"""
Get the history of automation workflow runs.
Args:
workflow_id: Optional ID to filter by specific workflow
start: Starting index for pagination
limit: Maximum number of results to return
Returns:
Dict: List of workflow runs
"""
self._ensure_authenticated()
runs_endpoint = '/core/getautomationworkflowruns'
params = {
'start': start,
'limit': limit
}
if workflow_id:
params['id'] = workflow_id
try:
# Use GET method
response = self.session.get(
f"{self.server_url}{runs_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get workflow runs: {str(e)}"}
def get_automation_workflow_run_details(self, run_id: str) -> Dict:
"""
Get details about a specific workflow run.
Args:
run_id: ID of the workflow run to get details for
Returns:
Dict: Detailed information about the workflow run
"""
self._ensure_authenticated()
details_endpoint = '/core/getautomationworkflowrundetails'
params = {
'id': run_id
}
try:
# Use GET method
response = self.session.get(
f"{self.server_url}{details_endpoint}",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get workflow run details: {str(e)}"}
def _prepare_workflow_data(self, workflow_data: Dict) -> Dict:
"""
Helper method to prepare workflow data for API submission.
Args:
workflow_data: Dictionary containing workflow configuration
Returns:
Dict: Formatted parameters for API request
"""
params = {}
# Map common workflow properties
for key in ['name', 'description', 'enabled', 'trigger_type', 'path_filter', 'user_filter']:
if key in workflow_data:
params[key] = workflow_data[key]
# Handle conditions (if present)
if 'conditions' in workflow_data:
conditions = workflow_data['conditions']
params['conditions_total'] = len(conditions)
for i, condition in enumerate(conditions, 1): # Start index at 1
for condition_key, value in condition.items():
params[f'condition{i}_{condition_key}'] = value
# Handle actions (if present)
if 'actions' in workflow_data:
actions = workflow_data['actions']
params['actions_total'] = len(actions)
for i, action in enumerate(actions, 1): # Start index at 1
for action_key, value in action.items():
# Special handling for action parameters which may be nested
if action_key == 'parameters' and isinstance(value, dict):
for param_key, param_value in value.items():
# Special handling for attributes array
if param_key == 'attributes' and isinstance(param_value, list):
params[f'action{i}_parameter_{param_key}_total'] = len(param_value)
for j, attr in enumerate(param_value, 1): # Start index at 1
for attr_key, attr_val in attr.items():
params[f'action{i}_parameter_{param_key}{j}_{attr_key}'] = attr_val
# Special handling for users array
elif param_key == 'users' and isinstance(param_value, list):
params[f'action{i}_parameter_{param_key}'] = ','.join(param_value)
else:
params[f'action{i}_parameter_{param_key}'] = param_value
else:
params[f'action{i}_{action_key}'] = value
# Handle schedule (if present)
if 'schedule' in workflow_data:
schedule = workflow_data['schedule']
for schedule_key, value in schedule.items():
params[f'schedule_{schedule_key}'] = value
return params
def get_metadata_sets(self) -> Dict:
"""
Get all available metadata sets.
Returns:
Dict: List of metadata sets with success status and error details
"""
self._ensure_authenticated_admin()
try:
endpoint = '/admin/getmetadatasetdefinitions'
request_data = {'end': '100'}
# Make API call and return the parsed response
response = self._api_request(endpoint, data=request_data, method='POST')
return response
except Exception as e:
return {
"success": False,
"message": f"Failed to get metadata sets: {str(e)}",
"exception": str(e)
}
def get_metadata_attributes(self, set_id: str) -> Dict:
"""
Get attributes for a specific metadata set.
Args:
set_id: ID of the metadata set
Returns:
Dict: List of attributes in the set
"""
self._ensure_authenticated_admin()
endpoint = '/admin/getmetadataset'
request_data = {
'setId': set_id
}
try:
response = self._api_request(endpoint, data=request_data, method='POST')
return response
except Exception as e:
return {
"success": False,
"message": f"Failed to get metadata attributes: {str(e)}",
"exception": str(e)
}
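# NOTE: the following definition shadows the get_metadata_values defined earlier in this class; only this later version is effective at runtime.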
def get_metadata_values(self, file_path: str) -> Dict:
"""
Get metadata values for a specific file.
Args:
file_path: Path to the file
Returns:
Dict: Metadata values for the file
"""
self._ensure_authenticated()
params = {
'path': file_path
}
try:
response = self.session.get(
f"{self.server_url}/core/getmetadatavalues",
params=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to get metadata values: {str(e)}"}
def save_attribute_values(self, file_path: str, set_id: str, attribute_values: Dict[str, str]) -> Dict:
"""
Save metadata values for a specific file.
Args:
file_path: Path to the file
set_id: ID of the metadata set
attribute_values: Dictionary of attribute IDs and values
Returns:
Dict: Response from the API
"""
self._ensure_authenticated()
params = {
'path': file_path,
'setid': set_id
}
# Add attribute values to params
for attr_id, value in attribute_values.items():
params[f'a_{attr_id}'] = value
try:
response = self.session.post(
f"{self.server_url}/core/saveattributevalues",
data=params,
cookies=self.session.cookies
)
return self._parse_response(response)
except Exception as e:
return {"success": False, "message": f"Failed to save metadata values: {str(e)}"}
# ==========================================
# Controlled Document System Extensions
# ==========================================
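# NOTE: the endpoints used in this section (e.g. "/core/setCustomMetadata", "/core/getVersions") follow a different naming style from the core API above; verify them against your FileCloud server before relying on these methods.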
def create_document_folder(self, folder_path: str, create_parents: bool = True) -> Dict[str, Any]:
"""
Create a folder for document storage.
Args:
folder_path: Folder path to create
create_parents: Whether to create parent folders if they don't exist
Returns:
Response dictionary with status information
"""
if create_parents:
# Split path and create parent folders recursively
parts = folder_path.strip('/').split('/')
current_path = ""
for part in parts:
if part:
current_path += f"/{part}"
# Check if folder exists
folder_exists = self.check_folder_exists(current_path)
if not folder_exists:
# Create folder
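# NOTE: create_folder(path, folder_name, subpath) normally needs a folder name or subpath; calling it with only a path may be rejected by the server.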
self.create_folder(current_path)
return {"success": True, "path": folder_path}
else:
# Just create the specified folder
return self.create_folder(folder_path)
def upload_controlled_document(
self,
file_data: Union[bytes, BinaryIO],
folder_path: str,
filename: str,
metadata: Dict[str, Any] = None,
version_comment: str = None
) -> Dict[str, Any]:
"""
Upload a controlled document with special handling.
Args:
file_data: File content as bytes or file-like object
folder_path: Target folder path
filename: Filename to use
metadata: Document metadata to attach
version_comment: Comment for version history
Returns:
Response dictionary with uploaded file information
"""
# Ensure folder exists
self.create_document_folder(folder_path)
# Full path for file
full_path = f"{folder_path}/{filename}"
# Check if file already exists (for versioning)
file_exists = self.check_file_exists(full_path)
# Upload or update file
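# NOTE: update_file is not defined in this class, and upload_file's signature is (local_file_path, remote_path, filename, overwrite); these calls target a different interface and need adapting before use.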
if file_exists:
result = self.update_file(full_path, file_data, version_comment)
else:
result = self.upload_file(folder_path, filename, file_data)
# Add metadata if provided
if metadata and result.get('success', False):
self.set_file_metadata(full_path, metadata)
# Add metadata to result
result['metadata'] = metadata
return result
def get_document_with_metadata(self, file_path: str) -> Dict[str, Any]:
"""
Get document content and metadata in a single call.
Args:
file_path: Path to the document
Returns:
Dictionary with file content and metadata
"""
# Get file content
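# NOTE: download_file returns raw bytes on success (a dict only on failure), so the dict-style access on content_result below only works for error responses.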
content_result = self.download_file(file_path)
# Get metadata
metadata_result = self.get_file_metadata(file_path)
# Get version history
version_result = self.get_file_versions(file_path)
# Combine results
return {
"success": content_result.get('success', False),
"file_path": file_path,
"content": content_result.get('content'),
"metadata": metadata_result.get('metadata', {}),
"versions": version_result.get('versions', []),
"error": content_result.get('error') or metadata_result.get('error')
}
def set_file_metadata(self, file_path: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Set custom metadata for a file.
Args:
file_path: Path to the file
metadata: Dictionary of metadata key-value pairs
Returns:
Response dictionary
"""
# Format metadata for FileCloud API
formatted_metadata = self._format_metadata_for_api(metadata)
# API endpoint
endpoint = "core/setCustomMetadata"
# Request data
request_data = {
"filepath": file_path,
"custommetadata": formatted_metadata
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"metadata": metadata,
"error": self._get_error_message(response)
}
def get_file_metadata(self, file_path: str) -> Dict[str, Any]:
"""
Get custom metadata for a file.
Args:
file_path: Path to the file
Returns:
Response dictionary with metadata
"""
# API endpoint
endpoint = "core/getMetadata"
# Request data
request_data = {
"filepath": file_path
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Extract metadata from response
metadata = {}
if self._is_success(response) and 'metadata' in response:
raw_metadata = response['metadata']
# Parse custom metadata
if 'custommetadata' in raw_metadata:
metadata = self._parse_api_metadata(raw_metadata['custommetadata'])
return {
"success": self._is_success(response),
"file_path": file_path,
"metadata": metadata,
"error": self._get_error_message(response)
}
def get_file_versions(self, file_path: str) -> Dict[str, Any]:
"""
Get version history for a file.
Args:
file_path: Path to the file
Returns:
Response dictionary with version history
"""
# API endpoint
endpoint = "core/getVersions"
# Request data
request_data = {
"filepath": file_path
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Extract versions
versions = []
if self._is_success(response) and 'versioninfo' in response:
versions = response['versioninfo']
return {
"success": self._is_success(response),
"file_path": file_path,
"versions": versions,
"error": self._get_error_message(response)
}
def restore_file_version(self, file_path: str, version_id: str) -> Dict[str, Any]:
"""
Restore a previous version of a file.
Args:
file_path: Path to the file
version_id: Version ID to restore
Returns:
Response dictionary
"""
# API endpoint
endpoint = "core/restoreVersion"
# Request data
request_data = {
"filepath": file_path,
"versionid": version_id
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"version_id": version_id,
"error": self._get_error_message(response)
}
def set_document_permissions(
self,
file_path: str,
users: Dict[str, List[str]] = None,
groups: Dict[str, List[str]] = None
) -> Dict[str, Any]:
"""
Set document access permissions.
Args:
file_path: Path to the document
users: Dictionary mapping usernames to permission lists
groups: Dictionary mapping groups to permission lists
Returns:
Response dictionary
"""
# Create permission structure expected by FileCloud
permissions = []
# Add user permissions
if users:
for username, perms in users.items():
permissions.append({
"type": "user",
"name": username,
"permissions": perms
})
# Add group permissions
if groups:
for groupname, perms in groups.items():
permissions.append({
"type": "group",
"name": groupname,
"permissions": perms
})
# API endpoint
endpoint = "core/setPermissions"
# Request data
request_data = {
"filepath": file_path,
"permissions": json.dumps(permissions)
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"error": self._get_error_message(response)
}
def create_document_share(
self,
file_path: str,
share_type: str = "view",
password: Optional[str] = None,
expiry_days: Optional[int] = None,
notify_emails: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Create a shareable link for a document.
Args:
file_path: Path to the document
share_type: Type of share (view, download, upload, collaborate)
password: Optional password protection
expiry_days: Number of days until link expires
notify_emails: List of emails to notify about the share
Returns:
Response dictionary with share link
"""
# API endpoint
endpoint = "core/createShare"
# Request data
request_data = {
"filepath": file_path,
"type": share_type
}
# Add optional parameters
if password:
request_data["password"] = password
if expiry_days is not None:
# Convert days to an absolute expiry timestamp (epoch seconds)
expiry_time = int(time.time()) + (expiry_days * 86400)
request_data["expiry"] = expiry_time
if notify_emails:
request_data["emails"] = ",".join(notify_emails)
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Extract share URL
share_url = None
if self._is_success(response) and 'shareinfo' in response:
share_url = response['shareinfo'].get('url')
return {
"success": self._is_success(response),
"file_path": file_path,
"share_url": share_url,
"share_id": response.get('shareinfo', {}).get('id'),
"error": self._get_error_message(response)
}
def search_documents(
self,
search_text: str,
folder_path: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
doc_type: Optional[str] = None,
max_results: int = 100
) -> Dict[str, Any]:
"""
Search for documents using text and metadata.
Args:
search_text: Text to search for
folder_path: Optional folder path to limit search
metadata: Optional metadata criteria as key-value pairs
doc_type: Optional document type to filter by
max_results: Maximum number of results to return
Returns:
Response dictionary with search results
"""
# API endpoint
endpoint = "core/searchFiles"
# Build query
query = search_text
# Add metadata constraints if provided
if metadata:
for key, value in metadata.items():
query += f" metadata:{key}:{value}"
# Add document type constraint if provided
if doc_type:
query += f" metadata:doc_type:{doc_type}"
# Request data
request_data = {
"searchtext": query,
"maxhits": max_results
}
# Add folder path constraint if provided
if folder_path:
request_data["folderpath"] = folder_path
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Extract search results
results = []
if self._is_success(response) and 'searchresults' in response:
results = response['searchresults']
return {
"success": self._is_success(response),
"results": results,
"count": len(results),
"error": self._get_error_message(response)
}
def start_document_workflow(
self,
file_path: str,
workflow_name: str,
workflow_data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Start a workflow for a document.
Args:
file_path: Path to the document
workflow_name: Name of the workflow to start
workflow_data: Initial workflow data
Returns:
Response dictionary with workflow information
"""
# API endpoint
endpoint = "workflow/startWorkflow"
# Request data
request_data = {
"filepath": file_path,
"workflowname": workflow_name,
"workflowdata": json.dumps(workflow_data)
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
return {
"success": self._is_success(response),
"file_path": file_path,
"workflow_id": response.get('workflowid'),
"error": self._get_error_message(response)
}
def check_folder_exists(self, folder_path: str) -> bool:
"""
Check if a folder exists.
Args:
folder_path: Path to check
Returns:
True if folder exists, False otherwise
"""
# API endpoint
endpoint = '/core/fileexists'
# Request data
request_data = {
"file": folder_path
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Check result
return bool(response.get('success'))
def check_file_exists(self, file_path: str) -> bool:
"""
Check if a file exists.
Args:
file_path: Path to check
Returns:
True if file exists, False otherwise
"""
# API endpoint
endpoint = "core/fileexists"
# Request data
request_data = {
"file": file_path
}
# Make API call
response = self._api_request(endpoint, params=request_data, method='POST')
# Check result
return bool(response.get('success'))
# ==========================================
# Helper Methods
# ==========================================
def _format_metadata_for_api(self, metadata: Dict[str, Any]) -> str:
"""
Format metadata dictionary for FileCloud API.
Args:
metadata: Dictionary of metadata
Returns:
Formatted metadata string
"""
formatted = []
for key, value in metadata.items():
# Convert non-string values to JSON strings
if not isinstance(value, str):
value = json.dumps(value)
formatted.append(f"{key}={value}")
return "|".join(formatted)
def _parse_api_metadata(self, metadata_str: str) -> Dict[str, Any]:
"""
Parse metadata string from FileCloud API.
Args:
metadata_str: Metadata string from API
Returns:
Dictionary of parsed metadata
"""
metadata = {}
if not metadata_str:
return metadata
# Split by the separator
items = metadata_str.split("|")
for item in items:
if "=" in item:
key, value = item.split("=", 1)
# Try to parse JSON values
try:
parsed_value = json.loads(value)
metadata[key] = parsed_value
except (json.JSONDecodeError, TypeError):
# Keep as string if not valid JSON
metadata[key] = value
return metadata
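# NOTE: _is_success and _get_error_message are called throughout the
# controlled-document methods above but are not defined anywhere in this
# source. Minimal sketches follow, assuming responses have the dict shape
# produced by _parse_response; adjust to your server's actual payloads.
def _is_success(self, response: dict) -> bool:
    """Best-effort success check on a parsed API response (assumed helper)."""
    return bool(response) and bool(response.get('success'))
def _get_error_message(self, response: dict) -> Optional[str]:
    """Return an error message from a failed response, if any (assumed helper)."""
    if self._is_success(response):
        return None
    return response.get('message') or response.get('error')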
def _api_request(self, endpoint: str, data: dict = None, params: dict = None, files: dict = None, method: str = "POST") -> dict:
"""
Make a request to the FileCloud API.
Args:
endpoint: API endpoint to call (without server URL)
data: Dictionary of form data to send
params: Dictionary of URL parameters
files: Dictionary of files to upload
method: HTTP method (POST or GET)
Returns:
Dictionary with parsed response
"""
self._ensure_authenticated()
# Prepare URL
url = f"{self.server_url}{endpoint}"
try:
# Make the request based on method
if method.upper() == "GET":
response = self.session.get(
url,
params=params,
cookies=self.session.cookies
)
else: # Default to POST
response = self.session.post(
url,
data=data,
params=params,
files=files,
cookies=self.session.cookies
)
# Parse and return the response
print(f"API request to {url} returned status code {response.status_code}")
return self._parse_response(response)
except Exception as e:
return {
"success": False,
"message": f"API request failed: {str(e)}"
}
Class Interface
Methods
__init__(self, server_url, username, password)
Purpose: Initialize the FileCloud API client. Args: server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/') username: Username for authentication (optional if authenticating later) password: Password for authentication (optional if authenticating later)
Parameters:
server_url: str, username: str, password: str
Returns: None
login(self, username, password) -> bool
Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.
Parameters:
username: str, password: str
Returns: bool
adminlogin(self, username, password) -> bool
Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.
Parameters:
username: str, password: str
Returns: bool
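For example (credentials illustrative; a successful admin login sets authenticated_admin and clears the regular-user flag):
admin = FileCloudAPI('https://filecloud.example.com')
if admin.adminlogin('admin', 'admin_password'):
    sets = admin.get_metadata_sets()  # admin-only call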
authenticate(self, username, password) -> bool
Purpose: Alias for login() for backward compatibility
Parameters:
username: str, password: str
Returns: bool
logout(self) -> Dict
Purpose: Logout from the FileCloud server. Returns: Dict: Response from the server.
Returns: Dict
_ensure_authenticated(self)
Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.
Returns: None
_ensure_authenticated_admin(self)
Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.
Returns: None
_parse_response(self, response) -> Dict
Purpose: Parse response based on content type. Args: response: The HTTP response object. Returns: Dict: Parsed response as a dictionary.
Parameters:
response: requests.Response
Returns: Dict
_xml_to_dict(self, element)
Purpose: Convert an XML element to a dictionary. Args: element: XML element to convert Returns: Dict or str: Converted element
Parameters:
element: xml.etree.ElementTree.Element
Returns: Dict or str (see docstring)
_extract_paths_from_xml(self, xml_content) -> List[str]
Purpose: Extract file paths from XML search results. Args: xml_content: XML content from search results Returns: List[str]: List of file paths
Parameters:
xml_content: str
Returns: List[str]
upload_file(self, local_file_path, remote_path, filename, overwrite) -> Dict
Purpose: Upload a file to the FileCloud server. Args: local_file_path: Path to the local file to upload remote_path: Directory path on the server where to upload the file filename: Optional name to use for the file on the server (default: original filename) overwrite: Whether to overwrite existing files with the same name Returns: Dict: Response from the server
Parameters:
local_file_path: str, remote_path: str, filename: Optional[str], overwrite: bool
Returns: Dict
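A short usage sketch (paths and names illustrative; api is an authenticated client):
result = api.upload_file('/tmp/report.pdf', '/alice/reports', filename='Q3-report.pdf', overwrite=True)
print(result.get('message', result))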
download_file(self, remote_file_path, local_file_path, check_only) -> Union[Dict, bytes]
Purpose: Download a file from the FileCloud server. Args: remote_file_path: Full path to the file on the server local_file_path: Path where to save the downloaded file (if None, returns content) check_only: If True, only checks if the file is downloadable without downloading Returns: Union[Dict, bytes]: Either a response dict or the file content as bytes
Parameters:
remote_file_path: str, local_file_path: Optional[str], check_only: bool
Returns: Union[Dict, bytes]
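Illustrative calls for the three modes (probe, save to disk, bytes in memory):
api.download_file('/alice/reports/Q3-report.pdf', check_only=True)
api.download_file('/alice/reports/Q3-report.pdf', '/tmp/Q3-report.pdf')
data = api.download_file('/alice/reports/Q3-report.pdf')  # bytes on success, dict on failure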
get_file_list(self, path, sort_by, sort_dir, start, limit, include_metadata) -> Dict
Purpose: Get a list of files and directories in the specified path. Args: path: Path to list files from sort_by: Field to sort by ("name", "date", or "size") sort_dir: Sort direction (1 for ascending, -1 for descending) start: Index to start from (for pagination) limit: Maximum number of entries to return (-1 for unlimited) include_metadata: Whether to include metadata information Returns: Dict: Response containing the file list
Parameters:
path: str, sort_by: str, sort_dir: int, start: int, limit: int, include_metadata: bool
Returns: Dict
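For example, the newest 50 entries (the paths key is only populated when the server replies with XML):
page = api.get_file_list('/alice/reports', sort_by='date', sort_dir=-1, start=0, limit=50)
for p in page.get('paths', []):
    print(p)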
create_folder(self, path, folder_name, subpath) -> Dict
Purpose: Create a new folder on the server. Args: path: Path where to create the folder folder_name: Name of the folder to create subpath: Alternative to folder_name, creates all missing folders in this path Returns: Dict: Response from the server
Parameters:
path: str, folder_name: Optional[str], subpath: Optional[str]
Returns: Dict
delete_file(self, path, name) -> Dict
Purpose: Delete a file or folder. Args: path: Path where the file/folder is located name: Name of the file/folder to delete Returns: Dict: Response from the server
Parameters:
path: str, name: str
Returns: Dict
rename_or_move(self, from_path, to_path, overwrite) -> Dict
Purpose: Rename or move a file or folder. Args: from_path: Full path to the source file/folder to_path: Full path to the destination overwrite: Whether to overwrite if destination exists Returns: Dict: Response from the server
Parameters:
from_path: str, to_path: str, overwrite: bool
Returns: Dict
copy_file(self, path, name, copy_to, overwrite) -> Dict
Purpose: Copy a file to a new location.
Parameters:
    path (str): Path where the file is located
    name (str): Name of the file to copy
    copy_to (str): Full path to the destination
    overwrite (bool): Whether to overwrite if the destination exists
Returns: Dict: Response from the server
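A sketch chaining the folder and file operations above. All paths are illustrative, and whether copy_to expects a destination folder or a full file path should be verified against your server:
    api.create_folder('/user/documents', folder_name='archive')
    api.copy_file('/user/documents', name='Q3-report.pdf',
                  copy_to='/user/documents/archive', overwrite=True)
    api.rename_or_move('/user/documents/archive/Q3-report.pdf',
                       '/user/documents/archive/2023-Q3-report.pdf', overwrite=False)
    api.delete_file('/user/documents', name='Q3-report.pdf')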
search(self, search_string, search_scope, search_location, min_size, max_size, start, limit) -> Dict
Purpose: Search for files and folders.
Parameters:
    search_string (str): String to search for
    search_scope (str): Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
    search_location (Optional[str]): Path to search within (None for all accessible locations)
    min_size (Optional[int]): Minimum file size in KB
    max_size (Optional[int]): Maximum file size in KB
    start (int): Start index for pagination
    limit (int): Maximum number of results to return
Returns: Dict: Search results
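A content-search sketch; the scope code '2' (Content) follows the scope table above, and the path is illustrative:
    results = api.search('invoice', search_scope='2',
                         search_location='/user/documents',
                         min_size=None, max_size=None, start=0, limit=50)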
search_metadata(self, search_string, search_scope, attributes, search_location) -> Dict
Purpose: Search for files based on metadata attributes.
Parameters:
    search_string (str): String to search for (use '**' for all files)
    search_scope (str): Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
    attributes (Optional[List[Dict[str, Any]]]): List of attribute dictionaries, each in one of these formats:
        Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int}
        Format 2: {'setid': 'set_id', 'attributeid': 'attr_id', 'operator': 'equals', 'value': 'attr_value'}
    search_location (Optional[str]): Optional path to limit the search to
Returns: Dict: Search results with paths
Example:
    # Search for files with multiple metadata criteria
    client.search_metadata(
        attributes=[
            {"id": "abc123", "value": "Important", "type": 1},
            {"id": "def456", "value": "2023", "type": 2}
        ],
        search_location="/user/documents"
    )
search_metadata_single(self, search_string, search_scope, attribute_id, attribute_type, attribute_value) -> Dict
Purpose: Search for files based on a single metadata attribute. Kept for backward compatibility; search_metadata is preferred.
Parameters:
    search_string (str): String to search for (use '**' for all files)
    search_scope (str): Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
    attribute_id (Optional[str]): Metadata attribute ID to search for
    attribute_type (Optional[int]): Metadata attribute type (1=Text, 2=Integer, etc.)
    attribute_value (Optional[str]): Metadata attribute value to match
Returns: Dict: Search results with paths
get_metadata_values(self, file_path) -> Dict
Purpose: Get metadata values for a file or folder.
Parameters:
    file_path (str): Full path to the file or folder
Returns: Dict: Metadata values
save_attribute_values(self, file_path, set_id, attribute_values) -> Dict
Purpose: Save metadata attribute values for a file or folder.
Parameters:
    file_path (str): Full path to the file or folder
    set_id (str): ID of the metadata set
    attribute_values (Dict[str, str]): Dictionary of attribute IDs and values
Returns: Dict: Response from the server
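A sketch of the discover-then-write metadata flow, using get_metadata_sets and get_metadata_attributes (documented below); all IDs and values are illustrative:
    sets = api.get_metadata_sets()
    attrs = api.get_metadata_attributes(set_id='set123')
    api.save_attribute_values('/user/documents/Q3-report.pdf', set_id='set123',
                              attribute_values={'attr1': 'Approved', 'attr2': '2023'})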
get_file_info(self, file_path) -> Dict
Purpose: Get information about a file.
Parameters:
    file_path (str): Full path to the file
Returns: Dict: File information
create_directory_tree(self, base_path, target_path) -> Dict
Purpose: Create a directory tree, creating all necessary parent directories.
Parameters:
    base_path (str): Base path where to create the directories
    target_path (str): Path structure to create (relative to base_path)
Returns: Dict: Response from the server
get_automation_workflows(self, include_disabled) -> Dict
Purpose: Retrieve existing automation workflows.
Parameters:
    include_disabled (bool): Whether to include disabled workflows in the results
Returns: Dict: List of available workflows
get_automation_workflow_details(self, workflow_id) -> Dict
Purpose: Retrieve details of a specific automation workflow.
Parameters:
    workflow_id (str): ID of the workflow to retrieve
Returns: Dict: Workflow details
update_automation_workflow(self, workflow_id, workflow_data) -> Dict
Purpose: Update an existing automation workflow.
Parameters:
    workflow_id (str): ID of the workflow to update
    workflow_data (Dict): Dictionary containing the updated workflow configuration
Returns: Dict: Response from the server
create_automation_workflow(self, workflow_data) -> Dict
Purpose: Create a new automation workflow.
Parameters:
    workflow_data (Dict): Dictionary containing the workflow configuration
Returns: Dict: Response from the server with the new workflow ID
delete_automation_workflow(self, workflow_id) -> Dict
Purpose: Delete an automation workflow.
Parameters:
    workflow_id (str): ID of the workflow to delete
Returns: Dict: Response from the server
start_automation_workflow(self, workflow_id, path, file_names) -> Dict
Purpose: Start an automation workflow on specific files.
Parameters:
    workflow_id (str): ID of the workflow to start
    path (str): Path where the files are located
    file_names (Optional[List[str]]): List of file names to process (if None, processes all files in the path)
Returns: Dict: Response from the server
get_automation_workflow_runs(self, workflow_id, start, limit) -> Dict
Purpose: Get the history of automation workflow runs.
Parameters:
    workflow_id (Optional[str]): Optional ID to filter by a specific workflow
    start (int): Starting index for pagination
    limit (int): Maximum number of results to return
Returns: Dict: List of workflow runs
get_automation_workflow_run_details(self, run_id) -> Dict
Purpose: Get details about a specific workflow run.
Parameters:
    run_id (str): ID of the workflow run to get details for
Returns: Dict: Detailed information about the workflow run
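A sketch tying the workflow calls above together; the workflow and run IDs are illustrative placeholders:
    workflows = api.get_automation_workflows(include_disabled=False)
    api.start_automation_workflow('wf-42', path='/user/inbox', file_names=['scan-001.pdf'])
    runs = api.get_automation_workflow_runs(workflow_id='wf-42', start=0, limit=10)
    details = api.get_automation_workflow_run_details(run_id='run-7')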
_prepare_workflow_data(self, workflow_data) -> Dict
Purpose: Helper method to prepare workflow data for API submission.
Parameters:
    workflow_data (Dict): Dictionary containing the workflow configuration
Returns: Dict: Formatted parameters for the API request
get_metadata_sets(self) -> Dict
Purpose: Get all available metadata sets.
Returns: Dict: List of metadata sets with success status and error details
get_metadata_attributes(self, set_id) -> Dict
Purpose: Get attributes for a specific metadata set.
Parameters:
    set_id (str): ID of the metadata set
Returns: Dict: List of attributes in the set
get_metadata_values(self, file_path) -> Dict
Purpose: Get metadata values for a specific file.
Parameters:
    file_path (str): Path to the file
Returns: Dict: Metadata values for the file
Note: This method name also appears earlier in the class; in Python, the later definition in the class body overrides the earlier one.
save_attribute_values(self, file_path, set_id, attribute_values) -> Dict
Purpose: Save metadata values for a specific file.
Parameters:
    file_path (str): Path to the file
    set_id (str): ID of the metadata set
    attribute_values (Dict[str, str]): Dictionary of attribute IDs and values
Returns: Dict: Response from the API
Note: This method name also appears earlier in the class; in Python, the later definition in the class body overrides the earlier one.
create_document_folder(self, folder_path, create_parents) -> Dict[str, Any]
Purpose: Create a folder for document storage.
Parameters:
    folder_path (str): Folder path to create
    create_parents (bool): Whether to create parent folders if they don't exist
Returns: Dict[str, Any]: Response dictionary with status information
upload_controlled_document(self, file_data, folder_path, filename, metadata, version_comment) -> Dict[str, Any]
Purpose: Upload a controlled document with special handling.
Parameters:
    file_data (Union[bytes, BinaryIO]): File content as bytes or a file-like object
    folder_path (str): Target folder path
    filename (str): Filename to use
    metadata (Dict[str, Any]): Document metadata to attach
    version_comment (str): Comment for the version history
Returns: Dict[str, Any]: Response dictionary with uploaded file information
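A controlled-document upload sketch; the folder, filename, and metadata keys are illustrative assumptions:
    with open('sop-12.pdf', 'rb') as fh:
        result = api.upload_controlled_document(
            file_data=fh,
            folder_path='/user/controlled',
            filename='sop-12.pdf',
            metadata={'status': 'draft', 'owner': 'qa-team'},
            version_comment='Initial upload'
        )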
get_document_with_metadata(self, file_path) -> Dict[str, Any]
Purpose: Get document content and metadata in a single call.
Parameters:
    file_path (str): Path to the document
Returns: Dict[str, Any]: Dictionary with file content and metadata
set_file_metadata(self, file_path, metadata) -> Dict[str, Any]
Purpose: Set custom metadata for a file.
Parameters:
    file_path (str): Path to the file
    metadata (Dict[str, Any]): Dictionary of metadata key-value pairs
Returns: Dict[str, Any]: Response dictionary
get_file_metadata(self, file_path) -> Dict[str, Any]
Purpose: Get custom metadata for a file.
Parameters:
    file_path (str): Path to the file
Returns: Dict[str, Any]: Response dictionary with metadata
get_file_versions(self, file_path) -> Dict[str, Any]
Purpose: Get version history for a file.
Parameters:
    file_path (str): Path to the file
Returns: Dict[str, Any]: Response dictionary with version history
restore_file_version(self, file_path, version_id) -> Dict[str, Any]
Purpose: Restore a previous version of a file.
Parameters:
    file_path (str): Path to the file
    version_id (str): Version ID to restore
Returns: Dict[str, Any]: Response dictionary
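A version-history sketch; how the version ID is read out of the response depends on the server, so the value here is an illustrative placeholder:
    versions = api.get_file_versions('/user/controlled/sop-12.pdf')
    api.restore_file_version('/user/controlled/sop-12.pdf', version_id='3')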
set_document_permissions(self, file_path, users, groups) -> Dict[str, Any]
Purpose: Set document access permissions.
Parameters:
    file_path (str): Path to the document
    users (Dict[str, List[str]]): Dictionary mapping usernames to permission lists
    groups (Dict[str, List[str]]): Dictionary mapping group names to permission lists
Returns: Dict[str, Any]: Response dictionary
create_document_share(self, file_path, share_type, password, expiry_days, notify_emails) -> Dict[str, Any]
Purpose: Create a shareable link for a document.
Parameters:
    file_path (str): Path to the document
    share_type (str): Type of share (view, download, upload, collaborate)
    password (Optional[str]): Optional password protection
    expiry_days (Optional[int]): Number of days until the link expires
    notify_emails (Optional[List[str]]): List of emails to notify about the share
Returns: Dict[str, Any]: Response dictionary with the share link
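A share-link sketch; the path, password, and recipients are illustrative:
    share = api.create_document_share(
        '/user/controlled/sop-12.pdf',
        share_type='view',
        password='s3cret',
        expiry_days=14,
        notify_emails=['reviewer@example.com']
    )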
search_documents(self, search_text, folder_path, metadata, doc_type, max_results) -> Dict[str, Any]
Purpose: Search for documents using text and metadata.
Parameters:
    search_text (str): Text to search for
    folder_path (Optional[str]): Optional folder path to limit the search
    metadata (Optional[Dict[str, str]]): Optional metadata criteria as key-value pairs
    doc_type (Optional[str]): Optional document type to filter by
    max_results (int): Maximum number of results to return
Returns: Dict[str, Any]: Response dictionary with search results
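A combined text-and-metadata search sketch; the metadata keys and the doc_type value are illustrative assumptions:
    hits = api.search_documents(
        'quality manual',
        folder_path='/user/controlled',
        metadata={'status': 'approved'},
        doc_type='pdf',
        max_results=25
    )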
start_document_workflow(self, file_path, workflow_name, workflow_data) -> Dict[str, Any]
Purpose: Start a workflow for a document.
Parameters:
    file_path (str): Path to the document
    workflow_name (str): Name of the workflow to start
    workflow_data (Dict[str, Any]): Initial workflow data
Returns: Dict[str, Any]: Response dictionary with workflow information
check_folder_exists(self, folder_path) -> bool
Purpose: Check if a folder exists.
Parameters:
    folder_path (str): Path to check
Returns: bool: True if the folder exists, False otherwise
check_file_exists(self, file_path) -> bool
Purpose: Check if a file exists.
Parameters:
    file_path (str): Path to check
Returns: bool: True if the file exists, False otherwise
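A sketch using the existence checks as guards before other calls (paths illustrative):
    if not api.check_folder_exists('/user/controlled'):
        api.create_document_folder('/user/controlled', create_parents=True)
    if api.check_file_exists('/user/controlled/sop-12.pdf'):
        print('Document already uploaded')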
_format_metadata_for_api(self, metadata) -> str
Purpose: Format a metadata dictionary for the FileCloud API.
Parameters:
    metadata (Dict[str, Any]): Dictionary of metadata
Returns: str: Formatted metadata string
_parse_api_metadata(self, metadata_str) -> Dict[str, Any]
Purpose: Parse a metadata string from the FileCloud API.
Parameters:
    metadata_str (str): Metadata string from the API
Returns: Dict[str, Any]: Dictionary of parsed metadata
_api_request(self, endpoint, data, params, files, method) -> dict
Purpose: Make a request to the FileCloud API.
Parameters:
    endpoint (str): API endpoint to call (without the server URL)
    data (dict): Dictionary of form data to send
    params (dict): Dictionary of URL parameters
    files (dict): Dictionary of files to upload
    method (str): HTTP method (POST or GET)
Returns: dict: Dictionary with the parsed response
Required Imports
import requests
import os
import xmltodict
import time
import io
from typing import Any, BinaryIO, Dict, List, Optional, Union
Usage Example
# Example usage:
# api = FileCloudAPI('https://filecloud.example.com', username='user', password='pass')
# if api.login():
#     listing = api.get_file_list('/user/documents')
Similar Components
AI-powered semantic similarity - components with related functionality:
- class FileCloudAPI_v1 (98.7% similar)
- class FileCloudClient (70.9% similar)
- class FileCloudClient_v1 (70.6% similar)
- class FileCloudIntegration (55.1% similar)
- function get_filecloud_client (54.4% similar)