function manage_document_permissions
Comprehensive function to manage document sharing and user permissions. This function: 1. Creates a share only if needed for active users 2. Adds/updates users with appropriate permissions based on their roles 3. Removes users who shouldn't have access anymore 4. Cleans up shares that are no longer needed 5. Manages ACL entries for write permissions on the document's folder Args: document: The document to manage permissions for Returns: Dict: Result of permission updates with detailed information
/tf/active/vicechatdev/CDocs/controllers/share_controller.py
72 - 784
moderate
Purpose
Comprehensive function to manage document sharing and user permissions. This function: 1. Creates a share only if needed for active users 2. Adds/updates users with appropriate permissions based on their roles 3. Removes users who shouldn't have access anymore 4. Cleans up shares that are no longer needed 5. Manages ACL entries for write permissions on the document's folder Args: document: The document to manage permissions for Returns: Dict: Result of permission updates with detailed information
Source Code
def _grant_access(access_map: Dict[str, Dict[str, Any]], email: str, role: str,
                  write_access: bool, username: str) -> None:
    """Register *email* in *access_map* without ever downgrading write access.

    A user can qualify under several roles (for example owner and trainee).
    A later read-only role must not strip write access that an earlier role
    granted; in every other case the newest role replaces the previous entry.
    """
    existing = access_map.get(email)
    if existing is not None and existing['write_access'] and not write_access:
        return
    access_map[email] = {
        'role': role,
        'write_access': write_access,
        'username': username
    }


def _effective_permission_string(permission_data: Dict[str, Any]) -> str:
    """Translate '0'/'1' permission flags into a compact string such as 'RWD'."""
    flag_letters = (
        ('read', 'R'),
        ('write', 'W'),
        ('delete', 'D'),
        ('share', 'S'),
        ('manage', 'M'),
    )
    return ''.join(
        letter for key, letter in flag_letters
        if permission_data.get(key, '0') == '1'
    )


def _collect_share_user_ids(users_result: Dict[str, Any]) -> set:
    """Extract user identifiers from a ``get_users_for_share`` response.

    Three response layouts are handled: ``data.users.user`` as a list,
    ``data.users.user`` as a single dict, and ``data.users`` as a plain list.
    Each user is identified by the first non-empty value among
    'name', 'email' and 'userid'.
    """
    found = set()
    if 'data' not in users_result or 'users' not in users_result['data']:
        return found

    users_data = users_result['data']['users']
    entries = []
    label = ''
    if isinstance(users_data, dict) and 'user' in users_data:
        user_data = users_data['user']
        if isinstance(user_data, list):
            entries, label = user_data, "Added user from list"
        elif isinstance(user_data, dict):
            entries, label = [user_data], "Added single user"
    elif isinstance(users_data, list):
        entries, label = users_data, "Added user from direct list"

    for user in entries:
        if not isinstance(user, dict):
            continue
        user_id = user.get('name') or user.get('email') or user.get('userid')
        if user_id:
            found.add(user_id)
            logger.debug(f"{label}: {user_id}")
    return found


def manage_document_permissions(document: ControlledDocument) -> Dict[str, Any]:
    """
    Synchronize the FileCloud share and folder ACLs of a document with the
    set of users that should currently have access to it.

    This function:
    1. Determines who should have access (owner, authors, reviewers,
       approvers, trainees) and whether each of them may write
    2. Creates a share only if at least one user needs access, and deletes
       an existing share when nobody does
    3. Adds/updates share users with the appropriate permissions
    4. Removes share users who should not have access anymore
    5. Adds/removes write ACL entries on the document's folder

    Args:
        document: The document to manage permissions for

    Returns:
        Dict: Result of the permission update. Always contains 'success' and
        'message'; on the normal path it also carries the share id/url,
        counters (users_added, users_updated, users_removed, acls_added,
        acls_removed) and a 'details' list with one entry per operation.
        Unexpected errors are caught and reported as
        {'success': False, 'message': ...} rather than raised.
    """
    try:
        # Get current version
        current_version = document.current_version
        if not current_version:
            return {
                'success': False,
                'message': 'Document has no current version'
            }

        # Get document status
        status = document.status
        logger.info(f"Document {document.doc_number} status: {status}")

        # Initialize results dictionary
        results = {
            'success': True,
            'document_id': document.uid,
            'document_number': document.doc_number,
            'version_id': current_version.uid,
            'document_status': status,
            'share_id': None,
            'share_url': None,
            'share_created': False,
            'author_access': False,
            'reviewer_access': False,
            'approver_access': False,
            'users_added': 0,
            'users_updated': 0,
            'users_removed': 0,
            'acls_added': 0,
            'acls_removed': 0,
            'message': 'Permissions updated successfully',
            'details': []
        }

        # Get FileCloud client
        fc_client = get_filecloud_client()
        if not fc_client:
            return {
                'success': False,
                'message': 'Could not get FileCloud client'
            }

        # Step 1: Determine who should have access based on document status
        # -----------------------------------------------------------------
        # Format: {user_email: {'role': ..., 'write_access': bool, 'username': ...}}
        should_have_access = {}

        # Always bound, so the write-access expression below never depends on
        # short-circuit evaluation to avoid an unbound local.
        has_active_reviews = False
        if status == STATUS_IN_REVIEW:
            from CDocs.db import db_operations as db
            # 'CANCELLED' is excluded in addition to 'CANCELED' because the
            # approval handling in this function uses the double-L spelling;
            # a cancelled cycle must not count as an active review.
            review_query = db.run_query(
                """
                MATCH (v:DocumentVersion {UID: $version_uid})<-[:FOR_REVIEW]-(r:ReviewCycle)
                WHERE not(r.status IN ['COMPLETED', 'CANCELED', 'CANCELLED'])
                RETURN count(r) as active_count
                """,
                {"version_uid": current_version.uid}
            )
            if review_query and review_query[0].get('active_count', 0) > 0:
                has_active_reviews = True

        # Owner and authors may write while the document is a draft, or while
        # it is in review but no review cycle is active anymore.
        editor_write_access = (status == STATUS_DRAFT) or (
            status == STATUS_IN_REVIEW and not has_active_reviews
        )

        # Step 1.1: Check if the document owner should have access
        document_owner = document.owner
        if document_owner and hasattr(document_owner, 'email') and document_owner.email:
            _grant_access(
                should_have_access,
                document_owner.email,
                'owner',
                editor_write_access,
                getattr(document_owner, 'username', document_owner.email)
            )

        # Step 1.2: Check if authors should have access
        if hasattr(document, 'authors'):
            for author in document.authors:
                if (author and hasattr(author, 'email') and author.email and
                        (not document_owner or author.UID != document_owner.UID)):
                    _grant_access(
                        should_have_access,
                        author.email,
                        'author',
                        editor_write_access,
                        getattr(author, 'username', author.email)
                    )

        # Step 1.3: Check if reviewers should have access
        if status == "IN_REVIEW":
            try:
                from CDocs.models.review import ReviewCycle
                from CDocs.models.user_extensions import DocUser
                from CDocs.db import db_operations

                # Get active review cycles for this document version
                review_cycles = []
                review_query = db_operations.run_query(
                    """
                    MATCH (v:DocumentVersion {UID: $version_uid})<-[:FOR_REVIEW]-(r:ReviewCycle)
                    WHERE r.status IN ['PENDING', 'IN_PROGRESS']
                    RETURN r.UID as review_uid
                    """,
                    {"version_uid": current_version.uid}
                )
                if review_query:
                    review_cycles = [ReviewCycle(uid=record["review_uid"]) for record in review_query]

                for review_cycle in review_cycles:
                    if not (review_cycle and review_cycle.status in ['PENDING', 'IN_PROGRESS']):
                        continue
                    for assignment in review_cycle.get_reviewer_assignments():
                        if not assignment:
                            continue
                        # For sequential reviews, only the active reviewer gets
                        # write access; otherwise pending reviewers do as well.
                        if review_cycle.sequential:
                            should_have_write = assignment.status == 'ACTIVE'
                        else:
                            should_have_write = assignment.status in ['ACTIVE', 'PENDING']

                        # Load the reviewer's user object to get the email
                        reviewer = DocUser(None, assignment.reviewer_uid)
                        if reviewer and reviewer.email:
                            _grant_access(
                                should_have_access,
                                reviewer.email,
                                'reviewer',
                                should_have_write,
                                getattr(reviewer, 'username', reviewer.email)
                            )
                            if should_have_write:
                                results['reviewer_access'] = True
            except Exception as review_err:
                logger.error(f"Error determining reviewer access for document {document.uid}: {str(review_err)}")

        # Step 1.4: Check if approvers should have access
        if status == "IN_APPROVAL":
            logger.info("process approvers")
            try:
                from CDocs.models.approval import ApprovalCycle
                from CDocs.models.user_extensions import DocUser
                from CDocs.db import db_operations

                # Get active approval cycles for this document version
                approval_cycles = []
                approval_query = db_operations.run_query(
                    """
                    MATCH (v:DocumentVersion {UID: $version_uid})<-[:FOR_APPROVAL]-(a:ApprovalCycle)
                    WHERE a.status IN ['PENDING', 'IN_PROGRESS', 'IN_APPROVAL','ACTIVE']
                    RETURN a.UID as approval_uid
                    """,
                    {"version_uid": current_version.uid}
                )
                if approval_query:
                    approval_cycles = [ApprovalCycle(uid=record["approval_uid"]) for record in approval_query]

                for approval_cycle in approval_cycles:
                    if not (approval_cycle and
                            approval_cycle.status not in ['COMPLETED', 'REJECTED', 'CANCELLED']):
                        continue
                    for assignment in approval_cycle.get_approver_assignments():
                        # Skip removed approvers
                        if not assignment or assignment.removal_date:
                            continue
                        approver = DocUser(None, assignment.approver_uid)
                        if approver and approver.email:
                            # Approvers only ever request read-only access
                            _grant_access(
                                should_have_access,
                                approver.email,
                                'approver',
                                False,
                                getattr(approver, 'username', approver.email)
                            )
                            results['approver_access'] = True
            except Exception as approval_err:
                logger.error(f"Error determining approver access for document {document.uid}: {str(approval_err)}")

        # Step 1.5: Check if users with training assignments should have access
        if document.is_training_enabled():
            try:
                for assignment in document.get_training_assignments():
                    user_email = assignment.get("user_email")
                    training_status = assignment.get("status", "REQUIRED")
                    if user_email and training_status in ["REQUIRED", "TRAINED"]:
                        # Training users only request read-only access
                        _grant_access(
                            should_have_access,
                            user_email,
                            'trainee',
                            False,
                            assignment.get("user_name", user_email)
                        )
            except Exception as training_err:
                logger.error(f"Error determining training access for document {document.uid}: {str(training_err)}")

        # If document is in a terminal state, everyone gets read-only access
        if status in [STATUS_PUBLISHED, STATUS_EFFECTIVE, STATUS_ARCHIVED, STATUS_OBSOLETE]:
            for access_info in should_have_access.values():
                access_info['write_access'] = False

        logger.info(f"Users who should have access: {should_have_access}")

        # Step 2: Check if we need to create or update a share
        # -----------------------------------------------------------------
        # If no users need access, no share is needed; drop an existing one.
        # This early return also covers terminal-state documents without
        # users, so no separate share cleanup is required further down.
        if not should_have_access:
            share_id = getattr(current_version, 'share_id', None)
            if share_id:
                try:
                    remove_share_result = fc_client.delete_share(share_id)
                    if remove_share_result.get('success', False):
                        logger.info(f"Removed unnecessary share for document {document.doc_number}")
                        # Clear share info from document version
                        current_version.share_id = None
                        current_version.share_url = None
                        current_version.save()
                        results['message'] = "Share removed for document with no users"
                    else:
                        logger.warning(f"Failed to remove unnecessary share: {remove_share_result.get('message', 'Unknown error')}")
                except Exception as remove_err:
                    logger.error(f"Error removing unnecessary share: {str(remove_err)}")
            return results

        # Now, create or get the share
        share_id = getattr(current_version, 'share_id', None)
        share_url = getattr(current_version, 'share_url', None)
        if not share_id or not share_url:
            try:
                from CDocs.controllers.filecloud_controller import create_document_share_link
                share_result = create_document_share_link(
                    document=document,  # Pass document object directly
                    user=None,  # System operation
                    version=current_version.version_number
                )
                if not share_result.get('success', False):
                    logger.error(f"Failed to create share: {share_result.get('message', 'Unknown error')}")
                    return {
                        'success': False,
                        'message': f"Failed to create share: {share_result.get('message', 'Unknown error')}"
                    }

                # Persist the share information on the document version
                share_id = share_result.get('share_id')
                share_url = share_result.get('share_url')
                logger.info(f"Share created with ID: {share_id}, URL: {share_url}")
                current_version.share_id = share_id
                current_version.share_url = share_url
                current_version.save()

                results['share_created'] = True
                logger.info(f"Created new share for document {document.doc_number} with ID {share_id}")
            except Exception as share_err:
                logger.error(f"Error creating share for document {document.uid}: {str(share_err)}")
                return {
                    'success': False,
                    'message': f"Error creating share: {str(share_err)}"
                }

        # Update share info in results
        results['share_id'] = share_id
        results['share_url'] = share_url

        # Step 3: Get current users who have access to the share
        # -----------------------------------------------------
        current_users = set()
        try:
            users_result = fc_client.get_users_for_share(share_id=share_id)
            logger.info(f"Users result: {users_result}")
            if users_result.get('success', False):
                try:
                    current_users = _collect_share_user_ids(users_result)
                except Exception as parse_err:
                    logger.warning(f"Error parsing users response: {str(parse_err)}")
                    logger.warning(f"Full users_result structure: {users_result}")
        except Exception as users_err:
            logger.warning(f"Error getting users for share: {str(users_err)}")
        logger.info(f"Current users with access: {current_users}")

        # Get document folder path
        document_folder = None
        try:
            file_path = get_filecloud_document_path(document)
            if file_path:
                document_folder = os.path.dirname(file_path)
                logger.info(f"Document folder path: {document_folder}")
        except Exception as folder_err:
            logger.error(f"Error determining document folder: {str(folder_err)}")
            document_folder = None

        # Step 3.5: Get current effective ACLs for all users in the system
        # ----------------------------------------------------------------
        # Format: {user_email: permission string such as 'RW'}
        # NOTE(review): this issues one get_effective_acl call per user in the
        # system; confirm that is acceptable for large user bases.
        current_acls = {}
        if document_folder:
            try:
                from CDocs.db import db_operations as db
                users_query = db.run_query(
                    """
                    MATCH (u:User)
                    WHERE u.Mail IS NOT NULL AND u.Mail <> '' and not 'Template' in labels(u)
                    RETURN u.Mail as email, u.UID as uid
                    """,
                    {}
                )
                if users_query:
                    for user_record in users_query:
                        user_email = user_record.get('email')
                        if not user_email:
                            continue
                        try:
                            # The client identifies the user through 'emailid'
                            effective_acl = fc_client.get_effective_acl(
                                path=document_folder,
                                emailid=user_email
                            )
                            if effective_acl.get('success', False) and 'data' in effective_acl:
                                acl_data = effective_acl['data']
                                if 'permissions' in acl_data and 'perm' in acl_data['permissions']:
                                    permission_data = acl_data['permissions']['perm']
                                    logger.info("Permission data structure: %s", permission_data)
                                    permissions_string = _effective_permission_string(permission_data)
                                    # Only track users who have some form of access
                                    if permissions_string:
                                        current_acls[user_email] = permissions_string
                                        logger.debug(f"User {user_email} has effective permissions: {permissions_string}")
                                    else:
                                        logger.debug(f"User {user_email} has no effective permissions on {document_folder}")
                        except Exception as user_acl_err:
                            # Best effort: one failing lookup must not stop the scan
                            logger.debug(f"Error checking effective ACL for user {user_email}: {str(user_acl_err)}")
                else:
                    logger.warning("No users found in the system")
            except Exception as acl_err:
                logger.warning(f"Error getting effective ACLs: {str(acl_err)}")
        logger.info(f"Current effective ACLs for document folder: {current_acls}")

        # Step 4: Add/update users who should have access
        # ----------------------------------------------
        for email, access_info in should_have_access.items():
            try:
                user_already_exists = email in current_users

                if not user_already_exists:
                    add_result = fc_client.add_user_to_share(
                        user_id=email,
                        share_id=share_id
                    )
                    if not add_result.get('success', False):
                        if "already added" in str(add_result.get('message', '')).lower():
                            # User was already added but not detected in current_users
                            user_already_exists = True
                            logger.info(f"User {email} was already added to share {share_id}")
                        else:
                            logger.error(f"Failed to add user {email} to share: {add_result.get('message', 'Unknown error')}")
                            results['details'].append({
                                'user': access_info['username'],
                                'role': access_info['role'],
                                'write_access': access_info['write_access'],
                                'operation': 'add',
                                'result': False,
                                'message': f"Failed to add user: {add_result.get('message', 'Unknown error')}"
                            })
                            continue

                # Set appropriate permissions
                perm_result = fc_client.set_user_access_for_share(
                    share_id=share_id,
                    user_id=email,
                    write=access_info['write_access'],
                    download=True,
                    share=False,
                    sync=False
                )

                if not perm_result.get('success', False):
                    logger.warning(f"Failed to set permissions for user {email}: {perm_result.get('message', 'Unknown error')}")
                    results['details'].append({
                        'user': access_info['username'],
                        'role': access_info['role'],
                        'write_access': access_info['write_access'],
                        'operation': 'set_permissions',
                        'result': False,
                        'message': f"Failed to set permissions: {perm_result.get('message', 'Unknown error')}"
                    })
                    continue

                # Record successful permission update
                if user_already_exists:
                    results['users_updated'] += 1
                    operation = 'update'
                else:
                    results['users_added'] += 1
                    operation = 'add'

                # Track author access for result
                if access_info['role'] in ['owner', 'author'] and access_info['write_access']:
                    results['author_access'] = True

                results['details'].append({
                    'user': access_info['username'],
                    'role': access_info['role'],
                    'write_access': access_info['write_access'],
                    'operation': operation,
                    'result': True
                })

                if not document_folder:
                    continue

                # Align the folder's write ACL with the requested access level
                has_write_acl = 'W' in current_acls.get(email, '')
                if access_info['write_access']:
                    if not has_write_acl:
                        acl_result = fc_client.add_acl_entry(
                            path=document_folder,
                            entry_type='user',
                            value=email,
                            permissions='RW'  # Read and Write permission
                        )
                        if acl_result.get('success', False):
                            results['acls_added'] += 1
                            logger.info(f"Added ACL entry for user {email} with write permission")
                            results['details'].append({
                                'user': access_info['username'],
                                'operation': 'add_acl',
                                'result': True,
                                'path': document_folder
                            })
                        else:
                            logger.warning(f"Failed to add ACL entry: {acl_result.get('message', 'Unknown error')}")
                elif has_write_acl:
                    # User is now read-only but still has an effective write ACL
                    acl_result = fc_client.delete_acl_entry(
                        path=document_folder,
                        entry_type='user',
                        value=email
                    )
                    if acl_result.get('success', False):
                        results['acls_removed'] += 1
                        logger.info(f"Removed ACL entry for user {email} (changed to read-only)")
                        results['details'].append({
                            'user': access_info['username'],
                            'operation': 'remove_acl',
                            'result': True,
                            'path': document_folder
                        })
            except Exception as user_err:
                logger.error(f"Error managing access for user {email}: {str(user_err)}")
                results['details'].append({
                    'user': access_info.get('username', email),
                    'role': access_info['role'],
                    'write_access': access_info['write_access'],
                    'operation': 'error',
                    'result': False,
                    'message': f"Error: {str(user_err)}"
                })

        # Step 5: Remove users who shouldn't have access anymore
        # -----------------------------------------------------
        # Candidates are share users plus users with an effective write ACL.
        # NOTE(review): share users are identified by 'name', 'email' or
        # 'userid' while should_have_access is keyed by email - confirm the
        # share API reports emails, otherwise valid users are removed here.
        allowed_users = set(should_have_access)
        users_with_write_acl = {email for email, perms in current_acls.items() if 'W' in perms}
        users_to_remove = (current_users | users_with_write_acl) - allowed_users

        logger.info(f"Users to remove from share: {current_users - allowed_users}")
        logger.info(f"Users to remove from write ACL: {users_with_write_acl - allowed_users}")
        logger.info(f"All users to remove: {users_to_remove}")

        for email in users_to_remove:
            try:
                # Remove from share if user is in current_users
                if email in current_users:
                    remove_result = fc_client.delete_user_from_share(
                        share_id=share_id,
                        user_id=email
                    )
                    if remove_result.get('success', False):
                        results['users_removed'] += 1
                        results['details'].append({
                            'user': email,
                            'role': 'former',
                            'write_access': False,
                            'operation': 'remove_from_share',
                            'result': True
                        })
                        logger.info(f"Removed user {email} from share")
                    else:
                        logger.warning(f"Failed to remove user {email} from share: {remove_result.get('message', 'Unknown error')}")
                        results['details'].append({
                            'user': email,
                            'role': 'former',
                            'write_access': False,
                            'operation': 'remove_from_share',
                            'result': False,
                            'message': f"Failed to remove user: {remove_result.get('message', 'Unknown error')}"
                        })

                # Remove ACL entry if user has write permissions and the folder is known
                if document_folder and 'W' in current_acls.get(email, ''):
                    acl_result = fc_client.delete_acl_entry(
                        path=document_folder,
                        entry_type='user',
                        value=email
                    )
                    if acl_result.get('success', False):
                        results['acls_removed'] += 1
                        logger.info(f"Removed ACL entry for user {email} (user no longer needs access)")
                        results['details'].append({
                            'user': email,
                            'operation': 'remove_acl',
                            'result': True,
                            'path': document_folder
                        })
                    else:
                        logger.warning(f"Failed to remove ACL entry for {email}: {acl_result.get('message', 'Unknown error')}")
                        results['details'].append({
                            'user': email,
                            'operation': 'remove_acl',
                            'result': False,
                            'path': document_folder,
                            'message': f"Failed to remove ACL: {acl_result.get('message', 'Unknown error')}"
                        })
            except Exception as remove_err:
                logger.error(f"Error removing user {email}: {str(remove_err)}")
                results['details'].append({
                    'user': email,
                    'role': 'former',
                    'write_access': False,
                    'operation': 'remove_error',
                    'result': False,
                    'message': f"Error: {str(remove_err)}"
                })

        # Update summary statistics
        if results['users_added'] > 0 or results['users_updated'] > 0 or results['users_removed'] > 0:
            summary_message = (f"Updated permissions: {results['users_added']} users added, "
                               f"{results['users_updated']} users updated, "
                               f"{results['users_removed']} users removed")
            if results['acls_added'] > 0 or results['acls_removed'] > 0:
                summary_message += f", {results['acls_added']} ACLs added, {results['acls_removed']} ACLs removed"
            results['message'] = summary_message

        return results
    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of raising
        logger.error(f"Error managing document permissions: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return {
            'success': False,
            'message': f'Error managing permissions: {str(e)}'
        }
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
document |
ControlledDocument | - | positional_or_keyword |
Parameter Details
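document: The ControlledDocument whose FileCloud share and folder ACLs should be synchronized. It must have a current version; otherwise the function returns a failure result without changing anything.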
Return Value
Type: Dict[str, Any]
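Returns a dictionary that always contains 'success' and 'message'. On the normal path it also contains the document and version identifiers, the share ID and URL, the flags 'share_created', 'author_access', 'reviewer_access' and 'approver_access', the counters 'users_added', 'users_updated', 'users_removed', 'acls_added' and 'acls_removed', and a 'details' list with one entry per attempted operation. Errors are reported through this dictionary rather than raised.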
Required Imports
import logging
from typing import Dict
from typing import Any
from typing import List
from typing import Optional
Usage Example
# Example usage:
# result = manage_document_permissions(document)
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function manage_user_share_access_v2 76.3% similar
-
function manage_user_share_access_v1 74.2% similar
-
function manage_user_share_access 73.3% similar
-
function check_document_permissions_on_startup_v1 68.7% similar
-
function set_document_permissions_in_filecloud 67.9% similar