class ControlledDocumentFlaskApp
Main Flask application class for Controlled Document Management System.
/tf/active/vicechatdev/CDocs/main_flask.py
146 - 3881
moderate
Purpose
Main Flask application class for Controlled Document Management System.
Source Code
class ControlledDocumentFlaskApp:
"""Main Flask application class for Controlled Document Management System."""
def __init__(self, config_path=None):
    """Build the Flask app and wire up its supporting services.

    ``config_path`` is accepted but is not read by this constructor.
    """
    # Templates and static assets are resolved relative to this module.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    templates_dir = os.path.join(base_dir, 'flask_templates')
    static_dir = os.path.join(base_dir, 'flask_static')
    self.app = Flask(
        __name__,
        template_folder=templates_dir,
        static_folder=static_dir,
    )

    # Start-up steps, in order: settings, database, login handling, Azure SSO.
    for startup_step in (
        self._configure_app,
        self._init_database,
        self._setup_authentication,
        self._setup_azure_sso,
    ):
        startup_step()

    self.notification_manager = NotificationManager()

    # Routes are registered before the session manager is created.
    self._register_routes()
    self.session_manager = SessionManager()

    logger.info("Flask CDocs application initialized successfully")
def _configure_app(self):
    """Apply Flask settings, create the upload folder and add Jinja2 filters.

    The session secret is read from the ``FLASK_SECRET_KEY`` environment
    variable. When it is unset, a random key is generated and a warning is
    logged: such a key differs per process and per restart, so previously
    issued session cookies stop validating.
    """
    secret_key = os.environ.get('FLASK_SECRET_KEY')
    if not secret_key:
        logger.warning(
            "FLASK_SECRET_KEY is not set; using a random per-process secret key. "
            "Sessions will not survive restarts or be shared between workers."
        )
        secret_key = str(uuid.uuid4())

    # Resolve against the absolute module path, as __init__ does for templates.
    module_dir = os.path.dirname(os.path.abspath(__file__))

    self.app.config.update({
        'SECRET_KEY': secret_key,
        'WTF_CSRF_ENABLED': True,
        'SESSION_COOKIE_SECURE': False,  # Set to True in production with HTTPS
        'SESSION_COOKIE_HTTPONLY': True,
        'SESSION_COOKIE_SAMESITE': 'Lax',
        'PERMANENT_SESSION_LIFETIME': timedelta(hours=8),
        'MAX_CONTENT_LENGTH': 100 * 1024 * 1024,  # 100MB max file size
        'UPLOAD_FOLDER': os.path.join(module_dir, 'uploads'),
        'TEMPLATES_AUTO_RELOAD': True,
    })

    # Ensure upload folder exists
    os.makedirs(self.app.config['UPLOAD_FOLDER'], exist_ok=True)

    @self.app.template_filter('safe_strftime')
    def safe_strftime(date_value, format_string='%Y-%m-%d'):
        """Format a date for templates without ever raising.

        Falsy values give 'Unknown'. Strings containing 'T' are parsed as
        ISO timestamps and formatted; other strings are returned unchanged.
        Any other value is formatted via its ``strftime`` method, falling
        back to ``str(value)``.
        """
        if not date_value:
            return 'Unknown'

        if isinstance(date_value, str):
            if 'T' not in date_value:
                return date_value
            try:
                from datetime import datetime
                # fromisoformat() on older Pythons rejects a trailing 'Z'.
                parsed = datetime.fromisoformat(date_value.replace('Z', '+00:00'))
                return parsed.strftime(format_string)
            except Exception:
                # A template filter must not break rendering; show the raw text.
                return date_value

        try:
            return date_value.strftime(format_string)
        except Exception:
            return str(date_value)
def _init_database(self):
    """Open the database connection, set up the schema and seed the admin user.

    Any failure is logged and then re-raised to the caller.
    """
    try:
        # Reuse the existing CDocs database bootstrap helpers.
        from db import init_database, get_driver

        connected = init_database()
        if not connected:
            raise Exception("Failed to initialize database connection")

        schema_ready = schema_manager.initialize_schema(get_driver())
        if not schema_ready:
            raise Exception("Failed to initialize database schema")

        self._ensure_admin_user()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        raise
def _ensure_admin_user(self):
    """Create the "admin" account when no user with that username exists.

    Errors are logged and swallowed, so a failure here does not propagate.
    """
    try:
        if DocUser.get_by_username("admin"):
            return

        new_admin = DocUser(
            username="admin",
            email="admin@company.com",
            roles=["admin", "reviewer", "approver"],
        )
        new_admin.save()
        logger.info("Admin user created")
    except Exception as e:
        logger.error(f"Error ensuring admin user: {e}")
def _setup_authentication(self):
    """Attach a Flask-Login manager to the app and register the user loader."""
    manager = LoginManager()
    self.login_manager = manager
    manager.init_app(self.app)

    # Unauthenticated requests are sent to the 'login' endpoint with this notice.
    manager.login_view = 'login'
    manager.login_message = 'Please log in to access this page.'
    manager.login_message_category = 'info'

    @manager.user_loader
    def load_user(user_id):
        """Return a FlaskUser for ``user_id``, or None when it cannot be loaded."""
        try:
            doc_user = DocUser(uid=user_id)
            if doc_user and doc_user.uid:
                return FlaskUser(doc_user)
            return None
        except Exception as e:
            logger.error(f"Error loading user {user_id}: {e}")
            return None
def _setup_azure_sso(self):
    """Configure Azure SSO from the Flask-specific Azure settings.

    On any failure ``self.azure_sso`` is set to None and a warning is logged.
    """
    try:
        from auth.azure_auth import setup_azure_sso

        redirect_uri = flask_settings.FLASK_AZURE_CONFIG['redirect_uri']

        # The redirect URI is exported through the environment before the
        # azure_auth setup function is called.
        os.environ['AZURE_REDIRECT_URI'] = redirect_uri

        self.azure_sso = setup_azure_sso()
        if not self.azure_sso:
            logger.warning("Azure SSO configuration failed - missing required settings")
            self.azure_sso = None
        else:
            logger.info(f"Azure SSO configured successfully with redirect URI: {redirect_uri}")
    except Exception as e:
        logger.warning(f"Azure SSO configuration failed: {e}")
        self.azure_sso = None
def _register_routes(self):
"""Register all Flask routes."""
@self.app.route('/')
def index():
    """Render the dashboard, or send anonymous visitors to the login page."""
    if not current_user.is_authenticated:
        return redirect(url_for('login'))

    def _pick_list(result, key):
        """Return result[key] for dict results, the result itself for lists, else []."""
        if isinstance(result, dict) and key in result:
            return result[key] or []
        return result if isinstance(result, list) else []

    try:
        # Gather dashboard data from the existing controllers.
        documents = self._get_user_documents(current_user.doc_user)
        pending_reviews_result = review_controller.get_user_pending_reviews(current_user.doc_user)
        pending_approvals_result = approval_controller.get_user_pending_approvals(current_user.doc_user)
        recent_activities = self._get_recent_activities()

        context = {
            'documents': documents or [],
            'pending_reviews': _pick_list(pending_reviews_result, 'reviews'),
            'pending_approvals': _pick_list(pending_approvals_result, 'approvals'),
            'recent_activities': recent_activities or [],
        }
        return render_template('dashboard.html', user=current_user, **context)
    except Exception as e:
        logger.error(f"Dashboard error: {e}")
        flash('Error loading dashboard', 'error')
        # Fall back to an empty dashboard rather than an error page.
        empty_context = {
            'documents': [],
            'pending_reviews': [],
            'pending_approvals': [],
            'recent_activities': [],
        }
        return render_template('dashboard.html', user=current_user, **empty_context)
@self.app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and handle username/password sign-in.

    A ``next`` target (query parameter, or one stored in the session for the
    Azure SSO flow) is honoured only when it is a local, path-only URL;
    anything else is ignored so the login page cannot be used as an open
    redirect. Successful logins are written to the audit trail.
    """
    from urllib.parse import urlparse

    def _is_safe_next(target):
        """Return True only for same-site, path-only redirect targets."""
        if not target or not target.startswith('/'):
            return False
        # '//host' and '/\host' are treated as absolute URLs by browsers.
        if target.startswith('//') or '\\' in target:
            return False
        # Whitespace and control characters can be stripped by browsers,
        # turning an innocent-looking path into '//host'.
        if any(ord(ch) <= 0x20 or ord(ch) == 0x7f for ch in target):
            return False
        parts = urlparse(target)
        return not parts.scheme and not parts.netloc

    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Store the next page for Azure SSO redirect
    next_page = request.args.get('next')
    if _is_safe_next(next_page):
        session['next_page'] = next_page

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        try:
            # Try to authenticate using the authenticate method (same as Panel app)
            doc_user = DocUser.authenticate(username, password)
            if doc_user:
                flask_user = FlaskUser(doc_user)
                login_user(flask_user, remember=True)

                # Log successful login
                from CDocs.utils import audit_trail
                audit_trail.log_event(
                    event_type="USER_LOGIN",
                    user=doc_user.uid,
                    details={
                        "username": username,
                        "ip_address": request.remote_addr,
                        "user_agent": request.headers.get('User-Agent', 'Unknown')[:100]
                    }
                )

                # Always clear the stored target so it cannot leak into a later login.
                stored_next = session.pop('next_page', None)
                next_page = request.args.get('next') or stored_next
                if _is_safe_next(next_page):
                    return redirect(next_page)
                return redirect(url_for('index'))
            else:
                flash('Invalid username or password', 'error')
        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Login failed', 'error')

    # Get Azure SSO URL if available
    azure_auth_url = self._get_azure_auth_url()
    return render_template('login.html', azure_auth_url=azure_auth_url)
@self.app.route('/logout')
@login_required
def logout():
    """End the current user's session and return to the login page."""
    logout_user()
    flash('You have been logged out', 'info')
    login_url = url_for('login')
    return redirect(login_url)
@self.app.route('/documents')
@login_required
def documents():
    """List the documents available to the current user."""
    template_name = 'documents.html'
    try:
        visible_docs = self._get_user_documents(current_user.doc_user)
        return render_template(template_name, documents=visible_docs, user=current_user)
    except Exception as e:
        logger.error(f"Documents error: {e}")
        flash('Error loading documents', 'error')
        # Show an empty listing instead of failing the request.
        return render_template(template_name, documents=[], user=current_user)
@self.app.route('/documents/<doc_id>')
@login_required
def document_detail(doc_id):
    """Show one document with its versions, review/approval cycles and history.

    Responds with 404 when the controller returns no document. Failures while
    loading versions, reviews, approvals or audit events are logged and leave
    the corresponding list empty. Any other error flashes a message and
    redirects to the documents list.
    """
    import traceback

    def _as_list(result, key):
        """Return result[key] for dict results, the result itself for lists, else []."""
        if isinstance(result, dict):
            return result.get(key, [])
        if isinstance(result, list):
            return result
        return []

    def _fail(error):
        """Log the active exception and bounce back to the documents list."""
        logger.error(f"Document detail error: {error}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        flash('Error loading document', 'error')
        return redirect(url_for('documents'))

    try:
        document = document_controller.get_document(doc_id, current_user.doc_user)
    except Exception as e:
        return _fail(e)

    # abort() raises an HTTPException, which is an Exception subclass; it must
    # be raised outside the broad handlers or the 404 turns into a redirect.
    if not document:
        abort(404)

    try:
        # Log document view
        from CDocs.utils import audit_trail
        audit_trail.log_event(
            event_type="DOCUMENT_VIEWED",
            user=current_user.doc_user.uid,
            resource_uid=doc_id,
            resource_type="ControlledDocument",
            details={
                "document_title": document.get('title', 'Unknown'),
                "document_number": document.get('docNumber', 'Unknown')
            }
        )

        # Each related list is loaded independently so one failure does not
        # hide the others.
        versions = []
        reviews = []
        approvals = []

        try:
            versions = _as_list(document_controller.get_document_versions(doc_id), 'versions')
        except Exception as e:
            logger.error(f"Error loading document versions: {e}")

        try:
            reviews = _as_list(review_controller.get_document_review_cycles(doc_id), 'review_cycles')
        except Exception as e:
            logger.error(f"Error loading document reviews: {e}")

        try:
            approvals = _as_list(approval_controller.get_document_approval_cycles(doc_id), 'approval_cycles')
        except Exception as e:
            logger.error(f"Error loading document approvals: {e}")

        # Get audit trail events
        audit_events = []
        try:
            from CDocs.utils.audit_trail import get_document_history
            audit_events = get_document_history(doc_id)
        except Exception as e:
            logger.error(f"Error loading audit trail: {e}")

        return render_template('document_detail.html',
                               document=document,
                               versions=versions,
                               reviews=reviews,
                               approvals=approvals,
                               audit_events=audit_events,
                               user=current_user)
    except Exception as e:
        return _fail(e)
@self.app.route('/documents/upload', methods=['GET', 'POST'])
@login_required
def upload_document():
    """Show the upload form and create a document from a posted file.

    Requires the 'document_create' permission. On any failure the form is
    rendered again.
    """
    # NOTE(review): the create_document call below uses file=/creator= keywords
    # and reads .uid from the result, while the /create_document route in this
    # file calls the controller with user=/doc_text= and treats the result as a
    # dict - confirm this call still matches the controller.
    form_template = 'upload_document.html'

    if not current_user.has_permission('document_create'):
        flash('You do not have permission to upload documents', 'error')
        return redirect(url_for('documents'))

    if request.method != 'POST':
        return render_template(form_template, user=current_user)

    try:
        uploaded = request.files.get('file')
        title = request.form.get('title')
        description = request.form.get('description')
        document_type = request.form.get('document_type')

        if not (uploaded and title):
            flash('File and title are required', 'error')
            return render_template(form_template, user=current_user)

        # Delegate the actual creation to the document controller.
        created = document_controller.create_document(
            file=uploaded,
            title=title,
            description=description,
            document_type=document_type,
            creator=current_user.doc_user
        )
        flash('Document uploaded successfully', 'success')
        return redirect(url_for('document_detail', doc_id=created.uid))
    except Exception as e:
        logger.error(f"Document upload error: {e}")
        flash('Error uploading document', 'error')

    return render_template(form_template, user=current_user)
@self.app.route('/reviews')
@login_required
def reviews():
    """List the user's pending and completed reviews plus cycles they can close."""
    try:
        pending_reviews = review_controller.get_user_pending_reviews(current_user.doc_user)
        completed_reviews = review_controller.get_user_assigned_reviews(
            current_user.doc_user,
            status_filter=['APPROVED', 'REJECTED', 'APPROVED_WITH_COMMENTS', 'COMPLETED']
        )

        # Completed review cycles on the user's own IN_REVIEW documents are
        # offered for closing.
        try:
            from CDocs.controllers.document_controller import get_documents
            owned_result = get_documents(
                user=current_user.doc_user,
                status="IN_REVIEW",
                owner=current_user.doc_user.uid
            )
            owned_docs = owned_result.get('documents', [])
            reviews_to_close = []
            for owned_doc in owned_docs:
                owned_uid = owned_doc.get('UID', '')
                if not owned_uid:
                    continue
                cycles_result = review_controller.get_document_review_cycles(document_uid=owned_uid)
                for cycle in cycles_result.get('review_cycles', []):
                    if cycle.get('status') != 'COMPLETED':
                        continue
                    reviews_to_close.append({
                        'review_cycle': cycle,
                        'document': owned_doc
                    })
        except Exception as e:
            logger.error(f"Error getting reviews to close: {e}")
            reviews_to_close = []

        # Debug logging
        logger.info(f"Pending reviews result: {pending_reviews}")
        logger.info(f"Completed reviews result: {completed_reviews}")

        # Dict results carry the list under 'reviews' (pending) or
        # 'assignments' (completed); anything that is not a list becomes [].
        if isinstance(pending_reviews, dict):
            pending_reviews = pending_reviews.get('reviews', [])
        if isinstance(completed_reviews, dict):
            completed_reviews = completed_reviews.get('assignments', [])
        if not isinstance(pending_reviews, list):
            pending_reviews = []
        if not isinstance(completed_reviews, list):
            completed_reviews = []

        return render_template('reviews.html',
                               pending_reviews=pending_reviews,
                               completed_reviews=completed_reviews,
                               reviews_to_close=reviews_to_close,
                               user=current_user)
    except Exception as e:
        logger.error(f"Reviews error: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        flash('Error loading reviews', 'error')
        return render_template('reviews.html',
                               pending_reviews=[],
                               completed_reviews=[],
                               reviews_to_close=[],
                               user=current_user)
@self.app.route('/approvals')
@login_required
def approvals():
    """List the user's pending and completed approvals plus cycles they can close.

    Both assignment queries go through the same normalisation, so the
    completed list is unpacked from the controller's result dict exactly like
    the pending list.
    """

    def _assignment_list(result):
        """Return the 'assignments' list of a controller result, or []."""
        if isinstance(result, dict):
            result = result.get('assignments', [])
        return result if isinstance(result, list) else []

    try:
        from CDocs.controllers import approval_controller

        # Get pending approvals
        pending_approvals = _assignment_list(
            approval_controller.get_user_assigned_approvals(
                current_user.doc_user,
                status_filter=['PENDING', 'ACTIVE']
            )
        )

        # Get completed approvals. The raw result was previously passed to an
        # isinstance(list) check without unpacking, which always produced [].
        completed_approvals = _assignment_list(
            approval_controller.get_user_assigned_approvals(
                current_user.doc_user,
                status_filter=['APPROVED', 'REJECTED', 'APPROVED_WITH_COMMENTS', 'COMPLETED']
            )
        )

        # Get approvals that need to be closed (for approval issuers)
        try:
            from CDocs.controllers.document_controller import get_documents
            docs_result = get_documents(
                user=current_user.doc_user,
                status="IN_APPROVAL",
                owner=current_user.doc_user.uid
            )
            documents = docs_result.get('documents', [])
            approvals_to_close = []
            for doc in documents:
                doc_uid = doc.get('UID', '')
                if not doc_uid:
                    continue
                doc_approval_result = approval_controller.get_document_approval_cycles(document_uid=doc_uid)
                for approval_cycle in doc_approval_result.get('approval_cycles', []):
                    if approval_cycle.get('status') == 'COMPLETED':
                        approvals_to_close.append({
                            'approval_cycle': approval_cycle,
                            'document': doc
                        })
        except Exception as e:
            logger.error(f"Error getting approvals to close: {e}")
            approvals_to_close = []

        return render_template('approvals.html',
                               pending_approvals=pending_approvals,
                               completed_approvals=completed_approvals,
                               approvals_to_close=approvals_to_close,
                               user=current_user)
    except Exception as e:
        logger.error(f"Approvals error: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        flash('Error loading approvals', 'error')
        return render_template('approvals.html',
                               pending_approvals=[],
                               completed_approvals=[],
                               approvals_to_close=[],
                               user=current_user)
@self.app.route('/approval/<approval_id>')
@login_required
def approval_detail(approval_id):
    """Show one approval cycle with its document, assignments and comments."""
    try:
        from CDocs.controllers import approval_controller

        approval = approval_controller.get_approval_cycle(
            approval_uid=approval_id,
            include_comments=True,
            include_document=True
        )
        if not approval:
            flash('Approval not found', 'error')
            return redirect(url_for('approvals'))

        approver_assignments = approval.get('approver_assignments', [])

        # The current user's own assignment is the first one whose approver
        # uid matches; None when there is no match or no doc_user.
        user_assignment = None
        if current_user.doc_user:
            user_assignment = next(
                (
                    entry for entry in approver_assignments
                    if entry.get('approver_uid') == current_user.doc_user.uid
                ),
                None,
            )

        template_context = {
            'approval': approval,
            # The document is passed separately for consistent template usage.
            'document': approval.get('document', {}),
            'user_assignment': user_assignment,
            'approver_assignments': approver_assignments,
            'comments': approval.get('comments', []),
            'user': current_user,
        }
        return render_template('approval_detail.html', **template_context)
    except Exception as e:
        logger.error(f"Error in approval detail: {e}")
        flash(f'Error loading approval: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/submit_approval', methods=['POST'])
@login_required
def submit_approval():
    """Record the current user's decision on an approval cycle.

    Reads ``approval_uid``, ``decision`` and optional ``comments`` from the
    posted form, passes them to ``approval_controller.complete_approval`` and
    flashes the outcome. Redirects to the approval detail page when an
    approval id is known, otherwise to the approvals list.
    """
    try:
        approval_uid = request.form.get('approval_uid')
        decision = request.form.get('decision')
        comments = request.form.get('comments', '')

        if not approval_uid or not decision:
            flash('Missing required fields', 'error')
            # An empty id cannot fill the <approval_id> URL segment, so fall
            # back to the list page instead of building a dead link.
            if not approval_uid:
                return redirect(url_for('approvals'))
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        from CDocs.controllers import approval_controller

        # Submit the approval
        result = approval_controller.complete_approval(
            user=current_user.doc_user,
            approval_uid=approval_uid,
            decision=decision,
            comments=comments if comments else None
        )

        if result.get('success'):
            flash('Approval submitted successfully', 'success')
        else:
            flash(f'Error submitting approval: {result.get("message", "Unknown error")}', 'error')

        return redirect(url_for('approval_detail', approval_id=approval_uid))
    except Exception as e:
        logger.error(f"Error submitting approval: {e}")
        flash(f'Error submitting approval: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/close_approval', methods=['POST'])
@login_required
def close_approval():
    """Close a completed approval cycle and return to the approvals list.

    Form fields: ``approval_uid`` (required), ``update_status`` (defaults to
    'true') and ``target_status`` (defaults to 'DRAFT').
    """
    try:
        posted = request.form
        approval_uid = posted.get('approval_uid')
        # Only the literal text 'true' (any case) enables the status update.
        should_update = posted.get('update_status', 'true').lower() == 'true'
        new_status = posted.get('target_status', 'DRAFT')

        if not approval_uid:
            flash('Approval ID is required', 'error')
            return redirect(url_for('approvals'))

        from CDocs.controllers import approval_controller

        result = approval_controller.close_approval_cycle(
            user=current_user.doc_user,
            approval_uid=approval_uid,
            update_document_status=should_update,
            target_status=new_status
        )

        if not result.get('success'):
            flash(f'Error closing approval: {result.get("message", "Unknown error")}', 'error')
        else:
            flash('Approval cycle closed successfully', 'success')

        return redirect(url_for('approvals'))
    except Exception as e:
        logger.error(f"Error closing approval: {e}")
        flash(f'Error closing approval: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/extend_approval_deadline', methods=['POST'])
@login_required
def extend_approval_deadline():
    """Move the due date of an approval cycle.

    Form fields: ``approval_uid`` and ``new_due_date`` (ISO format) are
    required, ``reason`` is optional. Redirects to the approval detail page
    when an approval id is known, otherwise to the approvals list.
    """
    try:
        approval_uid = request.form.get('approval_uid')
        new_due_date_str = request.form.get('new_due_date')
        reason = request.form.get('reason')

        if not approval_uid or not new_due_date_str:
            flash('Approval ID and new due date are required', 'error')
            # An empty id cannot fill the <approval_id> URL segment.
            if not approval_uid:
                return redirect(url_for('approvals'))
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        # Parse the datetime; a malformed value is a user error, not a crash.
        from datetime import datetime
        try:
            new_due_date = datetime.fromisoformat(new_due_date_str)
        except ValueError:
            flash('Invalid due date format', 'error')
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        from CDocs.controllers import approval_controller

        # Extend the approval deadline
        result = approval_controller.extend_approval_deadline(
            user=current_user.doc_user,
            approval_uid=approval_uid,
            new_due_date=new_due_date,
            reason=reason
        )

        if result.get('success'):
            flash('Approval deadline extended successfully', 'success')
        else:
            flash(f'Error extending deadline: {result.get("message", "Unknown error")}', 'error')

        return redirect(url_for('approval_detail', approval_id=approval_uid))
    except Exception as e:
        logger.error(f"Error extending approval deadline: {e}")
        flash(f'Error extending deadline: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/cancel_approval', methods=['POST'])
@login_required
def cancel_approval():
    """Cancel an approval cycle.

    Form fields: ``approval_uid`` and ``reason`` are both required. A
    successful cancellation returns to the approvals list; a failed one
    returns to the approval detail page.
    """
    try:
        approval_uid = request.form.get('approval_uid')
        reason = request.form.get('reason')

        if not approval_uid or not reason:
            flash('Approval ID and cancellation reason are required', 'error')
            # An empty id cannot fill the <approval_id> URL segment.
            if not approval_uid:
                return redirect(url_for('approvals'))
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        from CDocs.controllers import approval_controller

        # Cancel the approval cycle
        result = approval_controller.cancel_approval_cycle(
            user=current_user.doc_user,
            approval_uid=approval_uid,
            reason=reason
        )

        if result.get('success'):
            flash('Approval cycle canceled successfully', 'success')
            return redirect(url_for('approvals'))  # Go back to approvals list

        flash(f'Error canceling approval: {result.get("message", "Unknown error")}', 'error')
        return redirect(url_for('approval_detail', approval_id=approval_uid))
    except Exception as e:
        logger.error(f"Error canceling approval: {e}")
        flash(f'Error canceling approval: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/add_approver_to_approval', methods=['POST'])
@login_required
def add_approver_to_approval():
    """Add an approver to an active approval cycle.

    Form fields: ``approval_uid`` and ``approver_uid`` are required,
    ``instructions`` is optional. Redirects to the approval detail page when
    an approval id is known, otherwise to the approvals list.
    """
    try:
        approval_uid = request.form.get('approval_uid')
        approver_uid = request.form.get('approver_uid')
        instructions = request.form.get('instructions', '')

        if not approval_uid or not approver_uid:
            flash('Approval ID and approver are required', 'error')
            # An empty id cannot fill the <approval_id> URL segment.
            if not approval_uid:
                return redirect(url_for('approvals'))
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        from CDocs.controllers import approval_controller
        from CDocs.models.user_extensions import DocUser

        # Get the approver user object. The same "object and uid" check is
        # used by the login user loader in this file to detect a failed load.
        approver = DocUser(uid=approver_uid)
        if not approver or not approver.uid:
            flash('Selected approver not found', 'error')
            return redirect(url_for('approval_detail', approval_id=approval_uid))

        # Add the approver to the approval cycle
        result = approval_controller.add_approver_to_active_approval(
            user=current_user.doc_user,
            approval_uid=approval_uid,
            approver=approver,
            instructions=instructions
        )

        if result.get('success'):
            flash('Approver added successfully', 'success')
        else:
            flash(f'Error adding approver: {result.get("message", "Unknown error")}', 'error')

        return redirect(url_for('approval_detail', approval_id=approval_uid))
    except Exception as e:
        logger.error(f"Error adding approver: {e}")
        flash(f'Error adding approver: {str(e)}', 'error')
        return redirect(url_for('approvals'))
@self.app.route('/upload_new_version', methods=['POST'])
@login_required
def upload_new_version():
    """Store an uploaded file as a new version of a document (JSON response).

    Form fields: ``document_uid`` and ``file`` are required, ``comment`` is
    optional. Validation and controller failures return a JSON body with
    ``success: False``; unexpected exceptions also set HTTP status 500.
    """

    def _reject(log_message, error_text):
        """Log a failure and build the matching JSON error response."""
        logger.error(log_message)
        return jsonify({'success': False, 'error': error_text})

    try:
        logger.info("Upload new version request received")

        document_uid = request.form.get('document_uid')
        comment = request.form.get('comment', '')
        file = request.files.get('file')

        logger.info(f"Document UID: {document_uid}")
        logger.info(f"Comment: {comment}")
        logger.info(f"File: {file.filename if file else 'No file'}")

        if not document_uid:
            return _reject("Document UID is missing", 'Document UID is required')
        if not file:
            return _reject("File is missing", 'File is required')

        # The whole upload is read into memory before it is handed over.
        file_content = file.read()
        file_name = file.filename
        logger.info(f"File size: {len(file_content)} bytes")

        from CDocs.controllers.document_controller import create_document_version
        logger.info("Calling create_document_version...")
        result = create_document_version(
            user=current_user.doc_user,
            document_uid=document_uid,
            file_content=file_content,
            file_name=file_name,
            comment=comment
        )
        logger.info(f"Result from create_document_version: {result}")

        # A result without a 'UID' entry counts as a failed upload.
        if not (result and result.get('UID')):
            return _reject(f"Version creation failed: {result}", 'Failed to upload version')

        logger.info("Version created successfully")
        return jsonify({
            'success': True,
            'message': 'New version uploaded successfully',
            'version_uid': result.get('UID')
        })
    except Exception as e:
        logger.error(f"Upload version error: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/create_document', methods=['GET', 'POST'])
@login_required
def create_document():
    """Show the document creation form, or create/clone a document on POST.

    POST answers with JSON. A new document may carry an uploaded
    ``document_file`` that is stored as its initial version; a failure of
    that upload is logged but does not fail the creation. Any non-POST
    request (GET, and the HEAD requests Flask accepts on GET routes) renders
    the form, so every accepted method yields a response.
    """

    def _load_storage_locations():
        """Return Rootfolder and Subfolder entries for the storage selector."""
        from CDocs.db import db_operations
        try:
            # Get available Rootfolders and their paths
            rootfolders_query = db_operations.run_query("""
                MATCH (r:Rootfolder)
                RETURN r.UID as uid, r.Name as name, r.Path as path
                ORDER BY r.Name
            """)
            # Get available Subfolders and their paths
            subfolders_query = db_operations.run_query("""
                MATCH (s:Subfolder)
                RETURN s.UID as uid, s.Name as name, s.Path as path
                ORDER BY s.Path
            """)

            locations = []
            for folder in rootfolders_query or []:
                locations.append({
                    'uid': folder['uid'],
                    'name': folder['name'],
                    # Root folders without a stored path fall back to "/<name>".
                    'path': folder['path'] or f"/{folder['name']}",
                    'type': 'Rootfolder'
                })
            for folder in subfolders_query or []:
                locations.append({
                    'uid': folder['uid'],
                    'name': folder['name'],
                    'path': folder['path'],
                    'type': 'Subfolder'
                })
            return locations
        except Exception as e:
            logger.error(f"Error getting storage locations: {e}")
            return []

    def _load_clone_candidates():
        """Return up to 100 existing documents offered as clone sources."""
        try:
            from CDocs.controllers.document_controller import get_documents
            docs_result = get_documents(
                user=current_user.doc_user,
                limit=100
            )
            return docs_result.get('documents', []) if docs_result.get('success') else []
        except Exception as e:
            logger.error(f"Error getting documents for clone options: {e}")
            return []

    if request.method != 'POST':
        # Show create document form
        from CDocs.config import settings

        # Get available departments and document types
        departments = list(settings.DEPARTMENTS.keys()) if hasattr(settings, 'DEPARTMENTS') else []
        doc_types = list(settings.DOCUMENT_TYPES.keys()) if hasattr(settings, 'DOCUMENT_TYPES') else []

        storage_locations = _load_storage_locations()
        existing_docs = _load_clone_candidates()

        return render_template(
            'create_document.html',
            departments=departments,
            doc_types=doc_types,
            existing_docs=existing_docs,
            storage_locations=storage_locations
        )

    # Handle document creation
    try:
        title = request.form.get('title')
        doc_type = request.form.get('doc_type')
        department = request.form.get('department')
        content = request.form.get('content', '')
        clone_from = request.form.get('clone_from')
        include_content = request.form.get('include_content') == 'true'
        custom_doc_number = request.form.get('custom_doc_number')
        storage_location = request.form.get('storage_location')  # This will be the folder path

        if not title:
            return jsonify({'success': False, 'error': 'Title is required'})

        if clone_from:
            # Clone from existing document
            from CDocs.controllers.document_controller import clone_document
            result = clone_document(
                user=current_user.doc_user,
                document_uid=clone_from,
                new_title=title,
                doc_type=doc_type,
                department=department,
                include_content=include_content,
                custom_doc_number=custom_doc_number,
                additional_properties={'custom_path': storage_location} if storage_location else None
            )
            # Clone operation returns 'success' key
            is_success = result and result.get('success')
        else:
            # Create new document
            from CDocs.controllers.document_controller import create_document as create_doc

            # Create the basic document first
            result = create_doc(
                user=current_user.doc_user,
                title=title,
                doc_text=content,
                doc_type=doc_type,
                department=department,
                doc_number=custom_doc_number,
                status='DRAFT',
                custom_path=storage_location if storage_location else None
            )
            # Create operation returns 'status' key
            is_success = result and result.get('status') == 'success'

            # If document creation succeeded and there's a file, upload it as initial version
            if is_success:
                document_file = request.files.get('document_file')
                if document_file and document_file.filename:
                    try:
                        from CDocs.controllers.document_controller import create_document_version

                        # Read file content
                        file_content = document_file.read()

                        # Create initial version with the uploaded file
                        version_result = create_document_version(
                            user=current_user.doc_user,
                            document_uid=result['document']['uid'],
                            file_content=file_content,
                            file_name=document_file.filename,
                            comment="Initial document version"
                        )
                        if not version_result or not version_result.get('UID'):
                            # Don't fail the entire operation, just log the warning
                            logger.warning("Document created but file upload failed")
                    except Exception as file_error:
                        # Don't fail the entire operation, just log the error
                        logger.error(f"Error uploading initial file: {file_error}")

        if not is_success:
            error_msg = result.get('message', 'Failed to create document') if result else 'Creation failed'
            return jsonify({'success': False, 'error': error_msg})

        # The uid/number may be nested under 'document' or sit at the top level.
        document_uid = result.get('document', {}).get('uid') or result.get('document_uid')
        doc_number = result.get('document', {}).get('doc_number') or result.get('document_number')
        return jsonify({
            'success': True,
            'message': f'Document {doc_number} created successfully',
            'document_uid': document_uid,
            'redirect_url': url_for('document_detail', doc_id=document_uid)
        })
    except Exception as e:
        logger.error(f"Create document error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/document_access/<document_uid>')
@login_required
def document_access(document_uid):
    """Return JSON describing how the current user may open a document.

    The payload feeds the view/edit buttons: share URLs, whether editing is
    allowed, the document status name and the user's role flags.
    """
    try:
        from CDocs.models.document import ControlledDocument
        from CDocs.controllers.share_controller import get_user_access_url

        document = ControlledDocument(uid=document_uid)
        if not document:
            return jsonify({'success': False, 'error': 'Document not found'})

        version = document.current_version
        if not version:
            return jsonify({'success': False, 'error': 'No document version available'})

        result = get_user_access_url(version, current_user.doc_user.uid)
        if not result.get('success'):
            return jsonify({
                'success': False,
                'error': result.get('message', 'Access denied')
            })

        can_edit = result.get('write_access', False)
        share_url = result.get('share_url', '')
        role_flags = {
            flag: result.get(flag, False)
            for flag in ('is_owner', 'is_reviewer', 'is_approver')
        }

        return jsonify({
            'success': True,
            'can_view': True,
            'can_edit': can_edit,
            'view_url': share_url,
            # The edit link is only exposed together with write access.
            'edit_url': share_url if can_edit else '',
            'status': document.get_status_name(),
            'access_mode': 'Edit Mode' if can_edit else 'Read Only',
            'roles': role_flags
        })
    except Exception as e:
        logger.error(f"Document access error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/extend_review_deadline', methods=['POST'])
@login_required
def extend_review_deadline():
    """Move the due date of a review cycle.

    Form fields: ``review_uid`` and ``new_due_date`` (ISO format) are
    required, ``reason`` is optional. Redirects to the review detail page
    when a review id is known, otherwise to the reviews list.
    """
    try:
        review_uid = request.form.get('review_uid')
        new_due_date_str = request.form.get('new_due_date')
        reason = request.form.get('reason')

        if not review_uid or not new_due_date_str:
            flash('Review ID and new due date are required', 'error')
            # Without an id there is no detail page to return to.
            if not review_uid:
                return redirect(url_for('reviews'))
            return redirect(url_for('review_detail', review_id=review_uid))

        # Parse the datetime; a malformed value is a user error, not a crash.
        from datetime import datetime
        try:
            new_due_date = datetime.fromisoformat(new_due_date_str)
        except ValueError:
            flash('Invalid due date format', 'error')
            return redirect(url_for('review_detail', review_id=review_uid))

        # Extend the review deadline
        result = review_controller.extend_review_deadline(
            user=current_user.doc_user,
            review_uid=review_uid,
            new_due_date=new_due_date,
            reason=reason
        )

        if result.get('success'):
            flash('Review deadline extended successfully', 'success')
        else:
            flash(f'Error extending deadline: {result.get("message", "Unknown error")}', 'error')

        return redirect(url_for('review_detail', review_id=review_uid))
    except Exception as e:
        logger.error(f"Error extending review deadline: {e}")
        flash(f'Error extending deadline: {str(e)}', 'error')
        return redirect(url_for('reviews'))
@self.app.route('/cancel_review', methods=['POST'])
@login_required
def cancel_review():
    """Cancel a review cycle on behalf of the current user.

    Form fields:
        review_uid: UID of the review cycle (required).
        reason: cancellation reason (required).

    Returns:
        A redirect. After a successful cancellation, when ``review_uid``
        is missing, or on an unexpected error the user lands on the
        reviews list; otherwise the user is returned to the review detail
        page. The outcome is communicated through a flash message.
    """
    try:
        review_uid = request.form.get('review_uid')
        reason = request.form.get('reason')

        if not review_uid:
            # Without a review UID there is no detail page to return to;
            # building the detail URL with an empty id yields a dead link.
            flash('Review ID and cancellation reason are required', 'error')
            return redirect(url_for('reviews'))

        if not reason:
            flash('Review ID and cancellation reason are required', 'error')
            return redirect(url_for('review_detail', review_id=review_uid))

        result = review_controller.cancel_review_cycle(
            user=current_user.doc_user,
            review_uid=review_uid,
            reason=reason
        )

        if result.get('success'):
            flash('Review cycle canceled successfully', 'success')
            return redirect(url_for('reviews'))  # Go back to reviews list

        flash(f'Error canceling review: {result.get("message", "Unknown error")}', 'error')
        return redirect(url_for('review_detail', review_id=review_uid))
    except Exception as e:
        logger.error(f"Error canceling review: {e}")
        flash(f'Error canceling review: {str(e)}', 'error')
        return redirect(url_for('reviews'))
@self.app.route('/close_review', methods=['POST'])
@login_required
def close_review():
    """Close a review cycle and optionally move its document to a new status.

    Form fields read: ``review_uid`` (required), ``update_status``
    (defaults to ``'true'``; compared case-insensitively) and
    ``target_status`` (defaults to ``'DRAFT'``). Every path ends with a
    redirect to the reviews list; the outcome is reported via flash.
    """
    try:
        form = request.form
        review_uid = form.get('review_uid')
        should_update = form.get('update_status', 'true').lower() == 'true'
        target = form.get('target_status', 'DRAFT')

        if not review_uid:
            flash('Review ID is required', 'error')
            return redirect(url_for('reviews'))

        outcome = review_controller.close_review_cycle(
            user=current_user.doc_user,
            review_uid=review_uid,
            update_document_status=should_update,
            target_status=target
        )

        if not outcome.get('success'):
            flash(f'Error closing review: {outcome.get("message", "Unknown error")}', 'error')
        else:
            flash('Review cycle closed successfully', 'success')

        return redirect(url_for('reviews'))
    except Exception as exc:
        logger.error(f"Error closing review: {exc}")
        flash(f'Error closing review: {str(exc)}', 'error')
        return redirect(url_for('reviews'))
@self.app.route('/review/<review_id>')
@login_required
def review_detail(review_id):
    """Show a review cycle together with its document, reviewers and comments.

    The review is loaded with comments and document included. The current
    user's own reviewer assignment (if any) is looked up by matching
    ``reviewer_uid`` against the user's UID and passed to the template as
    ``user_assignment``. A missing review or any exception results in a
    flash message and a redirect to the reviews list.
    """
    try:
        from CDocs.controllers.review_controller import get_review_cycle

        logger.info(f"Loading review detail for review_id: {review_id}")

        review = get_review_cycle(
            review_uid=review_id,
            include_comments=True,
            include_document=True
        )
        logger.info(f"Review data retrieved: {bool(review)}")

        if not review:
            logger.error(f"Review not found: {review_id}")
            flash('Review not found', 'error')
            return redirect(url_for('reviews'))

        # Pieces of the review payload handed to the template separately.
        document = review.get('document', {})
        assignments = review.get('reviewer_assignments', [])
        comments = review.get('comments', [])

        # First assignment whose reviewer is the logged-in user, if any.
        user_assignment = None
        if current_user and current_user.doc_user:
            for candidate in assignments:
                if candidate.get('reviewer_uid') == current_user.doc_user.uid:
                    user_assignment = candidate
                    break

        logger.info(f"User assignment found: {bool(user_assignment)}")
        uid_for_log = current_user.doc_user.uid if current_user.doc_user else 'None'
        if user_assignment:
            logger.info(f"User assignment details: {user_assignment}")
            logger.info(f"Assignment status: {user_assignment.get('status')}")
            logger.info(f"Current user UID: {uid_for_log}")
        else:
            logger.info(f"No user assignment found for user UID: {uid_for_log}")
            logger.info(f"Available reviewer UIDs: {[r.get('reviewer_uid') for r in assignments]}")

        return render_template(
            'review_detail.html',
            review=review,
            document=document,
            reviewer_assignments=assignments,
            comments=comments,
            user_assignment=user_assignment,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"Review detail error: {exc}")
        import traceback
        logger.error(f"Review detail traceback: {traceback.format_exc()}")
        flash('Error loading review', 'error')
        return redirect(url_for('reviews'))
@self.app.route('/document/<document_uid>/review/<review_id>')
@login_required
def document_review_detail(document_uid, review_id):
    """Show a review cycle when reached from a document's history.

    Renders the same ``review_detail.html`` template as the plain review
    view; the difference is that a missing review or an error sends the
    user back to the originating document's detail page.
    """
    try:
        from CDocs.controllers.review_controller import get_review_cycle

        review = get_review_cycle(
            review_uid=review_id,
            include_comments=True,
            include_document=True
        )
        if not review:
            flash('Review not found or you do not have permission to view it.', 'error')
            return redirect(url_for('document_detail', doc_id=document_uid))

        document = review.get('document', {})
        assignments = review.get('reviewer_assignments', [])
        comments = review.get('comments', [])

        # First assignment whose reviewer is the logged-in user, if any.
        user_assignment = None
        if current_user and current_user.doc_user:
            for candidate in assignments:
                if candidate.get('reviewer_uid') == current_user.doc_user.uid:
                    user_assignment = candidate
                    break

        return render_template(
            'review_detail.html',
            review=review,
            document=document,
            reviewer_assignments=assignments,
            comments=comments,
            user_assignment=user_assignment,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"Document review detail error: {exc}")
        flash('Error loading review details', 'error')
        return redirect(url_for('document_detail', doc_id=document_uid))
@self.app.route('/review/<review_id>/submit', methods=['POST'])
@login_required
def submit_review(review_id):
    """Record the current user's review decision for a review cycle.

    Form fields:
        decision: the review decision (required).
        comments: optional comment text; when present it is stored as a
            ``GENERAL`` review comment before the review is completed.
        section: optional section label attached to the comment.

    A failure to store the comment is reported as a warning only and does
    not stop the submission. Returns a redirect to the reviews list on
    success, otherwise back to the review detail page.
    """
    try:
        from CDocs.controllers.review_controller import add_review_comment, complete_review

        decision = request.form.get('decision')
        comments = request.form.get('comments')
        section = request.form.get('section')

        if not decision:
            flash('Please select a review decision', 'error')
            return redirect(url_for('review_detail', review_id=review_id))

        # Add comment if provided (like Panel app)
        if comments:
            try:
                add_result = add_review_comment(
                    user=current_user.doc_user,
                    review_uid=review_id,
                    comment_text=comments,
                    comment_type="GENERAL",
                    page_number=None,
                    location_info={"section": section} if section else None
                )
                if not add_result or not add_result.get('success'):
                    # The controller may return None; fall back to an empty
                    # mapping so the default message is used instead of
                    # raising AttributeError.
                    error_msg = (add_result or {}).get('message', 'Failed to add comment')
                    flash(f'Warning: Could not add comment: {error_msg}', 'warning')
            except Exception as comment_error:
                logger.error(f"Error adding review comment: {comment_error}")
                flash(f'Warning: Could not add comment, but proceeding with review submission', 'warning')

        # Complete the review (like Panel app)
        complete_result = complete_review(
            user=current_user.doc_user,
            review_uid=review_id,
            decision=decision,
            comments=comments if comments else ""
        )

        if complete_result and complete_result.get('success'):
            flash('Review submitted successfully', 'success')
            return redirect(url_for('reviews'))

        # Same None-safe fallback as above for the completion result.
        error_msg = (complete_result or {}).get('message', 'Failed to submit review')
        flash(f'Error: {error_msg}', 'error')
        return redirect(url_for('review_detail', review_id=review_id))
    except Exception as e:
        logger.error(f"Review submission error: {e}")
        flash('Error submitting review', 'error')
        return redirect(url_for('review_detail', review_id=review_id))
@self.app.route('/approvals/<approval_id>')
@login_required
def approvals_detail(approval_id):
    """Show an approval cycle when reached from the approvals page.

    Loads the approval cycle with comments and document included and
    renders ``approval_detail.html``.

    Returns:
        The rendered page, or a redirect to the approvals list when
        loading or rendering fails.

    Raises:
        werkzeug.exceptions.NotFound: via ``abort(404)`` when no approval
            cycle exists for ``approval_id``.
    """
    try:
        from CDocs.controllers.approval_controller import get_approval_cycle

        approval = get_approval_cycle(
            approval_uid=approval_id,
            include_comments=True,
            include_document=True
        )
        if approval:
            return render_template('approval_detail.html', approval=approval, user=current_user)
    except Exception as e:
        logger.error(f"Approval detail error: {e}")
        flash('Error loading approval', 'error')
        return redirect(url_for('approvals'))

    # abort() raises an HTTPException, which is an Exception subclass. It is
    # deliberately raised outside the try block so the broad handler above
    # cannot turn the 404 into an "Error loading approval" redirect.
    abort(404)
@self.app.route('/document/<document_uid>/approval/<approval_id>')
@login_required
def document_approval_detail(document_uid, approval_id):
    """Show an approval cycle when reached from a document's history.

    Besides the approval itself, the template receives the document, the
    approver assignments, the comments and the current user's own approver
    assignment (``None`` when the user is not an approver). A missing
    approval or an error sends the user back to the document detail page.
    """
    try:
        from CDocs.controllers.approval_controller import get_approval_cycle

        approval = get_approval_cycle(
            approval_uid=approval_id,
            include_comments=True,
            include_document=True
        )
        if not approval:
            flash('Approval not found or you do not have permission to view it.', 'error')
            return redirect(url_for('document_detail', doc_id=document_uid))

        approver_assignments = approval.get('approver_assignments', [])

        # First assignment whose approver is the logged-in user, if any.
        user_assignment = None
        if current_user.doc_user:
            user_assignment = next(
                (
                    entry for entry in approver_assignments
                    if entry.get('approver_uid') == current_user.doc_user.uid
                ),
                None,
            )

        comments = approval.get('comments', [])

        # Full approval detail template, including issuer management.
        return render_template(
            'approval_detail.html',
            approval=approval,
            document=approval.get('document', {}),
            user=current_user,
            user_assignment=user_assignment,
            approver_assignments=approver_assignments,
            comments=comments,
        )
    except Exception as exc:
        logger.error(f"Document approval detail error: {exc}")
        flash('Error loading approval details', 'error')
        return redirect(url_for('document_detail', doc_id=document_uid))
@self.app.route('/approvals/<approval_id>/submit', methods=['POST'])
@login_required
def submit_approval_decision(approval_id):
    """Record the current user's decision for an approval cycle.

    Form fields:
        decision: the approval decision (required).
        comments: optional comment text.

    Returns:
        A redirect to the approvals list on success, otherwise a redirect
        to the ``approval_detail`` endpoint with an error flash message.
    """
    # NOTE(review): the 'approval_detail' endpoint is not defined in the part
    # of the file visible here (the nearby view is 'approvals_detail') -
    # confirm it exists, otherwise url_for() below raises BuildError.
    try:
        comments = request.form.get('comments')
        decision = request.form.get('decision')

        # Mirror submit_review: never call the controller without a decision.
        if not decision:
            flash('Please select an approval decision', 'error')
            return redirect(url_for('approval_detail', approval_id=approval_id))

        result = approval_controller.submit_approval(
            approval_id=approval_id,
            approver=current_user.doc_user,
            comments=comments,
            decision=decision
        )

        # Presumably the controller returns a dict with a 'success' flag like
        # the review controller does - TODO confirm. Only an explicit
        # ``success: False`` is treated as failure so any other return shape
        # keeps the previous "always succeeded" behaviour.
        if isinstance(result, dict) and result.get('success') is False:
            flash(f'Error: {result.get("message", "Failed to submit approval")}', 'error')
            return redirect(url_for('approval_detail', approval_id=approval_id))

        flash('Approval submitted successfully', 'success')
        return redirect(url_for('approvals'))
    except Exception as e:
        logger.error(f"Approval submission error: {e}")
        flash('Error submitting approval', 'error')
        return redirect(url_for('approval_detail', approval_id=approval_id))
@self.app.route('/training')
@login_required
def training():
    """Render the training overview for the logged-in user.

    The controller's dashboard payload is split into required and completed
    training; completed entries are passed to the template under the name
    ``training_records``. On any error the page is rendered with empty lists.
    """
    # NOTE(review): the '/training' URL is registered a second time further
    # down in this file by `training_dashboard` - confirm which view is meant
    # to serve it.
    try:
        dashboard = training_controller.get_user_training_dashboard(user=current_user.doc_user)
        # A falsy dashboard (None / empty) yields empty lists for both groups.
        safe_dashboard = dashboard or {}
        pending = safe_dashboard.get('required_training', [])
        done = safe_dashboard.get('completed_training', [])

        return render_template(
            'training.html',
            training_records=done,
            required_training=pending,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"Training error: {exc}")
        flash('Error loading training data', 'error')
        return render_template(
            'training.html',
            training_records=[],
            required_training=[],
            user=current_user,
        )
@self.app.route('/training/<training_id>')
@login_required
def training_detail(training_id):
    """Show a single training assignment of the current user.

    Args:
        training_id: identifier passed to
            ``training_controller.get_training_assignment``.

    Returns:
        The rendered ``training_detail.html`` page, or a redirect to the
        training dashboard when the assignment is missing or loading fails.
    """
    try:
        training_record = training_controller.get_training_assignment(current_user.doc_user, training_id)
        if not training_record:
            flash('Training assignment not found', 'error')
            return redirect(url_for('training_dashboard'))

        # 'training_detail.html' is also rendered by
        # `training_assignment_detail`, which passes the same object as
        # ``training``. Expose it under both names so the template works
        # regardless of which variable it reads.
        return render_template('training_detail.html',
                               training_record=training_record,
                               training=training_record,
                               user=current_user)
    except Exception as e:
        logger.error(f"Training detail error: {e}")
        flash('Error loading training record', 'error')
        return redirect(url_for('training_dashboard'))
@self.app.route('/admin')
@login_required
def admin():
    """Render the admin panel with system statistics and recent activity.

    Users without the ``admin`` role are redirected to the index page. If
    the dashboard data cannot be loaded, the template is rendered without
    it and an error is flashed.
    """
    if not current_user.has_role('admin'):
        flash('Access denied: Admin privileges required', 'error')
        return redirect(url_for('index'))

    context = {'user': current_user}
    try:
        # Collect the dashboard data before rendering.
        context['system_stats'] = admin_controller.get_system_stats()
        context['recent_activities'] = admin_controller.get_user_activity(limit=20)
        return render_template('admin_enhanced.html', **context)
    except Exception as exc:
        logger.error(f"Admin panel error: {exc}")
        flash('Error loading admin panel', 'error')
        return render_template('admin_enhanced.html', user=current_user)
@self.app.route('/admin/users')
@login_required
def admin_users():
    """Render the user management page for administrators.

    Users without the ``admin`` role are redirected to the index page. If
    the user list cannot be loaded the page is rendered with an empty list.
    """
    if not current_user.has_role('admin'):
        flash('Access denied: Admin privileges required', 'error')
        return redirect(url_for('index'))

    template = 'admin_users.html'
    try:
        all_users = admin_controller.get_users()
        return render_template(template, users=all_users, user=current_user)
    except Exception as exc:
        logger.error(f"Admin users error: {exc}")
        flash('Error loading users', 'error')
        return render_template(template, users=[], user=current_user)
@self.app.route('/api/documents')
@login_required
def api_documents():
    """Return the documents owned by the current user as a JSON array.

    Errors are logged and reported as ``{"error": ...}`` with HTTP 500.
    """
    try:
        me = current_user.doc_user
        found = document_controller.search_documents(owner=me.uid, user=me)
        # The search results are already dictionaries; just materialise them.
        return jsonify(list(found))
    except Exception as exc:
        logger.error(f"API documents error: {exc}")
        return jsonify({'error': str(exc)}), 500
@self.app.route('/api/notifications')
@login_required
def api_notifications():
    """Return the current user's notifications as a JSON array.

    Each notification is serialised through its ``to_dict()`` method.
    Errors are logged and reported as ``{"error": ...}`` with HTTP 500.
    """
    try:
        pending = self.notification_manager.get_user_notifications(current_user.doc_user)
        payload = []
        for item in pending:
            payload.append(item.to_dict())
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"API notifications error: {exc}")
        return jsonify({'error': str(exc)}), 500
# Admin API Routes for dynamic tab content
@self.app.route('/admin/api/users')
@login_required
def admin_api_users():
    """Serve the HTML for the admin "users" tab wrapped in a JSON envelope.

    Responds 403 for non-admin users and 500 (with the error text) when the
    content cannot be produced.
    """
    if current_user.has_role('admin'):
        try:
            # Fetch the users and turn them into the tab's HTML in one go.
            tab_html = self._generate_users_tab_content(admin_controller.get_users())
            return jsonify({'success': True, 'content': tab_html})
        except Exception as exc:
            logger.error(f"Admin API users error: {exc}")
            return jsonify({'success': False, 'error': str(exc)}), 500

    return jsonify({'success': False, 'error': 'Admin privileges required'}), 403
@self.app.route('/admin/api/documents')
@login_required
def admin_api_documents():
    """Serve the HTML for the admin "documents" tab wrapped in a JSON envelope.

    Responds 403 for non-admin users and 500 (with the error text) when the
    content cannot be produced.
    """
    if current_user.has_role('admin'):
        try:
            all_documents = admin_controller.get_all_documents()
            tab_html = self._generate_documents_tab_content(all_documents)
            return jsonify({'success': True, 'content': tab_html})
        except Exception as exc:
            logger.error(f"Admin API documents error: {exc}")
            return jsonify({'success': False, 'error': str(exc)}), 500

    return jsonify({'success': False, 'error': 'Admin privileges required'}), 403
@self.app.route('/admin/api/training')
@login_required
def admin_api_training():
    """Serve the HTML for the admin "training" tab wrapped in a JSON envelope.

    Responds 403 for non-admin users and 500 (with the error text) when the
    content cannot be produced.
    """
    if current_user.has_role('admin'):
        try:
            overview = training_controller.get_training_overview(current_user.doc_user)
            tab_html = self._generate_training_tab_content(overview)
            return jsonify({'success': True, 'content': tab_html})
        except Exception as exc:
            logger.error(f"Admin API training error: {exc}")
            return jsonify({'success': False, 'error': str(exc)}), 500

    return jsonify({'success': False, 'error': 'Admin privileges required'}), 403
@self.app.route('/admin/api/system')
@login_required
def admin_api_system():
    """Serve the HTML for the admin "system" tab wrapped in a JSON envelope.

    Responds 403 for non-admin users and 500 (with the error text) when the
    content cannot be produced.
    """
    if current_user.has_role('admin'):
        try:
            configuration = admin_controller.get_system_configuration()
            tab_html = self._generate_system_tab_content(configuration)
            return jsonify({'success': True, 'content': tab_html})
        except Exception as exc:
            logger.error(f"Admin API system error: {exc}")
            return jsonify({'success': False, 'error': str(exc)}), 500

    return jsonify({'success': False, 'error': 'Admin privileges required'}), 403
@self.app.route('/admin/api/audit')
@login_required
def admin_api_audit():
    """Serve the HTML for the admin "audit" tab wrapped in a JSON envelope.

    The audit trail is requested with ``limit=100``. Responds 403 for
    non-admin users and 500 (with the error text) when the content cannot
    be produced.
    """
    if current_user.has_role('admin'):
        try:
            events = admin_controller.get_audit_trail(limit=100)
            tab_html = self._generate_audit_tab_content(events)
            return jsonify({'success': True, 'content': tab_html})
        except Exception as exc:
            logger.error(f"Admin API audit error: {exc}")
            return jsonify({'success': False, 'error': str(exc)}), 500

    return jsonify({'success': False, 'error': 'Admin privileges required'}), 403
# Admin Action Routes
@self.app.route('/admin/backup', methods=['POST'])
@login_required
def admin_backup():
    """Placeholder endpoint for starting a database backup.

    Non-admin callers receive a JSON 403. For admins the request is only
    logged - no backup is actually created yet - and a success envelope is
    returned.
    """
    is_admin = current_user.has_role('admin')
    if not is_admin:
        return jsonify({'success': False, 'error': 'Admin privileges required'}), 403

    try:
        # TODO: create a real database backup; for now the action is only logged.
        logger.info(f"Admin {current_user.username} initiated database backup")
        reply = {'success': True, 'message': 'Backup completed successfully'}
        return jsonify(reply)
    except Exception as exc:
        logger.error(f"Admin backup error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/admin/maintenance', methods=['POST'])
@login_required
def admin_maintenance():
    """Placeholder endpoint that toggles maintenance mode.

    Expects a JSON object body with an optional boolean ``enabled`` key
    (default ``False``). The toggle is only logged; no configuration is
    changed yet.

    Returns:
        JSON envelope with HTTP 200 on success, 400 when the body is not a
        JSON object, 403 for non-admin users and 500 on unexpected errors.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Admin privileges required'}), 403

    try:
        # silent=True makes a missing/invalid JSON body come back as None
        # instead of raising, so bad input is reported as 400 rather than
        # surfacing as a 500 through the generic handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        enabled = data.get('enabled', False)

        # In a real implementation, this would update system configuration
        logger.info(f"Admin {current_user.username} {'enabled' if enabled else 'disabled'} maintenance mode")
        return jsonify({'success': True, 'message': f"Maintenance mode {'enabled' if enabled else 'disabled'}"})
    except Exception as e:
        logger.error(f"Admin maintenance error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/admin/users/create')
@login_required
def admin_create_user():
    """Render the "create user" form; non-admins are redirected to the index."""
    if current_user.has_role('admin'):
        return render_template('admin_create_user.html', user=current_user)

    # Not an administrator: explain why and send the user home.
    flash('Access denied: Admin privileges required', 'error')
    return redirect(url_for('index'))
@self.app.route('/admin/reports')
@login_required
def admin_reports():
    """Render the admin reports page; non-admins are redirected to the index."""
    if current_user.has_role('admin'):
        return render_template('admin_reports.html', user=current_user)

    # Not an administrator: explain why and send the user home.
    flash('Access denied: Admin privileges required', 'error')
    return redirect(url_for('index'))
@self.app.route('/admin/export/documents')
@login_required
def admin_export_documents():
    """Placeholder export of document data for administrators.

    Non-admin callers receive a JSON 403. For admins the export is only
    logged (no file is generated yet), a flash message is queued and the
    browser is sent back to the admin panel.
    """
    is_admin = current_user.has_role('admin')
    if not is_admin:
        return jsonify({'error': 'Admin privileges required'}), 403

    landing = 'admin'
    try:
        # TODO: generate and return a real CSV/Excel file here.
        logger.info(f"Admin {current_user.username} exported documents data")
        flash('Document export completed', 'success')
        return redirect(url_for(landing))
    except Exception as exc:
        logger.error(f"Admin export documents error: {exc}")
        flash('Export failed', 'error')
        return redirect(url_for(landing))
@self.app.route('/admin/export/training')
@login_required
def admin_export_training():
    """Placeholder export of training data for administrators.

    Non-admin callers receive a JSON 403. For admins the export is only
    logged (no data is generated yet), a flash message is queued and the
    browser is sent back to the admin panel.
    """
    is_admin = current_user.has_role('admin')
    if not is_admin:
        return jsonify({'error': 'Admin privileges required'}), 403

    landing = 'admin'
    try:
        # TODO: generate and return the real training data here.
        logger.info(f"Admin {current_user.username} exported training data")
        flash('Training export completed', 'success')
        return redirect(url_for(landing))
    except Exception as exc:
        logger.error(f"Admin export training error: {exc}")
        flash('Export failed', 'error')
        return redirect(url_for(landing))
@self.app.route('/admin/export/audit')
@login_required
def admin_export_audit():
    """Placeholder export of the audit trail for administrators.

    Non-admin callers receive a JSON 403. For admins the export is only
    logged (no data is generated yet), a flash message is queued and the
    browser is sent back to the admin panel.
    """
    is_admin = current_user.has_role('admin')
    if not is_admin:
        return jsonify({'error': 'Admin privileges required'}), 403

    landing = 'admin'
    try:
        # TODO: generate and return the real audit data here.
        logger.info(f"Admin {current_user.username} exported audit trail")
        flash('Audit export completed', 'success')
        return redirect(url_for(landing))
    except Exception as exc:
        logger.error(f"Admin export audit error: {exc}")
        flash('Export failed', 'error')
        return redirect(url_for(landing))
@self.app.route('/admin/documents/bulk')
@login_required
def admin_bulk_documents():
    """Render the bulk document actions page; non-admins go to the index."""
    if current_user.has_role('admin'):
        return render_template('admin_bulk_documents.html', user=current_user)

    # Not an administrator: explain why and send the user home.
    flash('Access denied: Admin privileges required', 'error')
    return redirect(url_for('index'))
# Training Management Routes
@self.app.route('/admin/training/plans/create')
@login_required
def admin_create_training_plan():
    """Render the "create training plan" page; non-admins go to the index."""
    if current_user.has_role('admin'):
        return render_template('admin_create_training_plan.html', user=current_user)

    # Not an administrator: explain why and send the user home.
    flash('Access denied: Admin privileges required', 'error')
    return redirect(url_for('index'))
@self.app.route('/admin/training/plans/<plan_id>')
@login_required
def view_training_plan(plan_id):
    """Show a training plan to an administrator.

    ``plan_id`` is used both to fetch the document and to pick the plan
    whose ``uid`` matches from the training overview. Non-admins are
    redirected to the index; a missing document/plan or an error redirects
    to the admin panel with a flash message.
    """
    if not current_user.has_role('admin'):
        flash('Access denied: Admin privileges required', 'error')
        return redirect(url_for('index'))

    try:
        from CDocs.controllers import document_controller

        # The training plan is backed by a document with the same UID.
        document = document_controller.get_document(plan_id)
        if not document:
            flash('Training plan not found', 'error')
            return redirect(url_for('admin'))

        # Pick the first plan in the overview whose uid matches.
        overview = training_controller.get_training_overview(current_user.doc_user)
        plan_data = next(
            (entry for entry in overview.get('plans', []) if entry.get('uid') == plan_id),
            None,
        )
        if not plan_data:
            flash('Training plan data not found', 'error')
            return redirect(url_for('admin'))

        return render_template(
            'admin_view_training_plan.html',
            plan=plan_data,
            document=document,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"View training plan error: {exc}")
        flash('Error loading training plan', 'error')
        return redirect(url_for('admin'))
@self.app.route('/admin/training/plans/<plan_id>/edit')
@login_required
def edit_training_plan(plan_id):
    """Show the edit form for a training plan to an administrator.

    ``plan_id`` is used both to fetch the document and to pick the plan
    whose ``uid`` matches from the training overview. Non-admins are
    redirected to the index; a missing document/plan or an error redirects
    to the admin panel with a flash message.
    """
    if not current_user.has_role('admin'):
        flash('Access denied: Admin privileges required', 'error')
        return redirect(url_for('index'))

    try:
        from CDocs.controllers import document_controller

        # The training plan is backed by a document with the same UID.
        document = document_controller.get_document(plan_id)
        if not document:
            flash('Training plan not found', 'error')
            return redirect(url_for('admin'))

        # Pick the first plan in the overview whose uid matches.
        overview = training_controller.get_training_overview(current_user.doc_user)
        plan_data = next(
            (entry for entry in overview.get('plans', []) if entry.get('uid') == plan_id),
            None,
        )
        if not plan_data:
            flash('Training plan data not found', 'error')
            return redirect(url_for('admin'))

        return render_template(
            'admin_edit_training_plan.html',
            plan=plan_data,
            document=document,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"Edit training plan error: {exc}")
        flash('Error loading training plan for editing', 'error')
        return redirect(url_for('admin'))
@self.app.route('/admin/training/plans/<plan_id>/update', methods=['POST'])
@login_required
def update_training_plan(plan_id):
    """Update the training settings of the document behind a training plan.

    Expects a JSON object body with optional keys:
        validity_days: integer number of days (default 365).
        quiz_required: flag forwarded to the controller (default False).
        instructions: free text (default empty string).

    Returns:
        JSON envelope. HTTP 400 when the body is not a JSON object or
        ``validity_days`` is not an integer, 403 for non-admin users and
        500 on unexpected errors.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied'}), 403

    try:
        # silent=True turns a missing/invalid JSON body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        try:
            validity_days = int(data.get('validity_days', 365))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'error': 'validity_days must be an integer'}), 400

        quiz_required = data.get('quiz_required', False)
        instructions = data.get('instructions', '')

        # Update training settings for document
        result = training_controller.enable_document_training(
            user=current_user.doc_user,
            document_uid=plan_id,
            validity_days=validity_days,
            quiz_required=quiz_required,
            instructions=instructions
        )

        if result.get('success'):
            return jsonify({'success': True, 'message': 'Training plan updated successfully'})

        return jsonify({'success': False, 'error': result.get('message', 'Failed to update training plan')})
    except Exception as e:
        logger.error(f"Update training plan error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# User Training Routes
@self.app.route('/training')
@login_required
def training_dashboard():
    """Render the training dashboard for the logged-in user.

    The controller's dashboard payload is passed to the template unchanged
    as ``training_data``. On any error the page is rendered with empty
    ``required_training`` / ``completed_training`` lists.
    """
    # NOTE(review): the '/training' URL is also registered earlier in this
    # file by the `training` view - confirm which of the two views Flask
    # actually dispatches to; this endpoint name is still used by url_for().
    template = 'training_dashboard.html'
    try:
        dashboard = training_controller.get_user_training_dashboard(current_user.doc_user)
        return render_template(template, training_data=dashboard, user=current_user)
    except Exception as exc:
        logger.error(f"Training dashboard error: {exc}")
        flash('Error loading training dashboard', 'error')
        empty_dashboard = {'required_training': [], 'completed_training': []}
        return render_template(template, training_data=empty_dashboard, user=current_user)
@self.app.route('/training/assignment/<training_uid>')
@login_required
def training_assignment_detail(training_uid):
    """Show a training assignment so the current user can complete it.

    Args:
        training_uid: identifier passed to
            ``training_controller.get_training_assignment``.

    Returns:
        The rendered ``training_detail.html`` page, or a redirect to the
        training dashboard when the assignment is missing or loading fails.
    """
    try:
        # Get training details
        training = training_controller.get_training_assignment(current_user.doc_user, training_uid)
        if not training:
            flash('Training assignment not found', 'error')
            return redirect(url_for('training_dashboard'))

        # 'training_detail.html' is also rendered by `training_detail`,
        # which passes the same object as ``training_record``. Expose it
        # under both names so the template works regardless of which
        # variable it reads.
        return render_template('training_detail.html',
                               training=training,
                               training_record=training,
                               user=current_user)
    except Exception as e:
        logger.error(f"Training assignment detail error: {e}")
        flash('Error loading training assignment', 'error')
        return redirect(url_for('training_dashboard'))
@self.app.route('/training/assignment/<training_uid>/complete', methods=['POST'])
@login_required
def complete_training(training_uid):
    """Mark a training assignment of the current user as completed.

    Accepts an optional JSON object body; its ``quiz_answers`` key
    (default: empty dict) is forwarded to the training controller.

    Returns:
        JSON envelope. HTTP 400 when a body is sent that is not a JSON
        object, 500 on unexpected errors.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body instead
        # of raising. A request without a body is treated as "no quiz
        # answers" rather than failing with a 500.
        data = request.get_json(silent=True)
        if data is None:
            data = {}
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        quiz_answers = data.get('quiz_answers', {})

        result = training_controller.complete_user_training_by_uid(
            user=current_user.doc_user,
            training_uid=training_uid,
            quiz_answers=quiz_answers
        )

        if result.get('success'):
            return jsonify({'success': True, 'message': 'Training completed successfully'})

        return jsonify({'success': False, 'error': result.get('message', 'Failed to complete training')})
    except Exception as e:
        logger.error(f"Complete training error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/training/certificate/<training_uid>/download')
@login_required
def download_training_certificate(training_uid):
    """Render the current user's training certificate as an HTML page.

    Args:
        training_uid: at least 73 characters; the first 36 are compared
            with the current user's UID (the code treats the value as
            ``<user uuid>-<document uuid>``).

    Returns:
        An inline ``text/html`` response containing the certificate, or a
        redirect to the training dashboard (with a flash message) when the
        id is malformed, belongs to another user, the training is not
        ``COMPLETED``, or an unexpected error occurs.
    """
    import re
    from datetime import datetime, timedelta
    from html import escape

    try:
        # UUIDs are 36 characters: two UUIDs + connecting hyphen = 73.
        if len(training_uid) < 73:
            flash('Invalid training certificate ID format', 'error')
            return redirect(url_for('training_dashboard'))
        user_uid = training_uid[:36]

        # Verify this is the correct user requesting their own certificate
        if user_uid != current_user.doc_user.uid:
            flash('You can only download your own training certificates', 'error')
            return redirect(url_for('training_dashboard'))

        # Check if training is completed
        training = training_controller.get_training_assignment(current_user.doc_user, training_uid)
        if not training or training.get('status') != 'COMPLETED':
            flash('Training certificate not available - training must be completed first', 'error')
            return redirect(url_for('training_dashboard'))

        completion_date = training.get('completed_date', 'Unknown')
        validity_days = training.get('validity_days', 365)
        # ``or {}`` also covers a 'document' key that is present but None.
        document_info = training.get('document') or {}

        # Calculate valid-until date; any unparsable value degrades to 'Unknown'.
        valid_until_str = 'Unknown'
        if completion_date and completion_date != 'Unknown':
            try:
                if 'T' in completion_date:
                    completed_dt = datetime.fromisoformat(completion_date.replace('Z', '+00:00'))
                else:
                    completed_dt = datetime.strptime(completion_date, '%Y-%m-%d')
                valid_until_str = (completed_dt + timedelta(days=validity_days)).strftime('%Y-%m-%d')
            except (ValueError, TypeError, AttributeError, OverflowError) as date_error:
                logger.warning(f"Could not compute certificate validity for {training_uid}: {date_error}")

        # Only append the percent sign when a score is actually recorded.
        score = training.get('score')
        score_text = f"{score}%" if score is not None else 'N/A'

        # Everything interpolated into the page is user- or data-controlled
        # (names, document titles, the URL segment), so escape it to keep
        # markup out of the served HTML.
        recipient = escape(str(current_user.doc_user.name or current_user.username))
        doc_title = escape(str(document_info.get('title', 'Training Document')))
        doc_number = escape(str(document_info.get('doc_number', 'N/A')))
        completed_text = escape(str(completion_date))
        score_html = escape(score_text)
        valid_until_html = escape(valid_until_str)
        certificate_id = escape(training_uid)
        generated_on = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        certificate_html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Training Certificate</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; background-color: #f8f9fa; }}
        .certificate {{
            background: white;
            border: 3px solid #4e73df;
            padding: 40px;
            text-align: center;
            margin: 20px auto;
            max-width: 800px;
            box-shadow: 0 4px 6px rgba(0,0,0,0.1);
        }}
        .header {{ color: #4e73df; font-size: 24px; font-weight: bold; margin-bottom: 20px; }}
        .title {{ font-size: 36px; font-weight: bold; color: #2c3e50; margin: 30px 0; }}
        .recipient {{ font-size: 28px; color: #4e73df; margin: 20px 0; font-style: italic; }}
        .content {{ font-size: 18px; line-height: 1.6; margin: 20px 0; }}
        .document {{ font-size: 20px; font-weight: bold; color: #2c3e50; margin: 15px 0; }}
        .details {{ font-size: 14px; color: #6c757d; margin-top: 30px; }}
        .signature-section {{ margin-top: 40px; }}
        .signature-line {{ border-top: 1px solid #000; width: 200px; margin: 20px auto 5px auto; }}
    </style>
</head>
<body>
    <div class="certificate">
        <div class="header">CONTROLLED DOCUMENT MANAGEMENT SYSTEM</div>
        <div class="title">CERTIFICATE OF TRAINING COMPLETION</div>
        <div class="content">This is to certify that</div>
        <div class="recipient">{recipient}</div>
        <div class="content">has successfully completed the required training for:</div>
        <div class="document">{doc_title}</div>
        <div class="content">Document Number: {doc_number}</div>
        <div class="details">
            <p><strong>Training Completed:</strong> {completed_text}</p>
            <p><strong>Score:</strong> {score_html}</p>
            <p><strong>Valid Until:</strong> {valid_until_html}</p>
            <p><strong>Certificate ID:</strong> {certificate_id}</p>
        </div>
        <div class="signature-section">
            <div class="signature-line"></div>
            <div>Training Administrator</div>
            <div style="font-size: 12px; color: #6c757d; margin-top: 10px;">
                Generated on {generated_on}
            </div>
        </div>
    </div>
</body>
</html>
"""

        # Restrict the suggested filename to a conservative character set so
        # quotes, colons or control characters from the data cannot break
        # the Content-Disposition header.
        unsafe_chars = re.compile(r'[^A-Za-z0-9._-]+')
        file_doc = unsafe_chars.sub('_', str(document_info.get('doc_number', 'training')))
        file_date = unsafe_chars.sub('_', str(completion_date))

        # Return HTML certificate
        response = make_response(certificate_html)
        response.headers['Content-Type'] = 'text/html'
        response.headers['Content-Disposition'] = (
            f'inline; filename="training_certificate_{file_doc}_{file_date}.html"'
        )
        return response
    except Exception as e:
        logger.error(f"Certificate download error: {e}")
        flash('Error downloading certificate', 'error')
        return redirect(url_for('training_dashboard'))
# Admin Training Management Routes
@self.app.route('/admin/training/assignments/<assignment_uid>/remove', methods=['DELETE'])
@login_required
def remove_training_assignment(assignment_uid):
    """Delete a training assignment on behalf of an administrator.

    Responds with a JSON envelope: 403 for non-admin users, 500 on
    unexpected errors, otherwise the controller's outcome (success message
    and removed assignment, or the failure message).
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied: Admin privileges required'}), 403

    try:
        outcome = training_controller.remove_training_assignment(
            user=current_user.doc_user,
            assignment_uid=assignment_uid
        )

        # Guard clause: report controller-level failures first.
        if not outcome.get('success'):
            return jsonify({
                'success': False,
                'error': outcome.get('message', 'Failed to remove training assignment')
            })

        return jsonify({
            'success': True,
            'message': outcome.get('message', 'Training assignment removed successfully'),
            'removed_assignment': outcome.get('removed_assignment', {})
        })
    except Exception as exc:
        logger.error(f"Remove training assignment error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/admin/documents/<document_uid>/status', methods=['POST'])
@login_required
def admin_update_document_status(document_uid):
    """Force a document's status to a new value (admin only).

    Expects a JSON object body with a string ``status`` key, matched
    case-insensitively against draft/review/approved/published/archived.
    The upper-cased status is written directly to the document node and
    the change is recorded in the audit trail.

    Returns:
        JSON envelope. HTTP 400 for a missing/invalid body or status, 403
        for non-admin users, 404 when the document does not exist and 500
        when the update fails or an unexpected error occurs.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied: Admin privileges required'}), 403

    try:
        # silent=True turns a missing/invalid JSON body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        new_status = data.get('status')
        if not new_status:
            return jsonify({'success': False, 'error': 'Status is required'}), 400

        # Validate status; a non-string value (e.g. a number) is invalid
        # input, not a server error.
        valid_statuses = ['draft', 'review', 'approved', 'published', 'archived']
        if not isinstance(new_status, str) or new_status.lower() not in valid_statuses:
            return jsonify({'success': False, 'error': f'Invalid status. Must be one of: {", ".join(valid_statuses)}'}), 400

        from CDocs.controllers import document_controller

        # Get document first to verify it exists
        document = document_controller.get_document(document_uid)
        if not document:
            return jsonify({'success': False, 'error': 'Document not found'}), 404

        # Force update the document status; values are passed as query
        # parameters, never interpolated into the query text.
        query = """
        MATCH (d:ControlledDocument {UID: $document_uid})
        SET d.documentStatus = $new_status,
            d.modifiedDate = datetime()
        RETURN d
        """

        from CDocs.db import db_operations as db
        result = db.run_query(query, {
            'document_uid': document_uid,
            'new_status': new_status.upper()
        })

        if not result:
            return jsonify({'success': False, 'error': 'Failed to update document status'}), 500

        # Log the status change
        from CDocs.utils import audit_trail
        audit_trail.log_event(
            event_type="DOCUMENT_STATUS_CHANGED",
            user=current_user.doc_user.uid,
            resource_uid=document_uid,
            resource_type="ControlledDocument",
            details={
                "old_status": document.get('documentStatus', 'unknown'),
                "new_status": new_status.upper(),
                "changed_by_admin": True
            }
        )

        return jsonify({
            'success': True,
            'message': f'Document status updated to {new_status}',
            'new_status': new_status
        })
    except Exception as e:
        logger.error(f"Admin update document status error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# User Management Routes
@self.app.route('/admin/users/<user_id>')
@login_required
def get_user_for_edit(user_id):
    """Return one user's record as JSON so it can be edited.

    Non-admins get a 403, an unknown ``user_id`` a 404, and any exception
    raised during the lookup is logged and reported as a 500.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied'}), 403

    try:
        from CDocs.controllers import admin_controller

        record = admin_controller.get_user_by_id(user_id)
        if not record:
            return jsonify({'success': False, 'error': 'User not found'}), 404
        return jsonify({'success': True, 'user': record})
    except Exception as exc:
        logger.error(f"Get user error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/admin/users/<user_id>/update', methods=['POST'])
@login_required
def update_user(user_id):
    """Update a user's data from a JSON object (admin only).

    The parsed JSON object is handed unchanged to
    ``admin_controller.update_user``.

    Returns JSON: 403 for non-admins, 400 when the body is missing or is not
    a JSON object, 500 when an exception is raised. A failure reported by the
    controller is returned with the default status code.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied'}), 403

    try:
        from CDocs.controllers import admin_controller

        # silent=True: a missing or malformed body yields None instead of an
        # HTTP error that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        result = admin_controller.update_user(user_id, data)
        if result.get('success'):
            return jsonify({'success': True, 'message': 'User updated successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to update user')})
    except Exception as e:
        logger.error(f"Update user error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/admin/users/<user_id>/delete', methods=['POST'])
@login_required
def delete_user(user_id):
    """Delete the user identified by ``user_id`` (admin only).

    Responds with JSON; non-admins get a 403 and exceptions are logged and
    reported as a 500.
    """
    if not current_user.has_role('admin'):
        return jsonify({'success': False, 'error': 'Access denied'}), 403

    try:
        from CDocs.controllers import admin_controller

        outcome = admin_controller.delete_user(user_id)
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to delete user')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'User deleted successfully'})
    except Exception as exc:
        logger.error(f"Delete user error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
# Document Action Routes
@self.app.route('/documents/<document_uid>/download')
@login_required
def download_document(document_uid):
    """Send the current version of a document as a file attachment.

    On failure a flash message is set and the user is redirected to the
    ``document_detail`` endpoint.
    """
    try:
        # The controller function does the real work; this route only wraps it.
        from CDocs.controllers.document_controller import download_document_version as fetch_version

        logger.info(f"Downloading current version of document: {document_uid}")

        # version_uid=None asks the controller for the current version.
        outcome = fetch_version(
            user=current_user.doc_user,
            document_uid=document_uid,
            version_uid=None,
            include_audit_trail=False,
            include_watermark=False
        )

        if not outcome.get('success'):
            flash('Failed to download document', 'error')
            return redirect(url_for('document_detail', doc_id=document_uid))

        payload = outcome.get('content')
        download_name = outcome.get('file_name', 'document')
        suffix = outcome.get('file_extension', '.pdf')
        # Make sure the offered file name carries its extension.
        if not download_name.endswith(suffix):
            download_name = download_name + suffix

        reply = make_response(payload)
        reply.headers['Content-Type'] = 'application/octet-stream'
        reply.headers['Content-Disposition'] = f'attachment; filename="{download_name}"'
        return reply
    except Exception as exc:
        logger.error(f"Error downloading document: {exc}")
        flash(f'Error downloading document: {str(exc)}', 'error')
        return redirect(url_for('document_detail', doc_id=document_uid))
@self.app.route('/documents/<document_uid>/view')
@login_required
def view_document_content(document_uid):
    """Redirect to the document page after checking the document exists.

    A missing document, or any exception, sends the user to the
    ``documents`` endpoint with a flash message.
    """
    try:
        found = document_controller.get_document(document_uid, current_user.doc_user)
        if found:
            # There is no dedicated viewer yet, so the document page stands in.
            return redirect(url_for('view_document', document_uid=document_uid))

        flash('Document not found', 'error')
        return redirect(url_for('documents'))
    except Exception as exc:
        logger.error(f"View document error: {exc}")
        flash('Error viewing document', 'error')
        return redirect(url_for('documents'))
@self.app.route('/documents/<document_uid>/reviews/start', methods=['POST'])
@login_required
def start_review_cycle(document_uid):
    """Kick off a review cycle for the document and report the result as JSON.

    Exceptions are logged and reported as a 500.
    """
    try:
        from CDocs.controllers.review_controller import initiate_review_cycle

        outcome = initiate_review_cycle(
            document_uid=document_uid,
            initiated_by=current_user.doc_user
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to start review')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Review cycle started successfully'})
    except Exception as exc:
        logger.error(f"Start review error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/approvals/start', methods=['POST'])
@login_required
def start_approval_cycle(document_uid):
    """Kick off an approval cycle for the document and report the result as JSON.

    Exceptions are logged and reported as a 500.
    """
    try:
        from CDocs.controllers.approval_controller import initiate_approval_cycle

        outcome = initiate_approval_cycle(
            document_uid=document_uid,
            initiated_by=current_user.doc_user
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to start approval')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Approval cycle started successfully'})
    except Exception as exc:
        logger.error(f"Start approval error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/publish', methods=['POST'])
@login_required
def publish_document(document_uid):
    """Publish the document, passing along an optional ``comment`` from the JSON body.

    Responds with JSON; exceptions are logged and reported as a 500.
    """
    try:
        from CDocs.controllers.document_controller import publish_document as publish_doc

        # The body is optional; fall back to an empty mapping when it is absent.
        body = request.get_json() or {}
        note = body.get('comment', '')

        outcome = publish_doc(
            user=current_user.doc_user,
            document_uid=document_uid,
            publish_comment=note
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to publish document')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Document published successfully'})
    except Exception as exc:
        logger.error(f"Publish document error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/clone', methods=['POST'])
@login_required
def clone_document(document_uid):
    """Clone the document and return the new document's UID as JSON.

    Exceptions are logged and reported as a 500.
    """
    try:
        from CDocs.controllers.document_controller import clone_document as clone_doc

        outcome = clone_doc(
            document_uid=document_uid,
            user=current_user.doc_user
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to clone document')
            return jsonify({'success': False, 'error': reason})

        cloned_uid = outcome.get('new_document_uid')
        return jsonify({'success': True, 'new_document_id': cloned_uid, 'message': 'Document cloned successfully'})
    except Exception as exc:
        logger.error(f"Clone document error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/archive', methods=['POST'])
@login_required
def archive_document(document_uid):
    """Archive a document.

    Expects a JSON object with a required ``reason`` and an optional
    ``comment``. Returns JSON: 400 when the reason is missing (including
    when no JSON object was sent), 500 when an exception is raised. A
    failure reported by the controller is returned with the default status
    code.
    """
    try:
        from CDocs.controllers.document_controller import archive_document as archive_doc

        # silent=True plus the type check: a missing, malformed or non-object
        # body becomes {} so the caller gets the 400 below instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        reason = data.get('reason')
        comment = data.get('comment', '')
        if not reason:
            return jsonify({'success': False, 'error': 'Archive reason is required'}), 400

        result = archive_doc(
            user=current_user.doc_user,
            document_uid=document_uid,
            archive_reason=reason,
            archive_comment=comment
        )
        if result.get('success'):
            return jsonify({'success': True, 'message': 'Document archived successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to archive document')})
    except Exception as e:
        logger.error(f"Archive document error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/documents/<document_uid>/audit')
@login_required
def get_document_audit_trail(document_uid):
    """Return the document's audit entries as JSON.

    Exceptions are logged and reported as a 500.
    """
    try:
        # Aliased so the controller function is not confused with this view.
        from CDocs.controllers.document_controller import get_document_audit_trail as fetch_audit_trail

        outcome = fetch_audit_trail(
            document_uid=document_uid,
            user=current_user.doc_user
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to get audit trail')
            return jsonify({'success': False, 'error': reason})

        entries = outcome.get('audit_entries', [])
        return jsonify({'success': True, 'audit_entries': entries})
    except Exception as exc:
        logger.error(f"Get audit trail error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/convert-pdf', methods=['POST'])
@login_required
def convert_document_to_pdf(document_uid):
    """Ask the document controller to convert the document to PDF.

    Responds with JSON; exceptions are logged and reported as a 500.
    """
    try:
        # Aliased so the controller function is not confused with this view.
        from CDocs.controllers.document_controller import convert_document_to_pdf as run_pdf_conversion

        outcome = run_pdf_conversion(
            user=current_user.doc_user,
            document_uid=document_uid
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to convert document')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Document converted to PDF successfully'})
    except Exception as exc:
        logger.error(f"Convert to PDF error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/change-status', methods=['POST'])
@login_required
def change_document_status(document_uid):
    """Change a document's status with a recorded justification (admin only).

    Expects a JSON object with ``status`` and a ``reason`` of at least ten
    non-blank characters. On success the change is written to the audit
    trail together with the previous status and the reason.

    Returns JSON: 403 for non-admins, 400 for a missing status or an
    insufficient reason (including when no JSON object was sent), 404 when
    the document is not found, 500 when an exception is raised.
    """
    try:
        # Check if user is admin (the role name is accepted in either case).
        if not (current_user.has_role('admin') or current_user.has_role('ADMIN')):
            return jsonify({'success': False, 'error': 'Admin permissions required'}), 403

        from CDocs.controllers.document_controller import update_document
        from CDocs.utils.audit_trail import log_event

        # silent=True plus the type check: a missing, malformed or non-object
        # body becomes {} so the caller gets a 400 below instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        new_status = data.get('status')
        reason = data.get('reason', '')

        if not new_status:
            return jsonify({'success': False, 'error': 'Status is required'}), 400

        # A non-string reason (null, number, ...) is treated as missing.
        if not isinstance(reason, str) or len(reason.strip()) < 10:
            return jsonify({'success': False, 'error': 'Detailed reason is required (minimum 10 characters)'}), 400

        # Get current document status for audit logging
        document = document_controller.get_document(document_uid, current_user.doc_user)
        if not document:
            return jsonify({'success': False, 'error': 'Document not found'}), 404

        old_status = document.get('status', 'Unknown')

        result = update_document(
            user=current_user.doc_user,
            document_uid=document_uid,
            status=new_status
        )

        if not result.get('success'):
            return jsonify({'success': False, 'error': result.get('message', 'Failed to change status')})

        # Log the administrative status change
        log_event(
            event_type="ADMIN_STATUS_CHANGE",
            user=current_user.doc_user,
            resource_uid=document_uid,
            resource_type="ControlledDocument",
            details={
                "old_status": old_status,
                "new_status": new_status,
                "reason": reason.strip(),
                "admin_user": current_user.doc_user.username
            }
        )
        return jsonify({'success': True, 'message': f'Document status changed from {old_status} to {new_status}'})
    except Exception as e:
        logger.error(f"Change status error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/documents/<document_uid>/delete', methods=['DELETE'])
@login_required
def delete_document(document_uid):
    """Delete a document via the document controller (admin only).

    Responds with JSON; non-admins get a 403 and exceptions are logged and
    reported as a 500.
    """
    try:
        # The admin role is accepted in either spelling.
        is_admin = current_user.has_role('admin') or current_user.has_role('ADMIN')
        if not is_admin:
            return jsonify({'success': False, 'error': 'Admin permissions required'}), 403

        from CDocs.controllers.document_controller import delete_document as delete_doc

        outcome = delete_doc(
            user=current_user.doc_user,
            document_uid=document_uid
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to delete document')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Document deleted successfully'})
    except Exception as exc:
        logger.error(f"Delete document error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/training')
@login_required
def document_training_management(document_uid):
    """Render the training management page for one document.

    Requires the ``MANAGE_TRAINING`` or ``MANAGE_ALL_TRAINING`` permission
    or the ``admin`` role; otherwise the user is redirected with a flash
    message. A missing document redirects to the ``documents`` endpoint.
    """
    try:
        may_manage = (
            current_user.has_permission('MANAGE_TRAINING')
            or current_user.has_permission('MANAGE_ALL_TRAINING')
            or current_user.has_role('admin')
        )
        if not may_manage:
            flash('Access denied: Training management permissions required', 'error')
            return redirect(url_for('document_detail', doc_id=document_uid))

        doc = document_controller.get_document(document_uid, current_user.doc_user)
        if not doc:
            flash('Document not found', 'error')
            return redirect(url_for('documents'))

        # Look up the training configuration attached to this document.
        logger.info(f"Getting training info for document {document_uid} in training management route")
        info = training_controller.get_document_training_info(document_uid)
        logger.info(f"Training info retrieved: enabled={info.get('enabled', False)}")

        return render_template(
            'document_training_management.html',
            document=doc,
            training_info=info,
            user=current_user,
        )
    except Exception as exc:
        logger.error(f"Document training management error: {exc}")
        flash('Error loading training management page', 'error')
        return redirect(url_for('document_detail', doc_id=document_uid))
@self.app.route('/documents/<document_uid>/training/enable', methods=['POST'])
@login_required
def enable_document_training(document_uid):
    """Enable training for a document.

    Accepts an optional JSON object with ``validity_days`` (default 365),
    ``quiz_required`` (default False) and ``instructions`` (default empty).
    Requires the ``MANAGE_TRAINING`` or ``MANAGE_ALL_TRAINING`` permission
    or the ``admin`` role.

    Returns JSON: 403 without permission, 500 when an exception is raised.
    A failure reported by the controller is returned with the default
    status code.
    """
    try:
        logger.info(f"Training enable request for document: {document_uid}")

        # Check if user has training management permissions
        if not (current_user.has_permission('MANAGE_TRAINING') or
                current_user.has_permission('MANAGE_ALL_TRAINING') or
                current_user.has_role('admin')):
            logger.warning(f"Permission denied for user {current_user.username} to enable training")
            return jsonify({'success': False, 'error': 'Training management permissions required'}), 403

        # Every field has a default, so a missing, malformed or non-object
        # body is treated as "use the defaults" rather than failing with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        logger.info(f"Training enable data: {data}")

        validity_days = data.get('validity_days', 365)
        quiz_required = data.get('quiz_required', False)
        instructions = data.get('instructions', '')

        logger.info(f"Calling enable_document_training with user: {current_user.username}, document: {document_uid}")

        # Enable training using the training controller
        result = training_controller.enable_document_training(
            user=current_user.doc_user,
            document_uid=document_uid,
            validity_days=validity_days,
            quiz_required=quiz_required,
            instructions=instructions
        )
        logger.info(f"Training enable result: {result}")

        if result.get('success'):
            return jsonify({'success': True, 'message': 'Training enabled successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to enable training')})
    except Exception as e:
        logger.error(f"Enable training error: {e}", exc_info=True)
        return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/documents/<document_uid>/training/disable', methods=['POST'])
@login_required
def disable_document_training(document_uid):
    """Turn training off for a document and report the result as JSON.

    Requires the ``MANAGE_TRAINING`` or ``MANAGE_ALL_TRAINING`` permission
    or the ``admin`` role (403 otherwise). Exceptions are logged and
    reported as a 500.
    """
    try:
        may_manage = (
            current_user.has_permission('MANAGE_TRAINING')
            or current_user.has_permission('MANAGE_ALL_TRAINING')
            or current_user.has_role('admin')
        )
        if not may_manage:
            return jsonify({'success': False, 'error': 'Training management permissions required'}), 403

        outcome = training_controller.disable_document_training(
            user=current_user.doc_user,
            document_uid=document_uid
        )
        if not outcome.get('success'):
            reason = outcome.get('message', 'Failed to disable training')
            return jsonify({'success': False, 'error': reason})
        return jsonify({'success': True, 'message': 'Training disabled successfully'})
    except Exception as exc:
        logger.error(f"Disable training error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/documents/<document_uid>/training/assign', methods=['GET', 'POST'])
@login_required
def assign_document_training(document_uid):
    """Show the training assignment page (GET) or assign training to users (POST).

    GET renders ``training_assign.html`` with the document, the list of
    users and the document's current training info.

    POST accepts either a JSON object or a form submission carrying
    ``user_uids`` (one or more) and an optional ``validity_days`` (default
    365) and responds with JSON: 403 without training-management
    permission, 400 when no users are selected or ``validity_days`` is not
    an integer, 500 when an exception is raised. A failure reported by the
    controller is returned with the default status code.
    """
    try:
        if request.method == 'GET':
            # Return the assignment form/page
            # NOTE(review): unlike the POST branch, this branch does not check
            # training-management permissions before listing users - confirm
            # that this is intended.
            logger.info(f"GET request for training assignment - Document UID: {document_uid}")
            document = document_controller.get_document(document_uid, current_user.doc_user)
            logger.info(f"Document retrieved: {document is not None}")
            if document:
                logger.info(f"Document type: {type(document)}")
                logger.info(f"Document keys: {list(document.keys()) if isinstance(document, dict) else 'Not a dict'}")
                logger.info(f"Document UID: {document.get('uid', 'NO UID') if isinstance(document, dict) else getattr(document, 'uid', 'NO UID ATTR')}")
                logger.info(f"Full document: {document}")
            if not document:
                flash('Document not found', 'error')
                return redirect(url_for('documents'))

            # Get available users for assignment
            users = admin_controller.get_users()
            logger.info(f"Users retrieved: {len(users) if users else 0}")

            # Get current training info
            training_info = training_controller.get_document_training_info(document_uid)
            logger.info(f"Training info retrieved: {training_info is not None}")

            return render_template('training_assign.html',
                                   document=document,
                                   document_uid=document_uid,
                                   users=users,
                                   training_info=training_info)

        elif request.method == 'POST':
            # Handle training assignment
            logger.info(f"POST request received for training assignment - Document: {document_uid}")
            # Request headers are deliberately not logged: they carry the
            # session cookie and any Authorization header.
            logger.info(f"Request content type: {request.content_type}")

            if not (current_user.has_permission('MANAGE_TRAINING') or
                    current_user.has_permission('MANAGE_ALL_TRAINING') or
                    current_user.has_role('admin')):
                logger.warning(f"User {current_user.email} lacks training management permissions")
                return jsonify({'success': False, 'error': 'Training management permissions required'}), 403

            # Get form data
            if request.is_json:
                data = request.get_json(silent=True)
                if not isinstance(data, dict):
                    data = {}
                user_uids = data.get('user_uids', [])
            else:
                data = request.form
                # getlist keeps every selected user; MultiDict.get would
                # return only the first value of a multi-select field.
                user_uids = data.getlist('user_uids')
            logger.info(f"Form data received: {data}")

            validity_days = data.get('validity_days', 365)  # Default to 1 year
            logger.info(f"Parsed user_uids: {user_uids}, validity_days: {validity_days}")

            if isinstance(user_uids, str):
                user_uids = [user_uids]

            if not user_uids:
                logger.error("No users selected for training assignment")
                return jsonify({'success': False, 'error': 'No users selected'}), 400

            # A blank form field or JSON null falls back to the default;
            # anything else must be convertible to an integer.
            if validity_days is None or validity_days == '':
                validity_days = 365
            try:
                validity_days = int(validity_days)
            except (TypeError, ValueError):
                logger.error(f"Invalid validity_days for training assignment: {validity_days!r}")
                return jsonify({'success': False, 'error': 'validity_days must be an integer'}), 400

            # Assign training using the training controller
            logger.info(f"Calling assign_user_training with user_uids: {user_uids}")
            result = training_controller.assign_user_training(
                user=current_user.doc_user,
                document_uid=document_uid,
                user_uids=user_uids,
                validity_days=validity_days
            )
            logger.info(f"Training assignment result: {result}")

            if result.get('success'):
                logger.info(f"Training assignment successful - assigned to {result.get('assigned_count', 0)} users")
                return jsonify({
                    'success': True,
                    'message': f"Training assigned to {result.get('assigned_count', 0)} users"
                })
            else:
                logger.error(f"Training assignment failed: {result.get('message', 'Failed to assign training')}")
                return jsonify({
                    'success': False,
                    'error': result.get('message', 'Failed to assign training')
                })
    except Exception as e:
        logger.error(f"Assign training error: {e}")
        # JSON for the API-style POST, flash + redirect for the page load.
        if request.method == 'POST':
            return jsonify({'success': False, 'error': str(e)}), 500
        else:
            flash(f'Error loading training assignment page: {str(e)}', 'error')
            return redirect(url_for('documents'))
@self.app.route('/documents/<document_uid>/versions/<version_uid>/download')
@login_required
def download_document_version(document_uid, version_uid):
    """Send one specific version of a document as a file attachment.

    On failure a flash message is set and the user is redirected to the
    ``document_detail`` endpoint.
    """
    try:
        # The controller function does the real work; this route only wraps it.
        from CDocs.controllers.document_controller import download_document_version as fetch_version

        logger.info(f"Downloading document version - Document: {document_uid}, Version: {version_uid}")

        outcome = fetch_version(
            user=current_user.doc_user,
            document_uid=document_uid,
            version_uid=version_uid,
            include_audit_trail=False,
            include_watermark=False
        )

        if not outcome.get('success'):
            flash('Failed to download document version', 'error')
            return redirect(url_for('document_detail', doc_id=document_uid))

        payload = outcome.get('content')
        download_name = outcome.get('file_name', 'document')
        suffix = outcome.get('file_extension', '.pdf')
        # Make sure the offered file name carries its extension.
        if not download_name.endswith(suffix):
            download_name = download_name + suffix

        reply = make_response(payload)
        reply.headers['Content-Type'] = 'application/octet-stream'
        reply.headers['Content-Disposition'] = f'attachment; filename="{download_name}"'
        return reply
    except Exception as exc:
        logger.error(f"Error downloading document version: {exc}")
        flash(f'Error downloading document: {str(exc)}', 'error')
        return redirect(url_for('document_detail', doc_id=document_uid))
@self.app.route('/documents/<document_uid>/versions/<version_uid>/preview')
@login_required
def preview_document_version(document_uid, version_uid):
    """Placeholder preview: redirect to the main document page.

    ``version_uid`` is accepted but not used; no version-specific preview
    is produced here.
    """
    try:
        document_page = url_for('view_document', document_uid=document_uid)
        return redirect(document_page)
    except Exception as exc:
        logger.error(f"Version preview error: {exc}")
        flash('Error previewing document version', 'error')
        return redirect(url_for('view_document', document_uid=document_uid))
# Document Action Routes - Start Review
@self.app.route('/documents/<document_uid>/start_review', methods=['POST'])
@login_required
def start_review(document_uid):
    """Start a review cycle for a document.

    Expects a JSON object that may contain ``reviewer_uids``,
    ``reviewer_instructions``, ``due_date``, ``instructions``,
    ``review_type``, ``sequential`` and ``required_approval_percentage``.
    ``due_date`` is parsed with ``datetime.fromisoformat`` and then as
    ``YYYY-MM-DD``; when it is absent or blank the due date defaults to 14
    days from now.

    Returns JSON: 400 when the body is not a JSON object or ``due_date``
    cannot be parsed, 500 when an exception is raised. A failure reported
    by the controller is returned with the default status code.
    """
    try:
        logger.info(f"=== START REVIEW REQUEST ===")
        logger.info(f"Document UID: {document_uid}")
        logger.info(f"User: {current_user.email}")
        logger.info(f"Request content type: {request.content_type}")
        logger.info(f"Request data: {request.data}")

        # silent=True: a missing or malformed body yields None instead of an
        # error that the handler below would report as a 500.
        data = request.get_json(silent=True)
        logger.info(f"Parsed JSON data: {data}")
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        # Import the actual function from review_controller
        from CDocs.controllers.review_controller import create_review_cycle

        # Extract all required parameters from the request
        reviewer_uids = data.get('reviewer_uids', [])
        reviewer_instructions = data.get('reviewer_instructions', {})
        due_date_str = data.get('due_date')

        logger.info(f"Reviewer UIDs: {reviewer_uids}")
        logger.info(f"Reviewer instructions: {reviewer_instructions}")
        logger.info(f"Due date string: {due_date_str}")

        # Convert due_date string to datetime
        from datetime import datetime, date, timedelta
        if isinstance(due_date_str, str) and due_date_str.strip():
            due_date_str = due_date_str.strip()
            try:
                due_date = datetime.fromisoformat(due_date_str)
            except ValueError:
                try:
                    due_date = datetime.strptime(due_date_str, '%Y-%m-%d')
                except ValueError:
                    # Neither format matched: this is a client error, not a server fault.
                    logger.error(f"Invalid due date received: {due_date_str!r}")
                    return jsonify({'success': False, 'error': 'Invalid due_date; expected an ISO 8601 date'}), 400
        elif isinstance(due_date_str, date):
            due_date = datetime.combine(due_date_str, datetime.max.time())
        else:
            due_date = datetime.now() + timedelta(days=14)  # Default

        logger.info(f"Converted due date: {due_date}")

        logger.info(f"Calling create_review_cycle with:")
        logger.info(f" - user: {current_user.doc_user}")
        logger.info(f" - document_uid: {document_uid}")
        logger.info(f" - reviewer_uids: {reviewer_uids}")
        logger.info(f" - reviewer_instructions: {reviewer_instructions}")
        logger.info(f" - due_date: {due_date}")
        logger.info(f" - instructions: {data.get('instructions', '')}")
        logger.info(f" - review_type: {data.get('review_type', 'Technical Review')}")
        logger.info(f" - sequential: {data.get('sequential', False)}")
        logger.info(f" - required_approval_percentage: {data.get('required_approval_percentage', 100)}")

        result = create_review_cycle(
            user=current_user.doc_user,
            document_uid=document_uid,
            reviewer_uids=reviewer_uids,
            reviewer_instructions=reviewer_instructions,
            due_date=due_date,
            instructions=data.get('instructions', ''),
            review_type=data.get('review_type', 'Technical Review'),
            sequential=data.get('sequential', False),
            required_approval_percentage=data.get('required_approval_percentage', 100)
        )

        logger.info(f"create_review_cycle result: {result}")

        if result.get('success'):
            logger.info("Review cycle started successfully")
            return jsonify({'success': True, 'message': 'Review cycle started successfully'})
        else:
            error_msg = result.get('message', 'Failed to start review cycle')
            logger.error(f"Review cycle failed: {error_msg}")
            return jsonify({'success': False, 'error': error_msg})
    except Exception as e:
        logger.error(f"Start review error: {e}")
        logger.error(f"Exception details: {traceback.format_exc()}")
        return jsonify({'success': False, 'error': str(e)}), 500
# Document Action Routes - Start Approval
@self.app.route('/documents/<document_uid>/start_approval', methods=['POST'])
@login_required
def start_approval(document_uid):
    """Start an approval cycle for a document.

    Expects a JSON object that may contain ``approver_uids``,
    ``approver_instructions``, ``due_date``, ``instructions``,
    ``approval_type``, ``sequential`` and ``required_approval_percentage``.
    ``due_date`` is parsed with ``datetime.fromisoformat`` and then as
    ``YYYY-MM-DD``; when it is absent or blank the due date defaults to 14
    days from now. An ``approval_type`` not listed in ``APPROVAL_TYPES``
    is replaced with ``STANDARD``.

    Returns JSON: 400 when the body is not a JSON object or ``due_date``
    cannot be parsed, 500 when an exception is raised. A failure reported
    by the controller is returned with the default status code.
    """
    try:
        # silent=True: a missing or malformed body yields None instead of an
        # error that the handler below would report as a 500.
        data = request.get_json(silent=True)
        logger.info(f"Starting approval cycle with data: {data}")
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        # Import the actual function from approval_controller
        from CDocs.controllers.approval_controller import create_approval_cycle

        # Extract all required parameters from the request
        approver_uids = data.get('approver_uids', [])
        approver_instructions = data.get('approver_instructions', {})
        due_date_str = data.get('due_date')

        # Convert due_date string to datetime
        from datetime import datetime, date, timedelta
        if isinstance(due_date_str, str) and due_date_str.strip():
            due_date_str = due_date_str.strip()
            try:
                due_date = datetime.fromisoformat(due_date_str)
            except ValueError:
                try:
                    due_date = datetime.strptime(due_date_str, '%Y-%m-%d')
                except ValueError:
                    # Neither format matched: this is a client error, not a server fault.
                    logger.error(f"Invalid due date received: {due_date_str!r}")
                    return jsonify({'success': False, 'error': 'Invalid due_date; expected an ISO 8601 date'}), 400
        elif isinstance(due_date_str, date):
            due_date = datetime.combine(due_date_str, datetime.max.time())
        else:
            due_date = datetime.now() + timedelta(days=14)  # Default

        # Extract approval type and validate it
        approval_type = data.get('approval_type', 'STANDARD')

        # Import settings to validate approval type
        from CDocs.config.settings import APPROVAL_TYPES
        if approval_type not in APPROVAL_TYPES:
            logger.warning(f"Invalid approval type '{approval_type}', using STANDARD instead")
            approval_type = 'STANDARD'

        result = create_approval_cycle(
            user=current_user.doc_user,
            document_uid=document_uid,
            approver_uids=approver_uids,
            approver_instructions=approver_instructions,
            due_date=due_date,
            instructions=data.get('instructions', ''),
            approval_type=approval_type,
            sequential=data.get('sequential', False),
            required_approval_percentage=data.get('required_approval_percentage', 100)
        )

        if result.get('success'):
            return jsonify({'success': True, 'message': 'Approval cycle started successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to start approval cycle')})
    except Exception as e:
        logger.error(f"Start approval error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# API Routes for getting users
@self.app.route('/api/users/reviewers')
@login_required
def get_reviewers():
    """Return, as JSON, the users who may be picked as reviewers.

    The query selects every ``User`` node except the current user and nodes
    labelled ``Template``, ordered by name. On error an empty list is
    returned with a 500 status.
    """
    try:
        from CDocs.db.db_operations import run_query

        cypher = """
        MATCH (u:User)
        WHERE (u.UID <> $current_user_uid) and not('Template' in labels(u))
        RETURN u.UID as uid, u.Name as name, u.Role as role, u.Department as department
        ORDER BY u.Name
        """
        rows = run_query(cypher, {"current_user_uid": current_user.doc_user.uid})

        # Reshape each row into the dictionary returned to the client.
        reviewers = []
        for row in rows:
            reviewers.append({
                "uid": row["uid"],
                "name": row["name"],
                "role": row.get("role", ""),
                "department": row.get("department", ""),
            })

        return jsonify({'reviewers': reviewers})
    except Exception as exc:
        logger.error(f"Error fetching reviewers: {exc}")
        return jsonify({'reviewers': []}), 500
@self.app.route('/api/users/approvers')
@login_required
def get_approvers():
    """Return, as JSON, the users who may be picked as approvers.

    The query selects every ``User`` node except the current user and nodes
    labelled ``Template``, ordered by name. On error an empty list is
    returned with a 500 status.
    """
    try:
        from CDocs.db.db_operations import run_query

        cypher = """
        MATCH (u:User)
        WHERE (u.UID <> $current_user_uid) and not('Template' in labels(u))
        RETURN u.UID as uid, u.Name as name, u.Role as role, u.Department as department
        ORDER BY u.Name
        """
        rows = run_query(cypher, {"current_user_uid": current_user.doc_user.uid})

        # Reshape each row into the dictionary returned to the client.
        approvers = []
        for row in rows:
            approvers.append({
                "uid": row["uid"],
                "name": row["name"],
                "role": row.get("role", ""),
                "department": row.get("department", ""),
            })

        return jsonify({'approvers': approvers})
    except Exception as exc:
        logger.error(f"Error fetching approvers: {exc}")
        return jsonify({'approvers': []}), 500
# Document Action Routes - Publish Document
@self.app.route('/documents/<document_uid>/publish', methods=['POST'])
@login_required
def publish_document_route(document_uid):
    """Publish a document, passing along an optional ``comment`` from the JSON body.

    NOTE(review): the same URL rule and method are also registered by the
    ``publish_document`` view earlier in this file, so this handler is
    likely never selected for a request - confirm and remove one of them.

    Returns JSON; exceptions are logged and reported as a 500. A failure
    reported by the controller is returned with the default status code.
    """
    try:
        from CDocs.controllers.document_controller import publish_document

        # The comment is optional, so a missing, malformed or non-object body
        # must not fail the request.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        publish_comment = data.get('comment', '')

        result = publish_document(
            user=current_user.doc_user,
            document_uid=document_uid,
            publish_comment=publish_comment
        )
        if result.get('success'):
            return jsonify({'success': True, 'message': 'Document published successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to publish document')})
    except Exception as e:
        logger.error(f"Publish document error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# Document Action Routes - Update/Edit Document
@self.app.route('/documents/<document_uid>/edit', methods=['POST'])
@login_required
def edit_document_route(document_uid):
    """Edit document metadata (``title`` and/or ``description``).

    Expects a JSON object; only non-empty ``title`` and ``description``
    values are forwarded to ``update_document``.

    Returns JSON: 400 when the body is not a JSON object, 500 when an
    exception is raised. A failure reported by the controller is returned
    with the default status code.
    """
    try:
        from CDocs.controllers.document_controller import update_document

        # silent=True: a missing or malformed body yields None instead of an
        # error that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        title = data.get('title')
        description = data.get('description')

        # Build update parameters from the fields that were actually supplied.
        update_params = {}
        if title:
            update_params['title'] = title
        if description:
            update_params['description'] = description

        result = update_document(
            user=current_user.doc_user,
            document_uid=document_uid,
            **update_params
        )
        if result.get('success'):
            return jsonify({'success': True, 'message': 'Document updated successfully'})
        return jsonify({'success': False, 'error': result.get('message', 'Failed to update document')})
    except Exception as e:
        logger.error(f"Edit document error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# Document Action Routes - Upload New Version
@self.app.route('/documents/<document_uid>/upload_version', methods=['POST'])
@login_required
def upload_document_version(document_uid):
    """Store an uploaded file as a new version of the document.

    Reads the ``file`` part and an optional ``comment`` field from a
    multipart form. Returns JSON: 400 when no file is provided or selected,
    500 when an exception is raised.
    """
    try:
        from CDocs.controllers.document_controller import create_document_version

        if 'file' not in request.files:
            return jsonify({'success': False, 'error': 'No file provided'}), 400

        upload = request.files['file']
        if upload.filename == '':
            return jsonify({'success': False, 'error': 'No file selected'}), 400

        # Pull everything the controller needs out of the request.
        raw_bytes = upload.read()
        original_name = upload.filename
        note = request.form.get('comment', '')

        created = create_document_version(
            user=current_user.doc_user,
            document_uid=document_uid,
            file_content=raw_bytes,
            file_name=original_name,
            comment=note
        )

        # A result carrying a UID is treated as success.
        if not (created and created.get('UID')):
            return jsonify({'success': False, 'error': 'Failed to upload new version'})
        return jsonify({'success': True, 'message': 'New version uploaded successfully'})
    except Exception as exc:
        logger.error(f"Upload new version error: {exc}")
        return jsonify({'success': False, 'error': str(exc)}), 500
@self.app.route('/training/document/<document_uid>')
@login_required
def manage_document_training(document_uid):
    """Placeholder route: flash a notice and redirect to the document page."""
    try:
        # Nothing is managed here yet; the user is told so and sent back.
        flash('Training management will be implemented', 'info')
        document_page = url_for('view_document', document_uid=document_uid)
        return redirect(document_page)
    except Exception as exc:
        logger.error(f"Training management error: {exc}")
        flash('Error accessing training management', 'error')
        return redirect(url_for('view_document', document_uid=document_uid))
@self.app.route('/auth/azure/callback')
def azure_callback():
    """Handle the redirect back from Azure after an SSO sign-in attempt.

    Steps, each of which flashes a message and returns to the login page on
    failure:
      1. require that Azure SSO is configured on this app;
      2. read ``code`` / ``error`` from the query string;
      3. exchange the code for tokens via ``self.azure_sso``;
      4. validate the token data and extract the user's email and name;
      5. look up the ``DocUser`` by email, creating it if absent or
         refreshing its stored name if it changed;
      6. log the user in and redirect to the ``next_page`` stored in the
         session, or to the index page when there is none.
    """
    def _to_login():
        return redirect(url_for('login'))

    if not self.azure_sso:
        flash('Azure SSO not configured', 'error')
        return _to_login()

    try:
        auth_code = request.args.get('code')
        auth_error = request.args.get('error')

        if auth_error:
            logger.error(f"Azure authentication error: {auth_error}")
            flash(f'Authentication failed: {auth_error}', 'error')
            return _to_login()

        if not auth_code:
            flash('Authorization failed - no code received', 'error')
            return _to_login()

        # Exchange the authorization code for tokens.
        tokens = self.azure_sso.get_token_from_code(auth_code)
        if not tokens or 'access_token' not in tokens:
            logger.error("Failed to get access token from Azure")
            flash('Authentication failed - could not get access token', 'error')
            return _to_login()

        # Validate the token data and pull the user's profile out of it.
        from auth.azure_auth import validate_azure_token
        profile = validate_azure_token(tokens)
        if not profile or not profile.get('email'):
            logger.error("Failed to get user information from Azure token")
            flash('Authentication failed - could not get user information', 'error')
            return _to_login()

        email = profile['email']
        name = profile.get('name', email)

        try:
            doc_user = DocUser.get_by_email(email)
            if doc_user:
                # Known user: keep the stored display name in sync.
                if doc_user.name != name:
                    doc_user._data['Name'] = name
                    doc_user.save()
                logger.info(f"User logged in via Azure SSO: {email}")
            else:
                # First login: create the user with the DocUser field layout.
                doc_user = DocUser({
                    'Name': name,
                    'Mail': email,  # Use 'Mail' field as per DocUser model
                    'UID': str(uuid.uuid4()),
                    'Roles': ['user'],
                    'Permissions': [],
                    'Department': '',
                    'CreatedAt': datetime.now().isoformat(),
                    'IsActive': True,
                    'AuthProvider': 'azure'
                })
                doc_user.save()
                logger.info(f"Created new user from Azure SSO: {email}")

            # Wrap the CDocs user for Flask-Login and start the session.
            login_user(FlaskUser(doc_user), remember=True)
            flash(f'Welcome {name}!', 'success')

            # Honour a pending destination if one was stored in the session.
            next_page = session.get('next_page')
            if not next_page:
                return redirect(url_for('index'))
            session.pop('next_page', None)
            return redirect(next_page)
        except Exception as user_error:
            logger.error(f"Error creating/updating user: {user_error}")
            flash('Authentication succeeded but user creation failed', 'error')
            return _to_login()
    except Exception as e:
        logger.error(f"Azure callback error: {e}")
        flash('Authentication error occurred', 'error')
        return _to_login()
@self.app.errorhandler(404)
def not_found(error):
    """Render the shared error template for a 404, passing the logged-in user if any."""
    viewer = None
    if current_user.is_authenticated:
        viewer = current_user
    page = render_template(
        'error.html',
        error_code=404,
        error_message="Page not found",
        user=viewer,
    )
    return page, 404
@self.app.errorhandler(500)
def internal_error(error):
    """Render the shared error template for a 500, passing the logged-in user if any."""
    viewer = None
    if current_user.is_authenticated:
        viewer = current_user
    page = render_template(
        'error.html',
        error_code=500,
        error_message="Internal server error",
        user=viewer,
    )
    return page, 500
def _get_azure_auth_url(self) -> Optional[str]:
"""Get Azure authentication URL."""
return self.azure_sso.get_auth_url() if self.azure_sso else None
def _get_recent_activities(self) -> List[Dict[str, Any]]:
"""Get recent activities for dashboard."""
try:
# Create a simple recent activities list since admin_controller.get_recent_activities doesn't exist
# This is a placeholder implementation
return [
{
'description': 'System started',
'user': 'System',
'timestamp': datetime.now()
}
]
except Exception as e:
logger.error(f"Error getting recent activities: {e}")
return []
def _get_user_documents(self, user: DocUser) -> List:
    """Fetch up to 50 controlled documents with a direct database query.

    Args:
        user: The requesting user. NOTE(review): only ``user.uid`` is used,
            and only for logging; the query itself is not filtered by user,
            so every caller gets the same most-recently-modified documents.
            Confirm whether per-user filtering is intended.

    Returns:
        A list of plain dicts (uid, title, type, status, version,
        lastModified, created, description, ownerUID, docNumber). Records
        that fail to convert are logged and skipped; any other failure is
        logged with its traceback and an empty list is returned.
    """
    try:
        logger.info(f"Getting documents for user: {user.uid}")
        # Simple direct query to get documents
        query = """
MATCH (d:ControlledDocument)
OPTIONAL MATCH (d)-[:CURRENT_VERSION]->(v:DocumentVersion)
RETURN d, v.versionNumber as version_number
ORDER BY d.lastModifiedDate DESC
LIMIT 50
"""
        # Treat a None result like an empty one instead of failing on iteration.
        records = db.run_query(query, {}) or []
        logger.info(f"Query returned {len(records)} documents")

        documents = []
        for record in records:
            try:
                doc_data = record['d']
                # OPTIONAL MATCH yields a null version_number for documents
                # without a current version; the key is then present with a
                # None value, so a plain .get() default would never apply.
                version_num = record.get('version_number') or '1.0'
                documents.append({
                    'uid': doc_data.get('UID'),
                    'title': doc_data.get('title', 'Untitled'),
                    'type': doc_data.get('documentType', 'Document'),
                    'status': doc_data.get('documentStatus', 'Draft'),
                    'version': version_num,
                    'lastModified': doc_data.get('lastModifiedDate', 'Unknown'),
                    'created': doc_data.get('createdDate', 'Unknown'),
                    'description': doc_data.get('description', ''),
                    'ownerUID': doc_data.get('ownerUID'),
                    'docNumber': doc_data.get('docNumber', '')
                })
            except Exception as doc_error:
                # One malformed record must not hide the rest of the list.
                logger.error(f"Error processing document record: {doc_error}")
                continue

        logger.info(f"Returning {len(documents)} documents")
        return documents
    except Exception as e:
        # logger.exception records the message at ERROR level with the traceback.
        logger.exception(f"Error getting user documents: {e}")
        return []
def _generate_users_tab_content(self, users):
"""Generate HTML content for users tab."""
html = f"""
<div class="row mb-3">
<div class="col-md-6">
<input type="text" class="form-control" placeholder="Search users..." id="user-search">
</div>
<div class="col-md-6">
<select class="form-select" id="role-filter">
<option value="">All Roles</option>
<option value="admin">Admin</option>
<option value="reviewer">Reviewer</option>
<option value="approver">Approver</option>
<option value="user">User</option>
</select>
</div>
</div>
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>Name</th>
<th>Email</th>
<th>Roles</th>
<th>Last Login</th>
<th>Status</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
"""
for user in users:
roles_html = ', '.join(user.get('roles', []))
last_login = user.get('lastLogin', 'Never')
status = 'Active' if user.get('isActive', True) else 'Inactive'
status_class = 'text-success' if user.get('isActive', True) else 'text-danger'
html += f"""
<tr>
<td>{user.get('name', user.get('username', 'Unknown'))}</td>
<td>{user.get('email', user.get('Mail', 'N/A'))}</td>
<td><span class="badge bg-primary">{roles_html}</span></td>
<td><small>{last_login}</small></td>
<td><span class="{status_class}">{status}</span></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="editUser('{user.get('uid', user.get('id', ''))}')">
<i class="fas fa-edit"></i> Edit
</button>
<button class="btn btn-sm btn-outline-danger" onclick="deleteUser('{user.get('uid', user.get('id', ''))}')">
<i class="fas fa-trash"></i> Delete
</button>
</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
"""
return html
def _generate_documents_tab_content(self, documents):
"""Generate HTML content for documents tab."""
html = f"""
<div class="row mb-3">
<div class="col-md-4">
<input type="text" class="form-control" placeholder="Search documents..." id="doc-search">
</div>
<div class="col-md-3">
<select class="form-select" id="status-filter">
<option value="">All Status</option>
<option value="draft">Draft</option>
<option value="review">In Review</option>
<option value="approved">Approved</option>
<option value="published">Published</option>
<option value="archived">Archived</option>
</select>
</div>
<div class="col-md-3">
<select class="form-select" id="type-filter">
<option value="">All Types</option>
<option value="policy">Policy</option>
<option value="procedure">Procedure</option>
<option value="guideline">Guideline</option>
<option value="form">Form</option>
</select>
</div>
<div class="col-md-2">
<button class="btn btn-primary w-100" onclick="refreshDocuments()">
<i class="fas fa-sync-alt"></i>
</button>
</div>
</div>
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>Document</th>
<th>Type</th>
<th>Status</th>
<th>Owner</th>
<th>Last Modified</th>
<th>Version</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
"""
for doc in documents:
status_class = {
'draft': 'bg-secondary',
'review': 'bg-warning',
'approved': 'bg-success',
'published': 'bg-primary',
'archived': 'bg-dark'
}.get(doc.get('status', 'draft'), 'bg-secondary')
html += f"""
<tr>
<td>
<strong>{doc.get('title', 'Untitled')}</strong>
<br><small class="text-muted">{doc.get('uid', '')}</small>
</td>
<td>{doc.get('type', 'Document')}</td>
<td><span class="badge {status_class}">{doc.get('status', 'draft').title()}</span></td>
<td>{doc.get('ownerName', 'Unknown')}</td>
<td><small>{doc.get('modifiedDate', 'Unknown')}</small></td>
<td>{doc.get('version', '1.0')}</td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="viewDocument('{doc.get('uid')}')">
<i class="fas fa-eye"></i>
</button>
<button class="btn btn-sm btn-outline-warning" onclick="forceStatusChange('{doc.get('uid')}')">
<i class="fas fa-edit"></i>
</button>
</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
"""
return html
def _generate_training_tab_content(self, training_data):
"""Generate HTML content for training tab."""
plans = training_data.get('plans', [])
assignments = training_data.get('assignments', [])
completions = training_data.get('completions', [])
html = f"""
<div class="row mb-4">
<div class="col-md-4">
<div class="card text-center">
<div class="card-body">
<h3 class="text-primary">{len(plans)}</h3>
<p class="mb-0">Training Plans</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center">
<div class="card-body">
<h3 class="text-warning">{len(assignments)}</h3>
<p class="mb-0">Active Assignments</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center">
<div class="card-body">
<h3 class="text-success">{len(completions)}</h3>
<p class="mb-0">Completed Trainings</p>
</div>
</div>
</div>
</div>
<ul class="nav nav-pills mb-3" id="training-sub-tabs" role="tablist">
<li class="nav-item" role="presentation">
<button class="nav-link active" id="plans-tab" data-bs-toggle="pill" data-bs-target="#training-plans" type="button" role="tab">
Training Plans
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="assignments-tab" data-bs-toggle="pill" data-bs-target="#training-assignments" type="button" role="tab">
Assignments
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="completions-tab" data-bs-toggle="pill" data-bs-target="#training-completions" type="button" role="tab">
Completions
</button>
</li>
</ul>
<div class="tab-content" id="training-sub-content">
<div class="tab-pane fade show active" id="training-plans" role="tabpanel">
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>Plan Name</th>
<th>Documents</th>
<th>Assigned Users</th>
<th>Completion Rate</th>
<th>Created</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
"""
for plan in plans:
completion_rate = plan.get('completion_rate', 0)
html += f"""
<tr>
<td><strong>{plan.get('name', 'Unnamed Plan')}</strong></td>
<td>{plan.get('document_count', 0)} docs</td>
<td>{plan.get('assigned_count', 0)} users</td>
<td>
<div class="progress" style="width: 100px;">
<div class="progress-bar" role="progressbar" style="width: {completion_rate}%">
{completion_rate}%
</div>
</div>
</td>
<td><small>{plan.get('created', 'Unknown')}</small></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="editTrainingPlan('{plan.get('uid')}')">
<i class="fas fa-edit"></i>
</button>
<button class="btn btn-sm btn-outline-info" onclick="viewTrainingPlan('{plan.get('uid')}')">
<i class="fas fa-eye"></i>
</button>
</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
</div>
<div class="tab-pane fade" id="training-assignments" role="tabpanel">
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>User</th>
<th>Document</th>
<th>Assigned Date</th>
<th>Expires Date</th>
<th>Status</th>
<th>Assigned By</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
"""
for assignment in assignments:
status_class = {
'REQUIRED': 'warning',
'IN_PROGRESS': 'info',
'OVERDUE': 'danger'
}.get(assignment.get('status', 'REQUIRED'), 'secondary')
html += f"""
<tr>
<td>
<div>{assignment.get('user_name', 'Unknown User')}</div>
<small class="text-muted">{assignment.get('user_email', '')}</small>
</td>
<td>
<div>{assignment.get('document_title', 'Unknown Document')}</div>
<small class="text-muted">{assignment.get('document_number', '')}</small>
</td>
<td><small>{assignment.get('assigned_date', 'Unknown')}</small></td>
<td><small>{assignment.get('expires_date', 'No expiry')}</small></td>
<td><span class="badge bg-{status_class}">{assignment.get('status', 'REQUIRED')}</span></td>
<td><small>{assignment.get('assigned_by', 'Unknown')}</small></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="viewAssignment('{assignment.get('uid')}')">
<i class="fas fa-eye"></i>
</button>
<button class="btn btn-sm btn-outline-danger" onclick="removeAssignment('{assignment.get('uid')}')">
<i class="fas fa-trash"></i>
</button>
</td>
</tr>
"""
if not assignments:
html += """
<tr>
<td colspan="7" class="text-center text-muted py-4">
<i class="fas fa-clipboard-list fa-3x mb-3"></i>
<div>No active training assignments found</div>
</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
</div>
<div class="tab-pane fade" id="training-completions" role="tabpanel">
<div class="table-responsive">
<table class="table table-hover">
<thead>
<tr>
<th>User</th>
<th>Document</th>
<th>Completed Date</th>
<th>Assigned Date</th>
<th>Score</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
"""
for completion in completions:
html += f"""
<tr>
<td>
<div>{completion.get('user_name', 'Unknown User')}</div>
<small class="text-muted">{completion.get('user_email', '')}</small>
</td>
<td>
<div>{completion.get('document_title', 'Unknown Document')}</div>
<small class="text-muted">{completion.get('document_number', '')}</small>
</td>
<td><small>{completion.get('completed_date', 'Unknown')}</small></td>
<td><small>{completion.get('assigned_date', 'Unknown')}</small></td>
<td>
<span class="badge bg-success">{completion.get('score', 'N/A')}</span>
</td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="viewCompletion('{completion.get('uid')}')">
<i class="fas fa-eye"></i>
</button>
<button class="btn btn-sm btn-outline-info" onclick="downloadCertificate('{completion.get('uid')}')">
<i class="fas fa-certificate"></i>
</button>
</td>
</tr>
"""
if not completions:
html += """
<tr>
<td colspan="6" class="text-center text-muted py-4">
<i class="fas fa-medal fa-3x mb-3"></i>
<div>No completed training found</div>
</td>
</tr>
"""
html += """
</tbody>
</table>
</div>
</div>
</div>
"""
return html
def _generate_system_tab_content(self, system_config):
"""Generate HTML content for system tab."""
html = f"""
<div class="row">
<div class="col-md-6">
<h6><i class="fas fa-database me-2"></i>Database Configuration</h6>
<div class="card">
<div class="card-body">
<table class="table table-sm">
<tr>
<td>Neo4j URI:</td>
<td>{system_config.get('neo4j_uri', 'Not configured')}</td>
</tr>
<tr>
<td>Connection Status:</td>
<td><span class="badge bg-success">Connected</span></td>
</tr>
<tr>
<td>Database:</td>
<td>{system_config.get('neo4j_database', 'neo4j')}</td>
</tr>
</table>
</div>
</div>
</div>
<div class="col-md-6">
<h6><i class="fas fa-cloud me-2"></i>FileCloud Configuration</h6>
<div class="card">
<div class="card-body">
<table class="table table-sm">
<tr>
<td>Server URL:</td>
<td>{system_config.get('filecloud_url', 'Not configured')}</td>
</tr>
<tr>
<td>Connection Status:</td>
<td><span class="badge bg-success">Connected</span></td>
</tr>
<tr>
<td>Sync Enabled:</td>
<td>{'Yes' if system_config.get('sync_enabled', False) else 'No'}</td>
</tr>
</table>
</div>
</div>
</div>
</div>
<div class="row mt-4">
<div class="col-md-6">
<h6><i class="fas fa-key me-2"></i>Authentication Settings</h6>
<div class="card">
<div class="card-body">
<table class="table table-sm">
<tr>
<td>Azure SSO:</td>
<td>{'Enabled' if system_config.get('azure_sso_enabled', False) else 'Disabled'}</td>
</tr>
<tr>
<td>Local Auth:</td>
<td>{'Enabled' if system_config.get('local_auth_enabled', True) else 'Disabled'}</td>
</tr>
<tr>
<td>Session Timeout:</td>
<td>{system_config.get('session_timeout', 60)} minutes</td>
</tr>
</table>
</div>
</div>
</div>
<div class="col-md-6">
<h6><i class="fas fa-cogs me-2"></i>System Settings</h6>
<div class="card">
<div class="card-body">
<div class="mb-3">
<label class="form-label">Maintenance Mode</label>
<div class="form-check form-switch">
<input class="form-check-input" type="checkbox" id="maintenance-mode" {'checked' if system_config.get('maintenance_mode', False) else ''}>
<label class="form-check-label" for="maintenance-mode">
Enable maintenance mode
</label>
</div>
</div>
<div class="mb-3">
<label for="backup-frequency" class="form-label">Backup Frequency</label>
<select class="form-select" id="backup-frequency">
<option value="daily" {'selected' if system_config.get('backup_frequency') == 'daily' else ''}>Daily</option>
<option value="weekly" {'selected' if system_config.get('backup_frequency') == 'weekly' else ''}>Weekly</option>
<option value="monthly" {'selected' if system_config.get('backup_frequency') == 'monthly' else ''}>Monthly</option>
</select>
</div>
<button class="btn btn-primary" onclick="saveSystemSettings()">
<i class="fas fa-save me-2"></i>Save Settings
</button>
</div>
</div>
</div>
</div>
"""
return html
def _generate_audit_tab_content(self, audit_events):
"""Generate HTML content for audit tab."""
html = f"""
<div class="row mb-3">
<div class="col-md-3">
<input type="date" class="form-control" id="audit-date-from" placeholder="From date">
</div>
<div class="col-md-3">
<input type="date" class="form-control" id="audit-date-to" placeholder="To date">
</div>
<div class="col-md-3">
<select class="form-select" id="audit-action-filter">
<option value="">All Actions</option>
<option value="create">Create</option>
<option value="update">Update</option>
<option value="delete">Delete</option>
<option value="login">Login</option>
<option value="logout">Logout</option>
<option value="review">Review</option>
<option value="approve">Approve</option>
</select>
</div>
<div class="col-md-3">
<input type="text" class="form-control" placeholder="Search user..." id="audit-user-search">
</div>
</div>
<div class="table-responsive">
<table class="table table-hover table-sm">
<thead>
<tr>
<th>Timestamp</th>
<th>User</th>
<th>Action</th>
<th>Resource</th>
<th>Details</th>
<th>IP Address</th>
</tr>
</thead>
<tbody>
"""
for event in audit_events:
action = event.get('action') or ''
action_class = {
'create': 'text-success',
'update': 'text-primary',
'delete': 'text-danger',
'login': 'text-info',
'logout': 'text-secondary',
'review': 'text-warning',
'approve': 'text-success'
}.get(action.lower() if action else '', 'text-dark')
html += f"""
<tr>
<td><small>{event.get('timestamp', 'Unknown')}</small></td>
<td>{event.get('user', 'System')}</td>
<td><span class="{action_class}">{action or 'Unknown'}</span></td>
<td>{event.get('resource', 'N/A')}</td>
<td><small>{(event.get('details') or '')[:50]}{'...' if len(event.get('details') or '') > 50 else ''}</small></td>
<td><small>{event.get('ip_address', 'N/A')}</small></td>
</tr>
"""
html += """
</tbody>
</table>
</div>
"""
return html
def run(self, host='0.0.0.0', port=5000, debug=False):
    """Start the wrapped Flask app's built-in server with the given host, port and debug flag."""
    server_options = {'host': host, 'port': port, 'debug': debug}
    self.app.run(**server_options)
def get_app(self):
    """Return the Flask application object held by this wrapper."""
    flask_app = self.app
    return flask_app
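Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| config_path | - | None | positional or keyword |
Parameter Details
config_path: Optional constructor argument. The __init__ body shown above accepts it but never reads it.
Return Value
Instantiating the class returns a ControlledDocumentFlaskApp instance.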
Class Interface
Methods
__init__(self, config_path)
Purpose: Initialize the Flask application.
Parameters:
config_path: Parameter
Returns: None
_configure_app(self)
Purpose: Configure Flask application settings.
Returns: None
_init_database(self)
Purpose: Initialize database connection and schema.
Returns: None
_ensure_admin_user(self)
Purpose: Ensure admin user exists in the system.
Returns: None
_setup_authentication(self)
Purpose: Setup Flask-Login authentication.
Returns: None
_setup_azure_sso(self)
Purpose: Setup Azure SSO integration using Flask-specific configuration.
Returns: None
_register_routes(self)
Purpose: Register all Flask routes.
Returns: None
_get_azure_auth_url(self) -> Optional[str]
Purpose: Get Azure authentication URL.
Returns: Returns Optional[str]
_get_recent_activities(self) -> List[Dict[str, Any]]
Purpose: Get recent activities for dashboard.
Returns: Returns List[Dict[str, Any]]
_get_user_documents(self, user) -> List
Purpose: Get documents for a specific user using direct database query.
Parameters:
user: Type: DocUser
Returns: Returns List
_generate_users_tab_content(self, users)
Purpose: Generate HTML content for users tab.
Parameters:
users: Parameter
Returns: str (the generated HTML markup)
_generate_documents_tab_content(self, documents)
Purpose: Generate HTML content for documents tab.
Parameters:
documents: Parameter
Returns: str (the generated HTML markup)
_generate_training_tab_content(self, training_data)
Purpose: Generate HTML content for training tab.
Parameters:
training_data: Parameter
Returns: str (the generated HTML markup)
_generate_system_tab_content(self, system_config)
Purpose: Generate HTML content for system tab.
Parameters:
system_config: Parameter
Returns: str (the generated HTML markup)
_generate_audit_tab_content(self, audit_events)
Purpose: Generate HTML content for audit tab.
Parameters:
audit_events: Parameter
Returns: str (the generated HTML markup)
run(self, host, port, debug)
Purpose: Run the Flask application.
Parameters:
host: Parameter (default '0.0.0.0')
port: Parameter (default 5000)
debug: Parameter (default False)
Returns: None
get_app(self)
Purpose: Get the Flask application instance.
Returns: The Flask application object stored in self.app
Required Imports
import os
import sys
import logging
import argparse
import json
Usage Example
# Example usage:
# flask_app = ControlledDocumentFlaskApp()
# flask_app.run(host='0.0.0.0', port=5000)
Similar Components
AI-powered semantic similarity - components with related functionality:
- function create_app (73.4% similar)
- class ControlledDocumentApp (71.8% similar)
- class ControlledDocApp (69.0% similar)
- class CDocsApp (61.6% similar)
- class ControlledDocument (61.3% similar)