class ControlledDocumentApp
Main application class for the Controlled Document Management System. This class initializes all components and provides the main Panel interface for the application. It is designed to be served via `panel serve` command and integrates with the existing datacapture application.
/tf/active/vicechatdev/CDocs/main.py
213 - 2832
moderate
Purpose
Main application class for the Controlled Document Management System. This class initializes all components and provides the main Panel interface for the application. It is designed to be served via `panel serve` command and integrates with the existing datacapture application.
Source Code
class ControlledDocumentApp(param.Parameterized):
"""
Main application class for the Controlled Document Management System.
This class initializes all components and provides the main Panel interface for
the application. It is designed to be served via `panel serve` command and
integrates with the existing datacapture application.
"""
current_user = param.Parameter(default=None)
current_view = param.String(default='dashboard')
sidebar_collapsed = param.Boolean(default=False)
# Add dialog initialization in the __init__ method
def __init__(self, config_path=None, **params):
"""
Initialize the application with optional configuration path.
Parameters
----------
config_path : str, optional
Path to configuration file. If not provided, default configuration is used.
"""
super().__init__(**params)
self.app_name = "Controlled Document Management System"
# Get version from settings or use default
self.version = getattr(settings, 'VERSION', "1.0.0")
self.debug = getattr(settings, 'DEBUG_MODE', False)
# Add environment attribute to prevent errors
self.environment = getattr(settings, 'ENVIRONMENT', 'Development')
# Initialize logging
self._setup_logging()
logger.info(f"Initializing {self.app_name} v{self.version}")
# Initialize notification area
self.notification_area = pn.pane.HTML("")
# Initialize database
self._init_database()
# Initialize Panel template and UI
self.template = self._create_panel_template()
self.notification_area = pn.pane.Markdown("", width=600)
# Main content area
self.main_content = pn.Column(sizing_mode='stretch_width')
# Notification manager
self.notification_manager = NotificationManager()
# UI components
self.document_dashboard = None
self.document_detail = None
self.review_panel = None
self.approval_panel = None
self.admin_panel = None
# Set up application layout
self._setup_layout()
# Initialize UI components
self._init_ui_components()
# Default to dashboard view
#self.load_dashboard()
logger.info(f"Application initialized: {self.app_name} v{self.version}")
# Initialize dialog component early
self.dialog = None # Will be initialized properly after UI setup
def _setup_logging(self):
"""Configure logging for the application."""
log_level = logging.DEBUG if self.debug else logging.INFO
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# Get the root logger
root_logger = logging.getLogger('CDocs')
# Only configure if no handlers exist
if not root_logger.handlers:
logger.info("Initializing logging handlers")
# Create console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(log_format))
# Set up root logger
root_logger.setLevel(log_level)
root_logger.addHandler(console_handler)
# Configure file logging if enabled
if settings.ENABLE_FILE_LOGGING:
os.makedirs(settings.LOG_DIRECTORY, exist_ok=True)
log_filename = os.path.join(
settings.LOG_DIRECTORY,
f"cdocs_{datetime.now().strftime('%Y%m%d')}.log"
)
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(logging.Formatter(log_format))
root_logger.addHandler(file_handler)
else:
# Just update the log level if handlers already exist
root_logger.setLevel(log_level)
# Set log levels for dependencies (always do this)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('neo4j').setLevel(logging.WARNING)
logging.getLogger('panel').setLevel(logging.WARNING)
def _load_configuration(self, config_path: str):
"""
Load configuration from file.
Parameters
----------
config_path : str
Path to configuration file
"""
try:
with open(config_path, 'r') as f:
config_data = json.load(f)
# Update settings based on loaded configuration
for key, value in config_data.items():
if hasattr(settings, key):
setattr(settings, key, value)
logger.info(f"Configuration loaded from {config_path}")
except Exception as e:
logger.error(f"Failed to load configuration from {config_path}: {e}")
self.show_notification(f"Failed to load configuration: {str(e)}", level="error")
def _init_database(self):
"""Initialize database connection and schema."""
try:
# Use the correct imports
from CDocs.db import get_driver, init_database
# Initialize Neo4j schema
success = init_database()
if success:
logger.info("Database schema initialized successfully")
else:
logger.error("Database schema initialization failed")
# Create admin user if it doesn't exist
self._ensure_admin_user()
# Initialize SSO authentication
self._init_sso_authentication()
# Load initial data if needed
#self._load_initial_data()
# Initialize document permissions in a background thread
#self._init_document_permissions()
except Exception as e:
logger.error(f"Database initialization failed: {e}")
if hasattr(self, 'notification_area'):
self.show_notification(f"Database initialization failed: {str(e)}", level="error")
def _init_document_permissions(self):
"""
Initialize document permissions and shares.
This performs a background check to ensure all documents have correct
permissions based on their status and user roles:
- Authors/owners have write access only during DRAFT status
- Reviewers have write access only during active review periods
- Approvers have read-only access during approval cycles
"""
try:
logger.info("Starting document permission initialization")
from CDocs.controllers.permission_startup_check import run_permission_check_async, check_document_permissions_on_startup
# Run permission check in background to avoid blocking the UI
result = run_permission_check_async()
#result= check_document_permissions_on_startup()
if result.get('success', False):
logger.info("Document permission check started in background")
else:
logger.warning(f"Document permission check failed to start: {result.get('message', 'Unknown error')}")
except Exception as e:
logger.error(f"Error initializing document permissions: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def _perform_sso_login_with_user_info(self, user_info: dict):
"""Logs in a user based on user_info dictionary."""
logger.info(f"Attempting to log in user from SSO info: {user_info.get('email')}")
if not user_info or not user_info.get('email'):
logger.warning("SSO login failed: Invalid or empty user_info.")
self.show_notification("SSO login failed: Incomplete user data.", level="error")
return False
try:
from CDocs.models.user_extensions import DocUser # Local import
# Check if user exists by email
user = DocUser.get_by_email(user_info['email'])
if not user:
logger.info(f"User not found by email, creating new user: {user_info['email']}")
user = DocUser.get_or_create_from_sso(
email=user_info['email'],
name=user_info.get('name', user_info['email']), # Default name to email if not present
sso_id=user_info.get('oid', None) # Azure Object ID
)
else:
logger.info(f"Found existing user by email: {user_info['email']}")
# Optionally, update existing user's SSO ID or name if changed
if user_info.get('oid') and getattr(user, 'sso_id', None) != user_info.get('oid'):
# db_operations.update_node(user.uid, {'sso_id': user_info.get('oid')}) # If you have such a function
logger.info(f"User {user.username} SSO ID updated/checked.")
if user:
self.current_user = user
logger.info(f"User {user.email} successfully logged in via SSO. Roles: {getattr(user, 'roles', 'N/A')}")
self._setup_header()
self._setup_sidebar()
# Schedule dashboard loading to ensure DOM is ready after potential redirects/cookie handling
pn.state.onload(self.load_dashboard)
return True
else:
logger.error(f"SSO login failed: User could not be created or found for email {user_info['email']}.")
self.show_notification("SSO login failed.", level="error")
return False
except Exception as e:
logger.error(f"Error during _perform_sso_login_with_user_info: {e}", exc_info=True)
self.show_notification(f"SSO login error: {e}", level="error")
return False
def _check_and_process_sso_cookie(self):
"""Checks for SSO login cookie and processes it."""
logger.debug("Checking for SSO login cookie.")
sso_user_info_json = None
request = None
if pn.state.curdoc and pn.state.curdoc.session_context:
request = pn.state.curdoc.session_context.request
if request and request.cookies:
sso_cookie_object = request.cookies.get("sso_pending_login_info")
if sso_cookie_object:
encoded_value = sso_cookie_object
logger.info("Found 'sso_pending_login_info' cookie.")
try:
# Decode the base64 encoded value
import base64
decoded_bytes = base64.b64decode(encoded_value)
sso_user_info_json = decoded_bytes.decode('utf-8')
logger.debug("Successfully decoded the SSO cookie value.")
except Exception as e:
logger.error(f"Failed to decode SSO cookie: {e}", exc_info=True)
if sso_user_info_json:
try:
user_info = json.loads(sso_user_info_json)
self._perform_sso_login_with_user_info(user_info)
# Attempt to clear cookie (this may not work reliably in Panel)
# Would be better handled by the plugin setting a short expiry time
except json.JSONDecodeError:
logger.error("Failed to decode SSO user info from cookie.", exc_info=True)
except Exception as e:
logger.error(f"Error processing SSO cookie: {e}", exc_info=True)
else:
logger.debug("No SSO login cookie found.")
def _init_sso_authentication(self):
"""
Initialize Azure SSO authentication.
This sets up the OAuth2 authentication flow with Azure AD,
allowing users to log in with their Microsoft accounts.
"""
try:
from CDocs.auth.azure_auth import setup_azure_sso
self.azure_sso = setup_azure_sso()
logger.info("Azure SSO authentication initialized")
except Exception as e:
logger.error(f"Error initializing Azure SSO authentication: {str(e)}")
import traceback
logger.error(traceback.format_exc())
self.azure_sso = None
def _ensure_admin_user(self):
"""Ensure admin user exists in the database."""
# Check if admin user exists
admin_exists = db_operations.check_node_exists('User', {'username': 'admin'})
if not admin_exists:
# Get department code for Administration
from CDocs.config import settings
admin_dept_code = settings.get_department_code('Administration')
# Create admin user
DocUser.create(
username='admin',
password=settings.DEFAULT_ADMIN_PASSWORD,
name='Administrator',
email=settings.ADMIN_EMAIL,
department=admin_dept_code, # Use the code, not the full name
role='ADMIN'
)
logger.info("Admin user created")
def _load_initial_data(self):
"""Load initial data into the database."""
# Load document types
self._ensure_document_types_exist()
# Load departments
self._ensure_departments_exist()
def _ensure_document_types_exist(self):
"""Ensure document types exist in the database."""
for doc_type in settings.DOCUMENT_TYPES:
exists = db_operations.check_node_exists('DocumentType', {'name': doc_type})
if not exists:
db_operations.create_node('DocumentType', {
'name': doc_type,
'description': f'{doc_type} document type',
'active': True
})
logger.info(f"Created document type: {doc_type}")
def _ensure_departments_exist(self):
"""Ensure departments exist in the database."""
for dept in settings.DEFAULT_DEPARTMENTS:
exists = db_operations.check_node_exists('Department', {'name': dept})
if not exists:
db_operations.create_node('Department', {
'name': dept,
'description': f'{dept} department',
'active': True
})
logger.info(f"Created department: {dept}")
def _create_panel_template(self) -> pn.Template:
"""
Create and configure the Panel template.
Returns
-------
pn.Template
Configured Panel template
"""
# Set up Panel extensions without setting a global template
pn.extension('tabulator', 'ace', 'katex') # Remove template='bootstrap' here
# Create template
template = BootstrapTemplate(
title=self.app_name,
theme=getattr(settings, 'UI_THEME', 'default'),
sidebar_width=300
)
# Configure global Panel settings
pn.config.sizing_mode = 'stretch_width'
# Add custom CSS files if specified in settings
css_files = getattr(settings, 'CSS_FILES', [])
if (css_files):
pn.config.css_files = css_files
return template
def _setup_layout(self):
"""Set up the main application layout with dynamic containers."""
# Create dynamic containers for each template section
self.header_container = pn.Column(sizing_mode='stretch_width', margin=0)
self.sidebar_container = pn.Column(sizing_mode='stretch_width', margin=0)
self.main_container = pn.Column(sizing_mode='stretch_width', margin=0)
# Add these containers to the template sections (these operations happen only once)
self.template.header.append(self.header_container)
self.template.sidebar.append(self.sidebar_container)
self.template.main.append(self.main_container)
# Initialize notification area
self.notification_area = pn.pane.Markdown("", sizing_mode="stretch_width")
# Add notification area to main container
self.main_container.append(self.notification_area)
# Add main content area (will be populated later)
self.main_content = pn.Column(sizing_mode='stretch_width')
self.main_container.append(self.main_content)
def _setup_header(self):
"""Set up the application header."""
# Clear existing content
self.header_container.clear()
# Create user info and logout button if user is logged in
if self.current_user:
user_info = pn.pane.HTML(f"""
<div class="d-flex align-items-center">
<span class="mr-2">Logged in as: <strong>{getattr(self.current_user, 'name', 'User')}</strong></span>
</div>
""")
logout_btn = pn.widgets.Button(
name="Logout",
button_type="default",
width=80
)
logout_btn.on_click(self.logout)
# Add header elements to header container
self.header_container.append(
pn.Row(
user_info,
logout_btn,
sizing_mode='stretch_width',
align='end'
)
)
else:
# If not logged in, show login button
login_btn = pn.widgets.Button(
name="Login",
button_type="primary",
width=80
)
login_btn.on_click(self.show_login)
self.header_container.append(
pn.Row(
login_btn,
sizing_mode='stretch_width',
align='end'
)
)
def _setup_sidebar(self, integrated=False):
"""Set up the sidebar with filters and actions."""
try:
# Clear existing content
self.sidebar_container.clear()
# Get environment safely
env_text = getattr(self, 'environment', 'Development')
# For integrated mode, use a more compact sidebar
if integrated:
# Add a minimal system info section
system_info = pn.Column(
pn.pane.Markdown(f"**Version:** {self.version}"),
sizing_mode='stretch_width',
margin=(10, 0, 0, 0)
)
else:
# Add more detailed system info for standalone mode
system_info = pn.Column(
pn.pane.Markdown(f"**Version:** {self.version}"),
pn.pane.Markdown(f"**Environment:** {env_text}"),
sizing_mode='stretch_width',
margin=(20, 0, 0, 0)
)
# Add navigation buttons if user is logged in
if self.current_user:
# Enhanced debugging of user information
logger.debug(f"User in _setup_sidebar: {self.current_user.username if hasattr(self.current_user, 'username') else 'Unknown'}")
logger.debug(f"User role attributes: roles={getattr(self.current_user, 'roles', 'Not present')}, role={getattr(self.current_user, 'role', 'Not present')}")
logger.debug(f"User permissions: {permissions.get_user_permissions(self.current_user)}")
# Debug user permissions
has_admin = permissions.user_has_permission(self.current_user, "ADMIN")
logger.debug(f"User {self.current_user.username} has ADMIN permission: {has_admin}")
# Create navigation buttons
dashboard_btn = pn.widgets.Button(
name="Documents Dashboard",
button_type="primary",
width=200
)
dashboard_btn.on_click(self.load_dashboard)
# Add navigation links
nav_links = pn.Column(
pn.pane.Markdown("### Navigation"),
dashboard_btn,
sizing_mode='stretch_width'
)
# Add create document button if user has permission
can_create = permissions.user_has_permission(self.current_user, "CREATE_DOCUMENT")
logger.debug(f"User {self.current_user.username} has CREATE_DOCUMENT permission: {can_create}")
if can_create:
create_btn = pn.widgets.Button(
name="Create Document",
button_type="success",
width=200
)
create_btn.on_click(self.create_document)
nav_links.append(create_btn)
# Add training dashboard button
can_view_training = permissions.user_has_permission(self.current_user, "VIEW_TRAINING")
if can_view_training:
training_dashboard_btn = pn.widgets.Button(
name="Training Dashboard",
button_type="primary",
width=200
)
training_dashboard_btn.on_click(self.navigate_to_training_dashboard)
nav_links.append(training_dashboard_btn)
# Add reviews button
reviews_btn = pn.widgets.Button(
name="My Reviews",
button_type="default",
width=200
)
reviews_btn.on_click(self.load_reviews)
nav_links.append(reviews_btn)
# Add approvals button
approvals_btn = pn.widgets.Button(
name="My Approvals",
button_type="default",
width=200
)
approvals_btn.on_click(self.load_approvals)
nav_links.append(approvals_btn)
# UPDATED: Add admin button if user has admin permission
# Try multiple approaches to check for admin permissions
admin_permission = permissions.user_has_permission(self.current_user, "ADMIN")
admin_role = hasattr(self.current_user, 'roles') and 'ADMIN' in self.current_user.roles
logger.debug(f"Admin permission check: {admin_permission}")
logger.debug(f"Admin role check: {admin_role}")
if admin_permission or admin_role:
logger.debug(f"Adding admin button for user {self.current_user.username}")
admin_btn = pn.widgets.Button(
name="Admin Panel",
button_type="warning",
width=200
)
admin_btn.on_click(self.load_admin)
nav_links.append(admin_btn)
else:
logger.debug(f"Not adding admin button for user {self.current_user.username} - lacks admin permission")
# Add navigation links to sidebar
self.sidebar_container.append(nav_links)
# Add system info at the bottom of sidebar
self.sidebar_container.append(system_info)
return True
except Exception as e:
logger.error(f"Error setting up sidebar: {e}")
logger.error(traceback.format_exc())
return False
def load_admin(self, event=None):
"""Load the admin panel."""
try:
# Stop dashboard notifications when switching to admin
if hasattr(self, 'document_dashboard') and self.document_dashboard:
try:
self.document_dashboard.stop_notifications()
except Exception as e:
logger.debug(f"Error stopping notifications: {e}")
# Update current view
self.current_view = 'admin'
logger.info("Loading admin panel")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Use multiple methods to check admin access
has_admin_permission = permissions.user_has_permission(self.current_user, "ADMIN")
has_admin_role = False
if hasattr(self.current_user, 'roles') and isinstance(self.current_user.roles, (list, tuple, set)):
has_admin_role = 'ADMIN' in self.current_user.roles
logger.debug(f"User {self.current_user.username if hasattr(self.current_user, 'username') else 'Unknown'} - Admin permission: {has_admin_permission}, Admin role: {has_admin_role}")
if not (has_admin_permission or has_admin_role):
logger.warning(f"User {self.current_user.username if hasattr(self.current_user, 'username') else 'Unknown'} attempted to access admin panel without permission")
self.show_notification("You do not have permission to access the admin panel", level="error")
self.load_dashboard()
return
try:
# Initialize admin panel if needed
if not self.admin_panel:
from CDocs.ui.admin_panel import create_admin_panel
self.admin_panel = create_admin_panel(
parent_app=self, # Just pass self as parent_app
embedded=True
)
# IMPORTANT: Set up admin sidebar BEFORE setting user
# This ensures the sidebar reflects the current admin panel state
self._setup_admin_sidebar()
# Set the current user if needed
if self.admin_panel and hasattr(self.admin_panel, 'set_user'):
success = self.admin_panel.set_user(self.current_user)
if not success:
logger.error("Failed to set user in admin panel")
self.show_notification("Error setting up admin panel", level="error")
self.load_dashboard()
return
# Get admin view
if self.admin_panel and hasattr(self.admin_panel, 'get_admin_view'):
admin_view = self.admin_panel.get_admin_view()
self.main_content.append(admin_view)
logger.info("Admin panel loaded successfully")
else:
raise AttributeError("Admin panel does not have get_admin_view method")
except Exception as e:
logger.error(f"Error loading admin panel: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading admin panel: {e}", level="error")
# Add basic error view
from panel.pane import Markdown
self.main_content.append(Markdown("# Admin Panel"))
self.main_content.append(Markdown(f"**Error loading admin panel: {str(e)}**"))
except Exception as e:
logger.error(f"Error in load_admin: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error: {e}", level="error")
def _setup_admin_sidebar(self):
"""Set up the sidebar specifically for the admin panel."""
try:
# Clear the sidebar container
self.sidebar_container.clear()
# Create navigation buttons for admin
dashboard_btn = pn.widgets.Button(
name='System Dashboard',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'dashboard' else 'default',
width=200
)
users_btn = pn.widgets.Button(
name='User Management',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'users' else 'default',
width=200
)
# ADD: Notifications button for admin panel
notifications_btn = pn.widgets.Button(
name='Notifications',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'notifications' else 'default',
width=200
)
departments_btn = pn.widgets.Button(
name='Departments',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'departments' else 'default',
width=200
)
doc_types_btn = pn.widgets.Button(
name='Document Types',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'doc_types' else 'default',
width=200
)
settings_btn = pn.widgets.Button(
name='System Settings',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'settings' else 'default',
width=200
)
backup_btn = pn.widgets.Button(
name='Backup & Restore',
button_type='primary' if self.admin_panel and getattr(self.admin_panel, 'current_tab', '') == 'backup' else 'default',
width=200
)
# Define click handlers
def load_dashboard(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_admin_dashboard'):
self.admin_panel._load_admin_dashboard()
self._setup_admin_sidebar() # Update sidebar buttons
def load_users(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_user_management'):
self.admin_panel._load_user_management()
self._setup_admin_sidebar() # Update sidebar buttons
# ADD: Notifications handler
def load_notifications(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_notification_management'):
self.admin_panel._load_notification_management()
self._setup_admin_sidebar() # Update sidebar buttons
def load_departments(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_department_management'):
self.admin_panel._load_department_management()
self._setup_admin_sidebar() # Update sidebar buttons
def load_doc_types(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_document_type_management'):
self.admin_panel._load_document_type_management()
self._setup_admin_sidebar() # Update sidebar buttons
def load_settings(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_system_settings'):
self.admin_panel._load_system_settings()
self._setup_admin_sidebar() # Update sidebar buttons
def load_backup(event):
if self.admin_panel and hasattr(self.admin_panel, '_load_backup_restore'):
self.admin_panel._load_backup_restore()
self._setup_admin_sidebar() # Update sidebar buttons
# Attach handlers
dashboard_btn.on_click(load_dashboard)
users_btn.on_click(load_users)
notifications_btn.on_click(load_notifications) # ADD: notifications handler
departments_btn.on_click(load_departments)
doc_types_btn.on_click(load_doc_types)
settings_btn.on_click(load_settings)
backup_btn.on_click(load_backup)
# Add to navigation area - INCLUDE notifications button
navigation = pn.Column(
pn.pane.Markdown("## Admin Navigation"),
dashboard_btn,
users_btn,
notifications_btn, # ADD: notifications button
departments_btn,
doc_types_btn,
settings_btn,
backup_btn,
sizing_mode='stretch_width'
)
# Add navigation to sidebar
self.sidebar_container.append(navigation)
# Add back to dashboard button
back_btn = pn.widgets.Button(
name="← Back to Dashboard",
button_type="primary",
width=200
)
back_btn.on_click(self.load_dashboard)
self.sidebar_container.append(pn.layout.Spacer(height=20))
self.sidebar_container.append(back_btn)
# Add system status if available
if self.admin_panel and hasattr(self.admin_panel, 'status_area'):
# Clone the status area from admin panel
status_container = pn.Column(
pn.pane.Markdown("## System Status"),
sizing_mode='stretch_width',
styles={'background': '#f8f9fa'},
css_classes=['p-3', 'border', 'rounded']
)
try:
# Get statistics from admin controller
from CDocs.controllers.admin_controller import get_system_stats
stats = get_system_stats()
# Format uptime
uptime_seconds = stats.get('uptime', 0)
days, remainder = divmod(uptime_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
uptime_str = f"{days}d {hours}h {minutes}m"
# Display key statistics
active_users = stats.get('active_users', 0)
total_users = stats.get('total_users', 0)
active_docs = stats.get('active_documents', 0)
total_docs = stats.get('total_documents', 0)
status_container.append(pn.pane.Markdown(f"**Users:** {active_users}/{total_users} active"))
status_container.append(pn.pane.Markdown(f"**Documents:** {active_docs}/{total_docs} active"))
status_container.append(pn.pane.Markdown(f"**Reviews:** {stats.get('pending_reviews', 0)} pending"))
status_container.append(pn.pane.Markdown(f"**Approvals:** {stats.get('pending_approvals', 0)} pending"))
status_container.append(pn.pane.Markdown(f"**Uptime:** {uptime_str}"))
self.sidebar_container.append(status_container)
except Exception as e:
logger.warning(f"Error getting system statistics: {e}")
status_container.append(pn.pane.Markdown("*Statistics unavailable*"))
self.sidebar_container.append(status_container)
return True
except Exception as e:
logger.error(f"Error setting up admin sidebar: {e}")
import traceback
logger.error(traceback.format_exc())
return False
def _init_ui_components(self):
"""Initialize UI components for the application."""
try:
# Initialize the document dashboard
from CDocs.ui.document_dashboard import DocumentDashboard
self.document_dashboard = DocumentDashboard(self)
# Initialize the document detail view (lazily loaded)
self.document_detail = None
# Initialize review panel (lazily loaded)
self.review_panel = None
# Initialize approval panel (lazily loaded)
self.approval_panel = None
# Initialize admin panel (lazily loaded)
self.admin_panel = None
logger.info("UI components initialized")
except Exception as e:
logger.error(f"Error initializing UI components: {str(e)}")
def show_notification(self, message: str, level: str = "info", duration: int = 5000):
"""
Show a notification message.
Parameters
----------
message : str
The message to display
level : str, optional
The notification level (info, warning, error, success)
duration : int, optional
How long to display the message (in milliseconds)
"""
# Set appropriate styling based on level
if level == "error":
style = "color: #dc3545; font-weight: bold;"
elif level == "warning":
style = "color: #ffc107; font-weight: bold;"
elif level == "success":
style = "color: #28a745; font-weight: bold;"
else: # info
style = "color: #17a2b8; font-weight: bold;"
# Update notification area
self.notification_area.object = f"<div style='{style}'>{message}</div>"
# Clear notification after duration if not error
if level != "error" and duration > 0:
pn.state.onload(lambda: pn.state.add_timeout(duration, self.clear_notification))
def clear_notification(self):
"""Clear the notification area."""
self.notification_area.object = ""
def load_dashboard(self, event=None):
"""Load the document dashboard."""
try:
# Update current view
self.current_view = 'dashboard'
logger.info("Loading dashboard")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Initialize dashboard if needed
if not self.document_dashboard:
from CDocs.ui.document_dashboard import DocumentDashboard
self.document_dashboard = DocumentDashboard(parent_app=self)
# Set up dashboard sidebar
self._setup_sidebar()
# Set user - this might restart the notification timer, so handle it carefully
try:
self.document_dashboard.set_user(self.current_user)
except RuntimeError as e:
if "already started" in str(e):
# Timer is already running, just refresh notifications
logger.debug("Notification timer already running, refreshing notifications")
if hasattr(self.document_dashboard, '_refresh_notifications'):
self.document_dashboard._refresh_notifications()
else:
raise e
# Get dashboard view
dashboard_view = self.document_dashboard.get_dashboard_view()
self.main_content.append(dashboard_view)
logger.info("Dashboard loaded successfully")
except Exception as e:
logger.error(f"Error loading dashboard: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading dashboard: {e}", level="error")
def login(self, username: str = None, password: str = None, sso_token: dict = None) -> bool:
"""
Authenticate a user using either credentials or SSO token.
Parameters
----------
username : str, optional
Username or email (not required for SSO login)
password : str, optional
Password (not required for SSO login)
sso_token : dict, optional
SSO authentication token from Azure
Returns
-------
bool
True if authentication successful, False otherwise
"""
try:
# Check if we're using SSO
if sso_token:
# Add detailed logging
logger.info(f"Processing SSO login with token: {sso_token.keys()}")
# Authenticate using SSO token
from CDocs.auth.azure_auth import validate_azure_token
user_info = validate_azure_token(sso_token)
# Log the user info received from Azure
logger.info(f"SSO user info: {user_info}")
if user_info and 'email' in user_info:
# Get or create user from email
from CDocs.models.user_extensions import DocUser
# First try to find existing user by email
user = DocUser.get_by_email(user_info['email'])
if user:
logger.info(f"Found existing user: {user.username} for email: {user_info['email']}")
else:
# Create a new user if none exists
logger.info(f"Creating new user for email: {user_info['email']}")
user = DocUser.get_or_create_from_sso(
email=user_info['email'],
name=user_info.get('name', user_info['email']),
sso_id=user_info.get('oid', None) # Azure Object ID
)
if user:
# Set current user
self.current_user = user
logger.info(f"User {user.email} logged in via SSO. User roles: {getattr(user, 'roles', None)}")
# Refresh layout with logged-in user
self._setup_header()
self._setup_sidebar()
# IMPORTANT: Use onload to schedule dashboard loading after DOM is ready
pn.state.onload(self.load_dashboard)
return True
else:
logger.error(f"SSO login failed - user could not be created for email: {user_info['email']}")
else:
# Normal username/password login - continue with existing code
from CDocs.models.user_extensions import DocUser
user = DocUser.authenticate(username, password)
if user:
# Set current user
self.current_user = user
logger.info(f"User {username} logged in")
# Refresh layout with logged-in user
self._setup_header()
self._setup_sidebar()
# Reload dashboard
self.load_dashboard()
return True
else:
logger.warning(f"Failed login attempt for user {username}")
self.show_notification("Invalid username or password", level="error")
return False
except Exception as e:
logger.error(f"Login error: {str(e)}")
logger.error(traceback.format_exc())
self.show_notification(f"Login error: {str(e)}", level="error")
return False
def logout(self, event=None):
"""Log out the current user."""
if self.current_user:
username = self.current_user.username
self.current_user = None
logger.info(f"User {username} logged out")
# Refresh layout
self._setup_header()
self._setup_sidebar()
# Show login form
self.show_login()
def show_login(self, event=None):
"""Show the login form with fragment-aware SSO option."""
try:
self.main_content.clear()
logger.info("Displaying login form.")
username_input = pn.widgets.TextInput(name="Username or Email", placeholder="Enter username or email", width=300)
password_input = pn.widgets.PasswordInput(name="Password", placeholder="Enter password", width=300)
login_btn = pn.widgets.Button(name="Login", button_type="primary", width=100)
sso_btn = pn.widgets.Button(name="Sign in with Microsoft", button_type="default", width=300)
login_message = pn.pane.Markdown("", width=300)
# Handler for normal login button click
def handle_login(event):
if not username_input.value:
login_message.object = "**Error:** Username or email is required"
return
if not password_input.value:
login_message.object = "**Error:** Password is required"
return
# Show processing message
login_message.object = "Logging in..."
# Call login method
success = self.login(username_input.value, password_input.value)
if not success:
login_message.object = "**Error:** Invalid login credentials"
# Handler for SSO button click
def handle_sso_click(event):
if not hasattr(self, 'azure_sso') or not self.azure_sso:
login_message.object = "**Error:** SSO authentication is not available/configured."
return
try:
# Generate auth URL
auth_url = self.azure_sso.get_auth_url(response_mode='query')
logger.info(f"Redirecting to SSO URL: {auth_url}")
# Use a direct window location change through JavaScript
# This is the most reliable approach that doesn't depend on specific Bokeh internals
redirect_script = f"""
<script>
console.log("Redirecting to SSO authentication: {auth_url}");
window.location.href = "{auth_url}";
</script>
"""
# Show redirecting message first
self.main_content.clear()
self.main_content.append(pn.pane.Markdown("## Redirecting to Microsoft Login"))
self.main_content.append(pn.pane.Markdown("Please wait..."))
# Add the redirect script
self.main_content.append(pn.pane.HTML(redirect_script))
except Exception as e:
logger.error(f"Error getting Azure auth URL: {e}", exc_info=True)
login_message.object = f"**Error:** Failed to start SSO: {str(e)}"
# Add handlers to buttons
login_btn.on_click(handle_login)
sso_btn.on_click(handle_sso_click)
# Create login form
login_form_layout = pn.Column(
pn.pane.Markdown("## Login"),
username_input, password_input, pn.Row(login_btn),
pn.layout.Divider(),
pn.pane.Markdown("## Or use Single Sign-On"),
pn.Row(sso_btn, align='center'),
login_message,
width=400, styles={'background':'#f8f9fa'}, css_classes=['p-4', 'border', 'rounded']
)
self.main_content.append(pn.Row(login_form_layout, align='center'))
logger.info("Login form added to main content")
except Exception as e:
logger.error(f"Error showing login form: {e}")
logger.error(traceback.format_exc())
# Show a minimal error message if something goes wrong
self.main_content.clear()
self.main_content.append(pn.pane.Markdown("## Error Showing Login"))
self.main_content.append(pn.pane.Markdown(f"**Error:** {str(e)}"))
self.main_content.append(pn.pane.Markdown("Please refresh the page to try again."))
def load_dashboard(self, event=None):
"""Load the document dashboard view."""
# Update current view
self.current_view = 'dashboard'
logger.info("Loading document dashboard view")
# Add debugging for user information
if self.current_user:
logger.debug(f"Current user: {self.current_user.username}, Role: {getattr(self.current_user, 'role', 'Unknown')}")
logger.debug(f"User permissions: {permissions.get_user_permissions(self.current_user)}")
else:
logger.debug("No user currently logged in")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Update sidebar buttons for main dashboard
self._setup_sidebar()
# Check if user is authenticated, but skip login screen in integration mode
if not self.current_user and not getattr(self, 'integration_mode', False):
logger.info("User not logged in, showing login form")
self.show_login()
return
try:
# Initialize document dashboard if needed
if not self.document_dashboard:
logger.debug("Creating new DocumentDashboard instance")
self.document_dashboard = document_dashboard.DocumentDashboard(self)
# Set the current user
if self.current_user:
logger.debug(f"Setting user {self.current_user.username} in document dashboard")
self.document_dashboard.set_user(self.current_user)
# Load dashboard content
dashboard_view = self.document_dashboard.get_dashboard_view()
#logger.info(f'Document dashboard loaded successfully {dashboard_view}')
self.main_content.append(dashboard_view)
except Exception as e:
logger.error(f"Error loading dashboard: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading dashboard: {str(e)}", level="error")
# Add basic error view
self.main_content.append(pn.pane.Markdown("# Document Dashboard"))
self.main_content.append(pn.pane.Markdown(f"**Error loading dashboard: {str(e)}**"))
# This continues the implementation of /CDocs/main.py
def load_document(self, document_uid: str, event=None):
"""Load a specific document."""
# Update current view
self.current_view = 'document'
logger.info(f"Loading document view for {document_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
logger.warning("Attempted to view document without logged-in user")
self.show_login()
return
try:
# Get document data
from CDocs.controllers.document_controller import get_document
document_data = get_document(document_uid=document_uid)
if not document_data:
logger.error(f"Document {document_uid} not found")
self.show_notification(f"Document not found", level="error")
self.load_dashboard()
return
# Create document detail view
self.document_detail = document_detail.DocumentDetail(parent_app=self)
# Set user
self.document_detail.set_user(self.current_user)
# Use document_uid and let the detail view fetch the data
self.document_detail.document_uid = document_uid
# Load document
loaded = self.document_detail._load_document()
if not loaded:
logger.error(f"Failed to load document {document_uid}")
self.show_notification("Failed to load document", level="error")
self.load_dashboard()
return
# Get document view
document_view = self.document_detail.get_document_view()
# Add to main content
self.main_content.append(document_view)
except Exception as e:
logger.error(f"Error loading document {document_uid}: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading document: {str(e)}", level="error")
def load_reviews(self, event=None):
"""Load the reviews view."""
# Update current view
self.current_view = 'reviews'
logger.info("Loading reviews panel")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Setup review-specific sidebar
self._setup_review_sidebar()
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
try:
# Stop dashboard notifications when switching views
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.stop_notifications()
# Initialize review panel if needed
if not self.review_panel:
from CDocs.ui.review_panel import create_review_panel
self.review_panel = create_review_panel(parent_app=self)
else:
# Set the current user
self.review_panel.set_user(self.current_user)
self.review_panel._load_pending_reviews()
# Create a header for the reviews section
self.main_content.append(pn.pane.Markdown("# Review Management"))
# Load pending reviews
review_view = self.review_panel.get_main_content()
self.main_content.append(review_view)
except Exception as e:
logger.error(f"Error loading reviews panel: {str(e)}")
import traceback
logger.error(traceback.format_exc())
self.show_notification(f"Error loading reviews: {str(e)}", level="error")
# Add basic error view
self.main_content.append(Markdown("# Reviews"))
self.main_content.append(Markdown(f"**Error loading reviews: {str(e)}**"))
# Add back button
back_btn = pn.widgets.Button(
name="Back to Dashboard",
button_type="primary",
width=150
)
back_btn.on_click(self.load_dashboard)
self.main_content.append(back_btn)
def load_review(self, review_uid: str, event=None):
"""
Load a specific review.
Parameters
----------
review_uid : str
UID of the review to load
event : event, optional
Event that triggered this action
"""
# Update current view
self.current_view = 'review_detail'
logger.info(f"Loading review {review_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Setup review-specific sidebar
self._setup_review_sidebar()
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
try:
# Initialize review panel if needed
if not self.review_panel:
from CDocs.ui.review_panel import create_review_panel
self.review_panel = create_review_panel(parent_app=self)
# Set the current user
self.review_panel.set_user(self.current_user)
# Load review
self.review_panel._load_review(review_uid)
review_view = self.review_panel.get_main_content()
self.main_content.append(review_view)
except Exception as e:
logger.error(f"Error loading review {review_uid}: {str(e)}")
self.show_notification(f"Error loading review: {str(e)}", level="error")
# Add basic error view
self.main_content.append(Markdown("# Review Details"))
self.main_content.append(Markdown(f"**Error loading review: {str(e)}**"))
# Add back button
back_btn = Button(name="Back to Reviews", button_type="primary", width=150)
back_btn.on_click(self.load_reviews)
self.main_content.append(Row(back_btn))
def load_approvals(self, event=None):
"""Load the approvals panel."""
# Update current view
self.current_view = 'approvals'
logger.info("Loading approvals panel")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Setup approval-specific sidebar
self._setup_approval_sidebar()
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
try:
# Stop dashboard notifications when switching views
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.stop_notifications()
# Initialize approval panel if needed
if not self.approval_panel:
# Use absolute import instead of relative import
from CDocs.ui.approval_panel import create_approval_panel
self.approval_panel = create_approval_panel(parent_app=self)
# Set the current user
self.approval_panel.set_user(self.current_user)
# Create a header for the approvals section
self.main_content.append(pn.pane.Markdown("# Approval Management"))
# Load pending approvals
self.approval_panel._load_pending_approvals()
approval_view = self.approval_panel.get_main_content()
self.main_content.append(approval_view)
except Exception as e:
logger.error(f"Error loading approvals panel: {str(e)}")
import traceback
logger.error(traceback.format_exc())
self.show_notification(f"Error loading approvals: {str(e)}", level="error")
# Add basic error view
self.main_content.append(Markdown("# Approvals"))
self.main_content.append(Markdown(f"**Error loading approvals: {str(e)}**"))
# Add back button
back_btn = pn.widgets.Button(
name="Back to Dashboard",
button_type="primary",
width=150
)
back_btn.on_click(self.load_dashboard)
self.main_content.append(back_btn)
def load_approval(self, approval_uid: str, event=None):
"""
Load a specific approval cycle.
Parameters
----------
approval_uid : str
UID of the approval cycle to load
event : event, optional
Event that triggered this action
"""
# Update current view
self.current_view = 'approval_detail'
logger.info(f"Loading approval cycle {approval_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Setup approval-specific sidebar
self._setup_approval_sidebar()
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
try:
# Initialize approval panel if needed
if not self.approval_panel:
# Use absolute import
from CDocs.ui.approval_panel import create_approval_panel
self.approval_panel = create_approval_panel(parent_app=self)
# Set the current user
self.approval_panel.set_user(self.current_user)
# Load approval - use the updated method that works with ApprovalCycle
self.approval_panel._load_approval(approval_uid)
approval_view = self.approval_panel.get_approval_view()
self.main_content.append(approval_view)
except Exception as e:
logger.error(f"Error loading approval cycle {approval_uid}: {e}")
self.show_notification(f"Error loading approval: {str(e)}", level="error")
# Add basic error view
from panel.pane import Markdown
self.main_content.append(Markdown("# Approval Details"))
self.main_content.append(Markdown(f"**Error loading approval: {str(e)}**"))
# Add back button
from panel.widgets import Button
back_btn = Button(name="Back to Approvals", button_type="primary", width=150)
back_btn.on_click(self.load_approvals)
self.main_content.append(pn.Row(back_btn))
def _update_sidebar_buttons(self):
"""Update sidebar button states based on current view."""
# Clear sidebar and rebuild based on current view
if self.current_view == 'dashboard':
self._setup_sidebar()
elif self.current_view == 'reviews':
self._setup_review_sidebar()
elif self.current_view == 'approvals':
self._setup_approval_sidebar()
elif self.current_view == 'admin':
self._setup_admin_sidebar()
else:
# Default to standard sidebar
self._setup_sidebar()
def create_document(self, event=None):
"""Create a new document from the navigation menu."""
try:
# Use a Modal instead of Dialog for Panel 1.6.1
# Navigate directly to document dashboard and use its form instead
# If we have a document dashboard, use it to create a document
if hasattr(self, 'document_dashboard') and self.document_dashboard:
# Navigate to documents page
self.navigate_to('documents')
# Call the create document method on the dashboard
self.document_dashboard.show_create_document_form()
return
# If we don't have a document dashboard yet, create a simple modal
# using what's available in Panel 1.6.1
from CDocs.config import settings
# Use DEPARTMENTS dictionary keys for department options
departments = list(settings.DEPARTMENTS.keys())
# Create form widgets
title_input = pn.widgets.TextInput(name="Title", placeholder="Enter document title")
doc_type_select = pn.widgets.Select(name="Document Type", options=list(settings.DOCUMENT_TYPES.keys()))
department_select = pn.widgets.Select(name="Department", options=departments)
content_area = pn.widgets.TextAreaInput(name="Content", placeholder="Enter document content", rows=5)
# Buttons
create_btn = pn.widgets.Button(name="Create", button_type="primary")
cancel_btn = pn.widgets.Button(name="Cancel", button_type="default")
def handle_create(event):
try:
# Validate inputs
if not title_input.value:
self.show_notification("Document title is required", level="error")
return
if not doc_type_select.value:
self.show_notification("Document type is required", level="error")
return
# Get values from form
title = title_input.value
doc_type = settings.get_document_type_code(doc_type_select.value)
department = settings.get_department_code(department_select.value)
content = content_area.value or ""
# Create document using document controller
from CDocs.controllers.document_controller import create_document
result = create_document(
user=self.current_user,
title=title,
doc_text=content,
doc_type=doc_type,
department=department,
status='DRAFT'
)
# Check result
if result and isinstance(result, dict) and result.get('status') == 'success':
document_uid = result.get('document', {}).get('uid')
doc_number = result.get('document', {}).get('doc_number', 'Unknown')
# Get document instance for FileCloud upload
from CDocs.models.document import ControlledDocument
doc = ControlledDocument(uid=document_uid)
# Convert text content to file content
file_content = content.encode('utf-8')
file_name = f"{doc_number}_initial.txt"
# More detailed logging
logger.info(f"Uploading document {doc_number} to FileCloud with file size: {len(file_content)} bytes")
try:
# Upload to FileCloud with explicit file_name parameter
from CDocs.controllers.filecloud_controller import upload_document_to_filecloud
filecloud_result = upload_document_to_filecloud(
user=self.current_user,
document=doc,
file_content=file_content,
file_name=file_name, # Explicitly provide the file_name
version_comment="Initial version",
metadata=None
)
# Verify the FileCloud upload result
if not filecloud_result or not isinstance(filecloud_result, dict) or filecloud_result.get('status') != 'success':
error_msg = filecloud_result.get('message') if isinstance(filecloud_result, dict) else "Unknown error"
logger.error(f"FileCloud upload failed: {error_msg}")
self.show_notification(f"Document created but file upload failed: {error_msg}", level="warning")
# Continue despite file upload failure
else:
logger.info(f"FileCloud upload successful for document {doc_number}")
# Update permissions
from CDocs.controllers.share_controller import manage_document_permissions
manage_document_permissions(doc)
# Show success notification
self.show_notification(f"Document {doc_number} created successfully", level="success")
# Load document dashboard
self.load_dashboard()
except Exception as upload_error:
import traceback
logger.error(f"Error during FileCloud upload: {upload_error}")
logger.error(traceback.format_exc())
self.show_notification(f"Document created but file upload failed: {str(upload_error)}", level="warning")
self.load_dashboard()
else:
error_msg = result.get('message') if isinstance(result, dict) else "Unknown error"
self.show_notification(f"Error creating document: {error_msg}", level="error")
except Exception as e:
logger.error(f"Error creating document: {e}")
import traceback
logger.error(traceback.format_exc())
self.show_notification(f"Error creating document: {str(e)}", level="error")
# Handler for cancel button
def handle_cancel(event):
# Clear main content and return to dashboard
self.load_dashboard()
# Bind handlers
create_btn.on_click(handle_create)
cancel_btn.on_click(handle_cancel)
# Create form
form = pn.Column(
pn.pane.Markdown("# Create New Document"),
title_input,
pn.Row(doc_type_select, department_select),
content_area,
pn.Row(
pn.layout.HSpacer(),
cancel_btn,
create_btn,
),
styles={'background': '#f8f9fa'},
css_classes=['p-4', 'border', 'rounded'],
width=700
)
# Show form in main content area
self.main_content.clear()
self.main_content.append(form)
except Exception as e:
logger.error(f"Error creating document: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error creating document: {str(e)}", level="error")
def get_server_info(self) -> Dict[str, Any]:
"""
Get server information for health checks.
Returns
-------
Dict[str, Any]
Server information
"""
return {
'app_name': self.app_name,
'version': self.version,
'debug': self.debug,
'uptime': self._get_uptime(),
'database_status': self._check_database_status()
}
def _get_uptime(self) -> int:
"""
Get application uptime in seconds.
Returns
-------
int
Uptime in seconds
"""
# In a real implementation, this would track the actual start time
# For now, we'll just return a placeholder
return 0
def _check_database_status(self) -> str:
"""
Check database connection status.
Returns
-------
str
Database status ('connected', 'disconnected', or error message)
"""
try:
# Check database connection
if db_operations.test_connection():
return 'connected'
else:
return 'disconnected'
except Exception as e:
return f'error: {str(e)}'
def health_check(self) -> Dict[str, Any]:
"""
Perform a health check on the application.
Returns
-------
Dict[str, Any]
Health check results
"""
# Get server info
server_info = self.get_server_info()
# Check database status
db_status = self._check_database_status()
# Check if all components are loaded
components_status = {
'document_dashboard': self.document_dashboard is not None,
'review_panel': self.review_panel is not None,
'approval_panel': self.approval_panel is not None
}
# Check for approval cycles
approval_cycles = 0
try:
# Check if ApprovalCycle nodes exist in DB
from CDocs.db import db_operations
result = db_operations.run_query(
"MATCH (ac:ApprovalCycle) RETURN count(ac) as count"
)
if result and len(result) > 0:
approval_cycles = result[0].get('count', 0)
except Exception as e:
logger.warning(f"Could not query for approval cycles: {e}")
# Determine overall status
status = 'healthy'
if db_status != 'connected':
status = 'degraded'
if not all(components_status.values()):
status = 'degraded'
return {
'status': status,
'timestamp': datetime.now().isoformat(),
'server': server_info,
'database': db_status,
'components': components_status,
'approval_cycles': approval_cycles
}
# Update the initialize method
def initialize(self):
"""Initialize the application and handle routing."""
try:
logger.info(f"App.initialize(): Starting. Session args: {format_session_args(pn.state.session_args)}")
# Check for SSO cookie FIRST, before checking current_user or showing login
self._check_and_process_sso_cookie() # This might log in the user
# If _check_and_process_sso_cookie resulted in a login, self.current_user will be set.
if self.current_user:
logger.info(f"App.initialize(): User '{self.current_user.username}' is authenticated (possibly from SSO cookie). Loading dashboard.")
# self._setup_header() and self._setup_sidebar() are called by _perform_sso_login_with_user_info
# self.load_dashboard() is also scheduled by it.
# No further action needed here if login was successful.
else:
# No SSO cookie processed or it failed, and no existing current_user. Show login.
logger.info("App.initialize(): No authenticated user. Showing login page.")
self._setup_header() # Ensure header is set up for login button
self.show_login()
logger.info("App.initialize(): Completed.")
except Exception as e:
logger.error(f"Error during application initialization: {e}", exc_info=True)
self.show_notification("Application initialization error", level="error")
def initialize_integration(self, datacapture_app=None):
"""Initialize integration with DataCapture application."""
try:
logger.info("Initializing integration with datacapture application")
# Store reference to DataCapture app if provided
if (datacapture_app is not None):
self.datacapture_app = datacapture_app
# Set integration mode flag
self.integration_mode = True
# Ensure environment attribute exists
if not hasattr(self, 'environment'):
self.environment = "Integrated"
logger.info("Integration with datacapture application initialized")
return True
except Exception as e:
logger.error(f"Failed to initialize integration: {e}")
import traceback
logger.error(traceback.format_exc())
return False
def import_user_from_datacapture(self, dc_user):
"""Import user from DataCapture application."""
try:
if not dc_user:
logger.warning("No user provided from DataCapture")
return False
# Get user attributes for logging
user_name = getattr(dc_user, 'user', '')
user_mail = getattr(dc_user, 'mail', '')
logger.info(f"Importing user from DataCapture: {user_name} ({user_mail})")
# Check if this is a Neo4j user with proper UID
if hasattr(dc_user, 'UID') and dc_user.UID:
# Create DocUser from datacapture user or get existing user
from .models.user_extensions import DocUser
# First try to get user directly from database
doc_user = DocUser.get_by_uid(dc_user.UID)
# If not found, create a new DocUser from the datacapture user
if not doc_user:
doc_user = DocUser.from_datacapture_user(dc_user)
if not doc_user:
logger.error("Failed to create or retrieve DocUser")
return False
# Set current user
self.current_user = doc_user
# Load the dashboard
self.load_dashboard()
return True
else:
logger.error("User has no UID attribute - cannot import")
return False
except Exception as e:
logger.error(f"Error importing user from datacapture: {e}")
import traceback
logger.error(traceback.format_exc())
return False
def get_integrated_view(self):
"""
Get a view optimized for integration with datacapture.
Returns
-------
pn.viewable.Viewable
Panel viewable for integration
"""
try:
return self.main_content
except Exception as e:
logger.error(f"Error creating integrated view: {e}")
# Return a simple error message
import panel as pn
return pn.Column(
pn.pane.Markdown("## Error Loading Controlled Documents"),
pn.pane.Markdown(f"**Error:** {str(e)}")
)
def is_standalone_mode(self):
"""Determine if CDocs is running in standalone mode (not embedded in datacapture).
Returns
-------
bool
True if running standalone, False if embedded
"""
# Check if integration_mode attribute has been set, which indicates embedding
return not hasattr(self, 'integration_mode') or not self.integration_mode
def _setup_navigation(self, integrated=False):
"""Set up navigation buttons for the application."""
try:
# Clear existing navigation
if hasattr(self, 'nav_container'):
self.nav_container.clear()
else:
self.nav_container = pn.Row(sizing_mode='stretch_width')
self.header_container.append(self.nav_container)
# Create navigation buttons
dashboard_btn = pn.widgets.Button(
name="Documents",
button_type="primary",
width=150
)
dashboard_btn.on_click(self.load_dashboard)
# Create more compact set of buttons for integrated mode
if integrated:
self.nav_container.append(dashboard_btn)
else:
# Create full navigation for standalone mode
create_doc_btn = pn.widgets.Button(
name="Create Document",
button_type="success",
width=150
)
create_doc_btn.on_click(self.create_document)
reviews_btn = pn.widgets.Button(
name="Reviews",
button_type="default",
width=150
)
reviews_btn.on_click(self.load_reviews)
approvals_btn = pn.widgets.Button(
name="Approvals",
button_type="default",
width=150
)
approvals_btn.on_click(self.load_approvals)
admin_btn = pn.widgets.Button(
name="Admin",
button_type="warning",
width=150
)
admin_btn.on_click(self.load_admin)
self.nav_container.extend([
dashboard_btn,
create_doc_btn,
reviews_btn,
approvals_btn,
admin_btn
])
except Exception as e:
logger.error(f"Error setting up navigation: {e}")
import traceback
logger.error(traceback.format_exc())
def handle_datacapture_document(self, document_data, event=None):
"""
Handle a document submitted from datacapture application.
Parameters
----------
document_data : Dict[str, Any]
Document data from datacapture
event : event, optional
Event that triggered this action
Returns
-------
Dict[str, Any]
Result of document processing
"""
try:
logger.info(f"Processing document from datacapture: {document_data.get('title', 'Untitled')}")
# Extract document metadata
doc_title = document_data.get('title', 'Untitled Document')
doc_content = document_data.get('content', '')
doc_type = document_data.get('doc_type', 'GENERAL')
department = document_data.get('department', 'General')
# Set creator to current user or admin if no user is logged in
creator = self.current_user
if not creator:
# Handle case where no user is logged in
logger.warning("No user logged in for document creation - using admin")
from CDocs.models.user_extensions import DocUser
creator = DocUser.get_admin_user()
# Create document in the controlled document system
# Note the parameter name doc_text used correctly here
result = document_controller.create_document(
user=creator,
title=doc_title,
doc_text=doc_content, # Changed from content to doc_text
doc_type=doc_type,
department=department,
status='DRAFT'
)
# Validate result before accessing
if not result or not isinstance(result, dict):
logger.error(f"Invalid result from create_document: {result}")
self.show_notification("Error creating document: Invalid result from server", level="error")
return {'error': 'Invalid result from server'}
# Check for document data in the result
if 'document' not in result:
logger.error(f"Missing document data in result: {result}")
self.show_notification("Error creating document: Missing document data", level="error")
return {'error': 'Missing document data in result'}
# Show success notification
document_uid = result.get('document', {}).get('uid')
doc_number = result.get('document', {}).get('doc_number')
if document_uid:
success_message = f"Document {doc_number} created successfully" if doc_number else "Document created successfully"
self.show_notification(success_message, level="success")
# Optionally load the document detail view
self.load_document(document_uid)
# Create notification for document creation
if result and self.current_user:
document_uid = result.get('document', {}).get('uid')
doc_number = result.get('document', {}).get('doc_number')
if document_uid and hasattr(self.current_user, 'uid'):
self.notification_manager.create_notification(
notification_type='DOCUMENT_CREATED',
user_uid=self.current_user.uid,
resource_uid=document_uid,
resource_type='DOCUMENT',
message=f"Document {doc_number} created successfully",
details={
'doc_number': doc_number,
'title': document_data.get('title', 'Untitled'),
'created_date': datetime.now().isoformat()
}
)
return result
except Exception as e:
logger.error(f"Error processing document from datacapture: {str(e)}")
logger.error(traceback.format_exc())
self.show_notification(f"Error processing document: {str(e)}", level="error")
return {'error': str(e)}
def login_user(self, user):
"""
Set the current user and update UI accordingly.
Parameters
----------
user : DocUser
The user to log in
"""
try:
# Set current user
self.current_user = user
logger.info(f"User {user.username} logged in")
# Create welcome notification
if hasattr(user, 'uid'):
self.notification_manager.create_notification(
notification_type='SYSTEM_NOTIFICATION',
user_uid=user.uid,
message=f"Welcome back, {user.username}!",
details={'login_time': datetime.now().isoformat()}
)
# Refresh layout with logged-in user
self._setup_header()
self._setup_sidebar()
# Initialize UI components that depend on user
self._init_ui_components()
# Reload dashboard
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.set_user(user)
self.load_dashboard()
return True
except Exception as e:
logger.error(f"Error logging in user: {e}")
return False
def servable(self, title="Controlled Documents"):
"""
Make the application servable in standalone mode.
Parameters
----------
title : str
The title for the application
Returns
-------
panel.viewable.Viewable
The servable application
"""
import panel as pn
if self.is_standalone_mode():
logger.info("Starting CDocs in standalone mode")
return self.template.servable(title)
else:
logger.info("CDocs is running in embedded mode")
return self.main_content.servable()
def _init_review_panel(self):
"""Initialize the review panel."""
from .ui.review_panel import create_review_panel
self.review_panel = create_review_panel(parent_app=self)
def _setup_review_sidebar(self):
"""Set up the sidebar specifically for the reviews panel."""
try:
# Clear the sidebar container
self.sidebar_container.clear()
# Create navigation buttons for reviews
my_reviews_btn = pn.widgets.Button(
name='My Pending Reviews',
button_type='primary',
width=200
)
completed_reviews_btn = pn.widgets.Button(
name='My Completed Reviews',
button_type='default',
width=200
)
# Define click handlers
def load_my_reviews(event):
if hasattr(self.review_panel, '_load_pending_reviews'):
self.review_panel._load_pending_reviews()
def load_completed_reviews(event):
if hasattr(self.review_panel, '_load_completed_reviews'):
self.review_panel._load_completed_reviews()
# Attach handlers
my_reviews_btn.on_click(load_my_reviews)
completed_reviews_btn.on_click(load_completed_reviews)
# Add "All Reviews" button if user has permission
if self.current_user and permissions.user_has_permission(self.current_user, "MANAGE_REVIEWS"):
all_reviews_btn = pn.widgets.Button(
name='All Reviews',
button_type='default',
width=200
)
def load_all_reviews(event):
if hasattr(self.review_panel, '_load_all_reviews'):
self.review_panel._load_all_reviews()
all_reviews_btn.on_click(load_all_reviews)
# Add to navigation area
navigation = pn.Column(
pn.pane.Markdown("## Review Navigation"),
my_reviews_btn,
completed_reviews_btn,
all_reviews_btn,
sizing_mode='stretch_width'
)
else:
# Add to navigation area without all reviews button
navigation = pn.Column(
pn.pane.Markdown("## Review Navigation"),
my_reviews_btn,
completed_reviews_btn,
sizing_mode='stretch_width'
)
# Add navigation to sidebar
self.sidebar_container.append(navigation)
# Add back to dashboard button
back_btn = pn.widgets.Button(
name="← Back to Dashboard",
button_type="primary",
width=200
)
back_btn.on_click(self.load_dashboard)
self.sidebar_container.append(pn.layout.Spacer(height=20))
self.sidebar_container.append(back_btn)
# Add review statistics
# try:
# stats_container = pn.Column(
# pn.pane.Markdown("## Review Statistics"),
# sizing_mode='stretch_width',
# styles={'background': '#f8f9fa'},
# css_classes=['p-3', 'border', 'rounded']
# )
# # Get statistics from review controller
# from CDocs.controllers.review_controller import get_review_statistics
# stats = get_review_statistics()
# # Display key statistics
# pending_reviews = stats.get('statistics', {}).get('pending_reviews', 0)
# completed_reviews = stats.get('statistics', {}).get('completed_reviews', 0)
# approval_rate = stats.get('statistics', {}).get('review_approval_rate', 0)
# stats_container.append(pn.pane.Markdown(f"**Pending Reviews:** {pending_reviews}"))
# stats_container.append(pn.pane.Markdown(f"**Completed Reviews:** {completed_reviews}"))
# stats_container.append(pn.pane.Markdown(f"**Approval Rate:** {approval_rate:.1f}%"))
# self.sidebar_container.append(stats_container)
# except Exception as e:
# logger.warning(f"Error getting review statistics: {e}")
# stats_container = pn.Column(
# pn.pane.Markdown("## Review Statistics"),
# pn.pane.Markdown("*Statistics unavailable*"),
# sizing_mode='stretch_width',
# styles={'background': '#f8f9fa'},
# css_classes=['p-3', 'border', 'rounded']
# )
# self.sidebar_container.append(stats_container)
return True
except Exception as e:
logger.error(f"Error setting up review sidebar: {e}")
import traceback
logger.error(traceback.format_exc())
return False
def _setup_approval_sidebar(self):
"""Set up the sidebar specifically for the approvals panel."""
try:
# Clear the sidebar container
self.sidebar_container.clear()
# Create navigation buttons for approvals
my_approvals_btn = pn.widgets.Button(
name='My Pending Approvals',
button_type='primary',
width=200
)
completed_approvals_btn = pn.widgets.Button(
name='My Completed Approvals',
button_type='default',
width=200
)
# Define click handlers
def load_my_approvals(event):
if hasattr(self.approval_panel, '_load_pending_approvals'):
self.approval_panel._load_pending_approvals()
def load_completed_approvals(event):
if hasattr(self.approval_panel, '_load_completed_approvals'):
self.approval_panel._load_completed_approvals()
# Attach handlers
my_approvals_btn.on_click(load_my_approvals)
completed_approvals_btn.on_click(load_completed_approvals)
# Add "All Approvals" button if user has permission
if self.current_user and permissions.user_has_permission(self.current_user, "MANAGE_APPROVALS"):
all_approvals_btn = pn.widgets.Button(
name='All Approvals',
button_type='default',
width=200
)
def load_all_approvals(event):
if hasattr(self.approval_panel, '_load_all_approvals'):
self.approval_panel._load_all_approvals()
all_approvals_btn.on_click(load_all_approvals)
# Add to navigation area
navigation = pn.Column(
pn.pane.Markdown("## Approval Navigation"),
my_approvals_btn,
completed_approvals_btn,
all_approvals_btn,
sizing_mode='stretch_width'
)
else:
# Add to navigation area without all approvals button
navigation = pn.Column(
pn.pane.Markdown("## Approval Navigation"),
my_approvals_btn,
completed_approvals_btn,
sizing_mode='stretch_width'
)
# Add navigation to sidebar
self.sidebar_container.append(navigation)
# Add back to dashboard button
back_btn = pn.widgets.Button(
name="← Back to Dashboard",
button_type="primary",
width=200
)
back_btn.on_click(self.load_dashboard)
self.sidebar_container.append(pn.layout.Spacer(height=20))
self.sidebar_container.append(back_btn)
# Add approval statistics - updated to work with new ApprovalCycle model
try:
stats_container = pn.Column(
pn.pane.Markdown("## Approval Statistics"),
sizing_mode='stretch_width',
styles={'background': '#f8f9fa'},
css_classes=['p-3', 'border', 'rounded']
)
# Get statistics from approval controller using updated statistics method
from CDocs.controllers.approval_controller import get_approval_statistics
stats = get_approval_statistics(user=self.current_user)
# Display key statistics with updated statistic names
pending_count = stats.get('statistics', {}).get('pending_count', 0)
completed_count = stats.get('statistics', {}).get('completed_count', 0)
approval_rate = stats.get('statistics', {}).get('approval_rate', 0)
stats_container.append(pn.pane.Markdown(f"**Pending Approvals:** {pending_count}"))
stats_container.append(pn.pane.Markdown(f"**Completed Approvals:** {completed_count}"))
stats_container.append(pn.pane.Markdown(f"**Approval Rate:** {approval_rate:.1f}%"))
self.sidebar_container.append(stats_container)
except Exception as e:
logger.warning(f"Error getting approval statistics: {e}")
stats_container = pn.Column(
pn.pane.Markdown("## Approval Statistics"),
pn.pane.Markdown("*Statistics unavailable*"),
sizing_mode='stretch_width',
styles={'background': '#f8f9fa'},
css_classes=['p-3', 'border', 'rounded']
)
self.sidebar_container.append(stats_container)
return True
except Exception as e:
logger.error(f"Error setting up approval sidebar: {e}")
import traceback
logger.error(traceback.format_exc())
return False
def _init_approval_panel(self):
"""Initialize the approval panel with the updated approval cycle model."""
# Use absolute import instead of relative import
from CDocs.ui.approval_panel import create_approval_panel
self.approval_panel = create_approval_panel(parent_app=self)
def navigate_to_training(self, document_uid: str):
"""Navigate to training completion for a specific document."""
try:
# Update current view
self.current_view = 'training_completion'
logger.info(f"Navigating to training completion for document {document_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Stop dashboard notifications when switching views
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.stop_notifications()
# Create training completion component
from CDocs.ui.training_completion import TrainingCompletion
training_completion = TrainingCompletion(parent_app=self, document_uid=document_uid)
training_completion.set_user_and_document(self.current_user, document_uid)
# Add to main content
self.main_content.append(training_completion.get_view())
except Exception as e:
logger.error(f"Error navigating to training completion: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading training completion: {str(e)}", level="error")
def navigate_to_training_dashboard(self, event=None):
"""Navigate to the training dashboard."""
try:
# Update current view
self.current_view = 'training_dashboard'
logger.info("Navigating to training dashboard")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Stop dashboard notifications when switching views
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.stop_notifications()
# Create training dashboard component
from CDocs.ui.training_dashboard import TrainingDashboard
training_dashboard = TrainingDashboard(parent_app=self)
training_dashboard.set_user(self.current_user)
# Add to main content
self.main_content.append(training_dashboard.get_view())
except Exception as e:
logger.error(f"Error navigating to training dashboard: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading training dashboard: {str(e)}", level="error")
def navigate_to_training_management(self, document_uid: str):
"""Navigate to training management for a specific document."""
try:
# Update current view
self.current_view = 'training_management'
logger.info(f"Navigating to training management for document {document_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Create training management component
from CDocs.ui.training_management import create_training_management
training_management = create_training_management(parent_app=self, document_uid=document_uid)
training_management.set_user_and_document(self.current_user, document_uid)
# Add to main content
self.main_content.append(training_management.get_view())
except Exception as e:
logger.error(f"Error navigating to training management: {e}")
self.show_notification(f"Error loading training management: {str(e)}", level="error")
def navigate_to(self, view_name, document_uid=None):
"""Navigate to a specific view."""
try:
if view_name == 'dashboard':
self.load_dashboard()
elif view_name == 'documents':
self.load_dashboard() # Documents and dashboard are the same
elif view_name == 'reviews':
self.load_reviews()
elif view_name == 'approvals':
self.load_approvals()
elif view_name == 'admin':
self.load_admin()
elif view_name == 'training_dashboard':
self.navigate_to_training_dashboard()
elif view_name == 'training_completion':
# Handle training completion with document ID
if document_uid:
self.navigate_to_training_completion(document_uid)
elif hasattr(self, 'current_document_uid') and self.current_document_uid:
self.navigate_to_training_completion(self.current_document_uid)
else:
self.show_notification("Training completion requires a document ID", level="error")
else:
logger.warning(f"Unknown view name: {view_name}")
self.show_notification(f"Unknown view: {view_name}", level="error")
except Exception as e:
logger.error(f"Error navigating to {view_name}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error navigating to {view_name}: {str(e)}", level="error")
def navigate_to_training_completion(self, document_uid: str):
"""Navigate to training completion for a specific document."""
try:
# Update current view
self.current_view = 'training_completion'
self.current_document_uid = document_uid
logger.info(f"Navigating to training completion for document {document_uid}")
# Clear main content
self.main_content.clear()
self.notification_area.object = ""
# Check if user is authenticated
if not self.current_user:
self.show_login()
return
# Stop dashboard notifications when switching views
if hasattr(self, 'document_dashboard') and self.document_dashboard:
self.document_dashboard.stop_notifications()
# Create training completion component
from CDocs.ui.training_completion import TrainingCompletion
training_completion = TrainingCompletion(parent_app=self, document_uid=document_uid)
training_completion.set_user_and_document(self.current_user, document_uid)
# Add to main content
self.main_content.append(training_completion.get_view())
except Exception as e:
logger.error(f"Error navigating to training completion: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
self.show_notification(f"Error loading training completion: {str(e)}", level="error")
def handle_sso_callback(self):
"""
Handle the callback from Azure SSO authentication.
"""
try:
logger.info("Processing SSO callback")
# Get query parameters more safely
auth_code = None
if pn.state.session_args and b'code' in pn.state.session_args:
auth_code = pn.state.session_args[b'code'][0].decode('utf-8')
# Log a sanitized version of the code
sanitized_code = auth_code[:5] + "..." + auth_code[-5:] if len(auth_code) > 10 else "***"
logger.info(f"Found authorization code in session args (length: {len(auth_code)}, preview: {sanitized_code})")
if not auth_code:
logger.error("No authorization code received in SSO callback")
# Create a new error view instead of modifying existing content
error_view = pn.Column(
pn.pane.Markdown("## Authentication Error"),
pn.pane.Markdown("No authorization code received from identity provider."),
pn.widgets.Button(name="Return to Login", button_type="primary", on_click=self.show_login)
)
# Replace main content safely
if hasattr(self, 'main_content'):
self.main_content.clear()
self.main_content.append(error_view)
else:
logger.error("No main_content attribute in handle_sso_callback")
return
# Exchange authorization code for tokens
if hasattr(self, 'azure_sso') and self.azure_sso:
logger.info(f"Exchanging auth code for token (code length: {len(auth_code)})")
token_response = self.azure_sso.get_token_from_code(auth_code)
if token_response and 'access_token' in token_response:
logger.info("Successfully obtained access token")
# Create a loading indicator while login is processing
if hasattr(self, 'main_content'):
self.main_content.clear()
self.main_content.append(pn.Column(
pn.pane.Markdown("## Authenticating..."),
pn.indicators.LoadingSpinner(value=True, width=50, height=50),
pn.pane.Markdown("Please wait while we complete your login."),
sizing_mode='stretch_width',
align='center'
))
# Use the token to login - this is the critical part
success = self.login(username=None, sso_token=token_response)
if success:
logger.info("SSO login successful")
# IMPORTANT: Don't call load_dashboard here, it's already scheduled in login
return
else:
logger.error("SSO login failed after token exchange")
else:
logger.error(f"Failed to exchange authorization code for token: {token_response}")
# Create error view
error_view = pn.Column(
pn.pane.Markdown("## Authentication Error"),
pn.pane.Markdown("Failed to exchange authorization code for token."),
pn.widgets.Button(name="Return to Login", button_type="primary", on_click=self.show_login)
)
# Replace main content safely
if hasattr(self, 'main_content'):
self.main_content.clear()
self.main_content.append(error_view)
return
else:
logger.error("SSO is not initialized")
# Create error view
error_view = pn.Column(
pn.pane.Markdown("## Authentication Error"),
pn.pane.Markdown("SSO authentication is not configured properly."),
pn.widgets.Button(name="Return to Login", button_type="primary", on_click=self.show_login)
)
# Replace main content safely
if hasattr(self, 'main_content'):
self.main_content.clear()
self.main_content.append(error_view)
return
# If we reach here, authentication failed - show login page
logger.info("Authentication failed, returning to login page")
self.show_login()
except Exception as e:
logger.error(f"Error handling SSO callback: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Create error view with return button
error_view = pn.Column(
pn.pane.Markdown("## Authentication Error"),
pn.pane.Markdown(f"An error occurred during authentication: {str(e)}"),
pn.widgets.Button(name="Return to Login", button_type="primary", on_click=self.show_login)
)
# Replace main content safely
if hasattr(self, 'main_content'):
self.main_content.clear()
self.main_content.append(error_view)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
param.Parameterized | - |
Parameter Details
bases: Parameter of type param.Parameterized
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, config_path)
Purpose: Initialize the application with optional configuration path. Parameters ---------- config_path : str, optional Path to configuration file. If not provided, default configuration is used.
Parameters:
config_path: Parameter
Returns: None
_setup_logging(self)
Purpose: Configure logging for the application.
Returns: None
_load_configuration(self, config_path)
Purpose: Load configuration from file. Parameters ---------- config_path : str Path to configuration file
Parameters:
config_path: Type: str
Returns: None
_init_database(self)
Purpose: Initialize database connection and schema.
Returns: None
_init_document_permissions(self)
Purpose: Initialize document permissions and shares. This performs a background check to ensure all documents have correct permissions based on their status and user roles: - Authors/owners have write access only during DRAFT status - Reviewers have write access only during active review periods - Approvers have read-only access during approval cycles
Returns: None
_perform_sso_login_with_user_info(self, user_info)
Purpose: Logs in a user based on user_info dictionary.
Parameters:
user_info: Type: dict
Returns: None
_check_and_process_sso_cookie(self)
Purpose: Checks for SSO login cookie and processes it.
Returns: None
_init_sso_authentication(self)
Purpose: Initialize Azure SSO authentication. This sets up the OAuth2 authentication flow with Azure AD, allowing users to log in with their Microsoft accounts.
Returns: None
_ensure_admin_user(self)
Purpose: Ensure admin user exists in the database.
Returns: None
_load_initial_data(self)
Purpose: Load initial data into the database.
Returns: None
_ensure_document_types_exist(self)
Purpose: Ensure document types exist in the database.
Returns: None
_ensure_departments_exist(self)
Purpose: Ensure departments exist in the database.
Returns: None
_create_panel_template(self) -> pn.Template
Purpose: Create and configure the Panel template. Returns ------- pn.Template Configured Panel template
Returns: Returns pn.Template
_setup_layout(self)
Purpose: Set up the main application layout with dynamic containers.
Returns: None
_setup_header(self)
Purpose: Set up the application header.
Returns: None
_setup_sidebar(self, integrated)
Purpose: Set up the sidebar with filters and actions.
Parameters:
integrated: Parameter
Returns: None
load_admin(self, event)
Purpose: Load the admin panel.
Parameters:
event: Parameter
Returns: None
_setup_admin_sidebar(self)
Purpose: Set up the sidebar specifically for the admin panel.
Returns: None
_init_ui_components(self)
Purpose: Initialize UI components for the application.
Returns: None
show_notification(self, message, level, duration)
Purpose: Show a notification message. Parameters ---------- message : str The message to display level : str, optional The notification level (info, warning, error, success) duration : int, optional How long to display the message (in milliseconds)
Parameters:
message: Type: strlevel: Type: strduration: Type: int
Returns: None
clear_notification(self)
Purpose: Clear the notification area.
Returns: None
load_dashboard(self, event)
Purpose: Load the document dashboard.
Parameters:
event: Parameter
Returns: None
login(self, username, password, sso_token) -> bool
Purpose: Authenticate a user using either credentials or SSO token. Parameters ---------- username : str, optional Username or email (not required for SSO login) password : str, optional Password (not required for SSO login) sso_token : dict, optional SSO authentication token from Azure Returns ------- bool True if authentication successful, False otherwise
Parameters:
username: Type: strpassword: Type: strsso_token: Type: dict
Returns: Returns bool
logout(self, event)
Purpose: Log out the current user.
Parameters:
event: Parameter
Returns: None
show_login(self, event)
Purpose: Show the login form with fragment-aware SSO option.
Parameters:
event: Parameter
Returns: None
load_document(self, document_uid, event)
Purpose: Load a specific document.
Parameters:
document_uid: Type: strevent: Parameter
Returns: None
load_reviews(self, event)
Purpose: Load the reviews view.
Parameters:
event: Parameter
Returns: None
load_review(self, review_uid, event)
Purpose: Load a specific review. Parameters ---------- review_uid : str UID of the review to load event : event, optional Event that triggered this action
Parameters:
review_uid: Type: strevent: Parameter
Returns: None
load_approvals(self, event)
Purpose: Load the approvals panel.
Parameters:
event: Parameter
Returns: None
load_approval(self, approval_uid, event)
Purpose: Load a specific approval cycle. Parameters ---------- approval_uid : str UID of the approval cycle to load event : event, optional Event that triggered this action
Parameters:
approval_uid: Type: strevent: Parameter
Returns: None
_update_sidebar_buttons(self)
Purpose: Update sidebar button states based on current view.
Returns: None
create_document(self, event)
Purpose: Create a new document from the navigation menu.
Parameters:
event: Parameter
Returns: None
get_server_info(self) -> Dict[str, Any]
Purpose: Get server information for health checks. Returns ------- Dict[str, Any] Server information
Returns: Returns Dict[str, Any]
_get_uptime(self) -> int
Purpose: Get application uptime in seconds. Returns ------- int Uptime in seconds
Returns: Returns int
_check_database_status(self) -> str
Purpose: Check database connection status. Returns ------- str Database status ('connected', 'disconnected', or error message)
Returns: Returns str
health_check(self) -> Dict[str, Any]
Purpose: Perform a health check on the application. Returns ------- Dict[str, Any] Health check results
Returns: Returns Dict[str, Any]
initialize(self)
Purpose: Initialize the application and handle routing.
Returns: None
initialize_integration(self, datacapture_app)
Purpose: Initialize integration with DataCapture application.
Parameters:
datacapture_app: Parameter
Returns: None
import_user_from_datacapture(self, dc_user)
Purpose: Import user from DataCapture application.
Parameters:
dc_user: Parameter
Returns: None
get_integrated_view(self)
Purpose: Get a view optimized for integration with datacapture. Returns ------- pn.viewable.Viewable Panel viewable for integration
Returns: See docstring for return details
is_standalone_mode(self)
Purpose: Determine if CDocs is running in standalone mode (not embedded in datacapture). Returns ------- bool True if running standalone, False if embedded
Returns: See docstring for return details
_setup_navigation(self, integrated)
Purpose: Set up navigation buttons for the application.
Parameters:
integrated: Parameter
Returns: None
handle_datacapture_document(self, document_data, event)
Purpose: Handle a document submitted from datacapture application. Parameters ---------- document_data : Dict[str, Any] Document data from datacapture event : event, optional Event that triggered this action Returns ------- Dict[str, Any] Result of document processing
Parameters:
document_data: Parameterevent: Parameter
Returns: See docstring for return details
login_user(self, user)
Purpose: Set the current user and update UI accordingly. Parameters ---------- user : DocUser The user to log in
Parameters:
user: Parameter
Returns: None
servable(self, title)
Purpose: Make the application servable in standalone mode. Parameters ---------- title : str The title for the application Returns ------- panel.viewable.Viewable The servable application
Parameters:
title: Parameter
Returns: See docstring for return details
_init_review_panel(self)
Purpose: Initialize the review panel.
Returns: None
_setup_review_sidebar(self)
Purpose: Set up the sidebar specifically for the reviews panel.
Returns: None
_setup_approval_sidebar(self)
Purpose: Set up the sidebar specifically for the approvals panel.
Returns: None
_init_approval_panel(self)
Purpose: Initialize the approval panel with the updated approval cycle model.
Returns: None
navigate_to_training(self, document_uid)
Purpose: Navigate to training completion for a specific document.
Parameters:
document_uid: Type: str
Returns: None
navigate_to_training_dashboard(self, event)
Purpose: Navigate to the training dashboard.
Parameters:
event: Parameter
Returns: None
navigate_to_training_management(self, document_uid)
Purpose: Navigate to training management for a specific document.
Parameters:
document_uid: Type: str
Returns: None
navigate_to(self, view_name, document_uid)
Purpose: Navigate to a specific view.
Parameters:
view_name: Parameterdocument_uid: Parameter
Returns: None
navigate_to_training_completion(self, document_uid)
Purpose: Navigate to training completion for a specific document.
Parameters:
document_uid: Type: str
Returns: None
handle_sso_callback(self)
Purpose: Handle the callback from Azure SSO authentication.
Returns: None
Required Imports
import panel as pn
import param
import logging
import sys
import os
Usage Example
# Example usage:
# result = ControlledDocumentApp(bases)
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class ControlledDocApp 80.3% similar
-
class CDocsApp 76.0% similar
-
class ControlledDocumentFlaskApp 71.8% similar
-
function main_v19 66.8% similar
-
class Application 58.2% similar