class DocumentDashboard
Dashboard for viewing and managing controlled documents.
/tf/active/vicechatdev/CDocs/ui/document_dashboard.py
39 - 2187
moderate
Purpose
Dashboard for viewing and managing controlled documents.
Source Code
class DocumentDashboard(param.Parameterized):
"""Dashboard for viewing and managing controlled documents."""
# Class-level ``param`` declarations holding the dashboard's selection and
# filter state.
# NOTE(review): presumably these mirror the filter widgets built in __init__;
# the wiring is not visible in this part of the file - confirm.
doc_selected = param.String(default='')  # string parameter, defaults to ''
search_query = param.String(default='')  # string parameter, defaults to ''
doc_type_filter = param.Parameter() # Use Parameter to accept any type of value
department_filter = param.String(default='')  # string parameter, defaults to ''
status_filter = param.String(default='')  # string parameter, defaults to ''
date_from = param.Date(default=None)  # date parameter, defaults to None
date_to = param.Date(default=None)  # date parameter, defaults to None
def __init__(self, parent_app=None, **params):
    """Build the dashboard's state, filter widgets and notification plumbing.

    Args:
        parent_app: Object stored as ``self.parent_app`` and handed to
            ``create_user_tasks_panel``.
        **params: Passed straight through to the parent initializer.
    """
    super().__init__(**params)

    # Plain instance state; ``user`` stays None until set_user() is called.
    self.parent_app = parent_app
    self.user = None
    self.document_list = None
    self._view_handler_added = False
    # Defined up front so later reads of ``self.template`` never raise.
    self.template = None

    # Status message pane plus the containers the dashboard view embeds.
    self.notification_area = pn.pane.Markdown("")
    self.document_list_container = pn.Column(sizing_mode='stretch_width')
    self.tasks_panel = create_user_tasks_panel(parent_app=parent_app)
    self.tasks_panel_container = pn.Column(sizing_mode='stretch_width')

    # Dropdown choices are the keys of the corresponding settings mappings.
    from CDocs.config import settings
    self.document_types = [*settings.DOCUMENT_TYPES.keys()]
    self.document_statuses = [*settings.DOCUMENT_STATUSES.keys()]
    self.departments = [*settings.DEPARTMENTS.keys()]

    # Filter widgets; every dropdown starts with an empty entry.
    widget_width = 200
    self.search_input = TextInput(
        name="Search",
        placeholder="Search documents...",
        width=widget_width,
    )
    self.doc_type_select = Select(
        name="Document Type",
        options=['', *self.document_types],
        width=widget_width,
    )
    self.status_select = Select(
        name="Status",
        options=['', *self.document_statuses],
        width=widget_width,
    )
    self.department_select = Select(
        name="Department",
        options=['', *self.departments],
        width=widget_width,
    )
    self.date_from_picker = DatePicker(name="From Date", width=widget_width)
    self.date_to_picker = DatePicker(name="To Date", width=widget_width)

    # Name -> widget lookup for the filters.
    self.filters = dict(
        search=self.search_input,
        doc_type=self.doc_type_select,
        status=self.status_select,
        department=self.department_select,
        date_from=self.date_from_picker,
        date_to=self.date_to_picker,
    )

    # The DataFrame attribute only exists when pandas is available.
    if HAS_PANDAS:
        self.documents_df = pd.DataFrame()

    # "Create" button stays disabled until a user has been set.
    button_options = {
        'name': 'Create New Document',
        'button_type': 'primary',
        'width': 150,
        'disabled': True,
    }
    self.create_doc_btn = Button(**button_options)
    self.create_doc_btn.on_click(self.show_create_document_form)

    # Flag tracking whether event handlers have been registered.
    self._event_handlers_registered = False

    # Notification state: manager, panel placeholder, badge and timer.
    self.notification_manager = NotificationManager()
    self.notification_panel = None
    self.notification_count_badge = pn.pane.HTML("<span class='badge'>0</span>")
    self.notification_timer = None  # created by _setup_notification_callback()
    self._setup_notification_callback()
def _setup_notification_callback(self):
"""Set up notification callback timer."""
try:
# Only set up notifications if user is set and timer isn't already running
if self.user and not self.notification_timer:
# Start periodic notification updates every 30 seconds
self.notification_timer = pn.state.add_periodic_callback(
self._refresh_notifications,
period=30000, # 30 seconds
start=False # Don't start immediately
)
logger.debug("Notification callback timer created")
elif not self.user:
# User not set yet, just create placeholder timer
logger.debug("User not set, notification callback will be set up when user is assigned")
except Exception as e:
logger.error(f"Error setting up notification callback: {e}")
def _refresh_notifications(self):
"""Refresh notifications from the server."""
try:
if not self.user:
return
# Get latest notifications for user
notifications = self.notification_manager.get_user_notifications(
user_uid=self.user.uid,
unread_only=False,
limit=20
)
# Count unread notifications
unread_count = self.notification_manager.count_user_notifications(
user_uid=self.user.uid,
unread_only=True
)
# Update notification count badge
badge_color = "red" if unread_count > 0 else "gray"
self.notification_count_badge.object = f"""
<span style="background-color: {badge_color}; color: white;
border-radius: 50%; padding: 2px 6px; font-size: 12px;">
{unread_count}
</span>
"""
# Update notification panel content if it exists
if hasattr(self, 'notification_content') and self.notification_content:
notification_html = self._format_notifications_html(notifications)
self.notification_content.object = notification_html
logger.debug(f"Updated notification panel with {len(notifications)} notifications")
except Exception as e:
logger.warning(f"Error refreshing notifications: {e}")
def _update_notification_display(self, notifications):
"""Update the notification display area."""
try:
if not notifications:
return
# Format notifications for display
notification_html = "<div class='notifications'>"
for notif in notifications[:3]: # Show only top 3
notification_html += f"""
<div class='notification-item'>
<small>{notif.get('message', 'No message')}</small>
</div>
"""
notification_html += "</div>"
# Update notification area if it exists
if hasattr(self, 'notification_area') and self.notification_area:
self.notification_area.object = notification_html
except Exception as e:
logger.warning(f"Error updating notification display: {e}")
def _get_field_case_insensitive(self, data_dict, field_names):
"""Helper method to get field value from dictionary with case-insensitive field name matching."""
try:
if not isinstance(data_dict, dict):
return None
for field_name in field_names:
# Try exact match first
if field_name in data_dict:
return data_dict[field_name]
# Try case-insensitive match
for key in data_dict.keys():
if key.lower() == field_name.lower():
return data_dict[key]
return None
except Exception as e:
logger.error(f"Error in _get_field_case_insensitive: {e}")
return None
def __del__(self):
"""Cleanup when dashboard is destroyed."""
try:
# Stop notification timer if running
if hasattr(self, 'notification_timer') and self.notification_timer.running:
self.notification_timer.stop()
except Exception as e:
logger.debug(f"Error stopping notification timer during cleanup: {e}")
def stop_notifications(self):
"""Stop notification polling - useful when switching views."""
try:
if hasattr(self, 'notification_timer') and self.notification_timer.running:
self.notification_timer.stop()
logger.debug("Notification timer stopped")
except Exception as e:
logger.error(f"Error stopping notification timer: {e}")
def restart_notifications(self):
"""Restart notification polling - useful when returning to dashboard."""
try:
if hasattr(self, 'notification_timer'):
# Stop first if running
if self.notification_timer.running:
self.notification_timer.stop()
logger.debug("Notification timer stopped before restart")
# Start again
self.notification_timer.start()
logger.debug("Notification timer restarted")
except RuntimeError as e:
if "already started" in str(e):
logger.debug("Notification timer already running, no restart needed")
else:
logger.error(f"Error restarting notification timer: {e}")
except Exception as e:
logger.error(f"Error restarting notification timer: {e}")
def set_user(self, user):
"""Set the current user from the parent application and refresh the UI.

Stores ``user``, sets up and starts the notification timer, loads
notifications, updates permission-dependent widgets, passes the user to
the tasks panel, sets up the sidebar when a template is present, and
finally reloads the document list.

Args:
    user: The user object supplied by the parent application. The
        notification timer is only started when this is truthy.
"""
self.user = user
# Set up notifications for this user (creates the timer if needed)
self._setup_notification_callback()
# Start notification polling - only if not already started
if user and hasattr(self, 'notification_timer') and self.notification_timer:
try:
# Check if timer is already running before starting
if not self.notification_timer.running:
self.notification_timer.start()
logger.debug("Notification timer started")
else:
logger.debug("Notification timer already running, skipping start")
except RuntimeError as e:
# Handle case where timer is already started
if "already started" in str(e):
logger.debug("Notification timer already running")
else:
logger.error(f"Error starting notification timer: {e}")
except Exception as e:
# Timer problems are logged and do not abort the rest of set_user
logger.error(f"Unexpected error with notification timer: {e}")
# Initial notification load
# NOTE(review): this hasattr check looks always true, since
# _refresh_notifications is defined on this class - confirm.
if hasattr(self, '_refresh_notifications'):
self._refresh_notifications()
# Update UI elements that depend on user permissions
self._update_ui_for_user()
# Update the tasks panel with the current user
if hasattr(self, 'tasks_panel') and self.tasks_panel:
self.tasks_panel.set_user(user)
# Modified: Check if template attribute exists before using it
# The template attribute is initialized to None in __init__, so the sidebar
# is only set up once something has assigned a template.
if hasattr(self, 'template') and self.template:
self._setup_sidebar()
else:
logger.debug("No template attribute available, skipping sidebar setup")
# Load initial documents
self.update_document_list()
def _update_ui_for_user(self):
"""Update UI components based on current user permissions"""
# Example: Update create button based on permissions
if hasattr(self, 'create_doc_btn'):
# Enable button if user has permission or if no permissions system
has_permission = True
if hasattr(permissions, 'user_has_permission'):
has_permission = permissions.user_has_permission(self.user, "CREATE_DOCUMENT")
self.create_doc_btn.disabled = not has_permission
def _setup_sidebar(self):
    """Append document actions and filter controls to the template sidebar.

    Logs a warning and returns when there is no template or it has no
    ``sidebar``. Any exception is logged together with its traceback.
    """
    try:
        template = getattr(self, 'template', None)
        if not (template and hasattr(template, 'sidebar')):
            logger.warning("No template sidebar available for setup")
            return

        def add_to_sidebar(item):
            # Always goes through self.template.sidebar, as each append did before.
            self.template.sidebar.append(item)

        # Document management section with the create button.
        add_to_sidebar(pn.pane.Markdown("## Document Management"))
        add_to_sidebar(self.create_doc_btn)

        # Filter section: heading followed by the filter widgets in order.
        add_to_sidebar(pn.pane.Markdown("## Filters"))
        for filter_widget in (
            self.search_input,
            self.doc_type_select,
            self.status_select,
            self.department_select,
            self.date_from_picker,
            self.date_to_picker,
        ):
            add_to_sidebar(filter_widget)

        # Apply / clear buttons, each wired to its handler.
        for label, handler in (
            ("Apply Filters", self._apply_filters),
            ("Clear Filters", self._clear_filters),
        ):
            action_btn = pn.widgets.Button(
                name=label,
                button_type="default",
                width=200
            )
            action_btn.on_click(handler)
            add_to_sidebar(action_btn)
    except Exception as e:
        logger.error(f"Error setting up sidebar: {e}")
        import traceback
        logger.error(traceback.format_exc())
def get_dashboard_view(self):
    """Get the dashboard view with document list, notifications, and actions.

    Builds the header, the left column (filters and notification panel) and
    the main column (status messages, tasks, document list). Starts the
    notification timer when a user is set and a timer exists, and schedules
    a document list load when nothing has been loaded yet.

    Returns:
        The assembled ``pn.Column`` layout, or a fallback column with an
        error message and a Retry button if building the view fails.
    """
    try:
        # Ensure the container exists *before* the layout references it.
        if not hasattr(self, 'document_list_container'):
            self.document_list_container = pn.Column(sizing_mode='stretch_width')

        # Create main dashboard layout with integrated notification header
        dashboard_layout = pn.Column(
            self._create_header_with_notifications(),
            pn.Row(
                pn.Column(
                    self._create_filters_section(),
                    self._create_notifications_section(),  # Only filters and notifications in sidebar
                    width=300,
                    styles={'background': '#f8f9fa', 'padding': '8px'},
                    scroll=True
                ),
                pn.Column(
                    self.notification_area,  # Status messages
                    self._create_tasks_section_full_width(),  # Tasks above document list
                    self.document_list_container,
                    sizing_mode='stretch_width'
                ),
                sizing_mode='stretch_width'
            ),
            sizing_mode='stretch_width'
        )

        # Start notification polling when the dashboard loads. The timer is
        # None until one has been created, so guard against that instead of
        # letting ``None.running`` abort the whole view.
        timer = getattr(self, 'notification_timer', None)
        if self.user and timer is not None:
            try:
                if not timer.running:
                    timer.start()
                    logger.debug("Notification timer started from get_dashboard_view")
            except RuntimeError as e:
                if "already started" in str(e):
                    logger.debug("Notification timer already running in get_dashboard_view")
                else:
                    raise

        # If document_list doesn't exist or document_list_container is empty,
        # initialize with a message and trigger an update
        if self.document_list is None or len(self.document_list_container) == 0:
            self.document_list_container.clear()
            self.document_list_container.append(pn.pane.Markdown("## Documents"))
            self.document_list_container.append(pn.pane.Markdown("Loading documents..."))
            # Load the documents once the page has finished loading
            pn.state.onload(lambda: self.update_document_list())

        return dashboard_layout
    except Exception as e:
        # Local import so the error path cannot itself fail on a missing name.
        import traceback
        logger.error(f"Error creating dashboard view: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        # Return error message with fallback layout
        return pn.Column(
            pn.pane.Markdown("# Document Dashboard"),
            pn.pane.Markdown(f"**Error loading dashboard: {str(e)}**"),
            pn.widgets.Button(name="Retry", button_type="default", width=100,
                              on_click=lambda event: self.get_dashboard_view())
        )
def _create_tasks_section_full_width(self):
    """Build the full-width "My Tasks" card shown above the document list.

    Returns:
        A ``pn.Column`` with a title row, a refresh button and the tasks
        content, or a Markdown error pane if construction fails.
    """
    try:
        has_tasks_panel = hasattr(self, 'tasks_panel') and self.tasks_panel
        if self.user and has_tasks_panel:
            # Delegate to the helper that renders tasks without its own header.
            tasks_content = self._create_tasks_content()
        else:
            placeholder_styles = {'text-align': 'center', 'color': '#666', 'padding': '20px'}
            tasks_content = pn.pane.Markdown("*Please log in to view tasks*",
                                             styles=placeholder_styles)

        # Title on the left, refresh button pushed to the right by the spacer.
        title_row = pn.Row(
            pn.pane.Markdown("### 📋 My Tasks", margin=(0, 0, 0, 0)),
            pn.Spacer(),
            pn.widgets.Button(
                name="Refresh Tasks",
                button_type="default",
                width=100,
                height=30,
                on_click=lambda event: self.refresh_tasks()
            ),
            align='center',
            sizing_mode='stretch_width'
        )

        card_styles = {
            'background': 'white',
            'border': '1px solid #ddd',
            'border-radius': '5px',
            'padding': '10px'
        }
        return pn.Column(
            title_row,
            tasks_content,
            sizing_mode='stretch_width',
            margin=(0, 0, 10, 0),  # gap between the card and the document list
            styles=card_styles
        )
    except Exception as e:
        logger.error(f"Error creating full-width tasks section: {e}")
        return pn.pane.Markdown("*Error loading tasks*")
def _create_tasks_content(self):
    """Return the tasks panel's dashboard view, or a placeholder pane.

    Placeholders are returned when no user is set, when the tasks panel is
    missing or falsy, and when an exception occurs.
    """
    try:
        if not self.user:
            return pn.pane.Markdown("*Please log in to view tasks*")

        panel = getattr(self, 'tasks_panel', None)
        if not panel:
            return pn.pane.Markdown("*Tasks panel not initialized*")

        # The tasks panel provides a dashboard-specific rendering.
        return self.tasks_panel.get_panel_for_dashboard()
    except Exception as e:
        logger.error(f"Error creating tasks content: {e}")
        return pn.pane.Markdown("*Error loading tasks*")
def _create_filters_section(self):
    """Build the filters card: heading, filter widgets and Apply/Clear buttons.

    Returns:
        A ``pn.Column`` holding the filter controls, or a Markdown error
        pane if construction fails.
    """
    try:
        heading = pn.pane.Markdown("### 🔍 Filters")

        # Apply and Clear sit side by side under the filter widgets.
        button_row = pn.Row(
            pn.widgets.Button(name="Apply", button_type="primary", width=90,
                              on_click=self._apply_filters),
            pn.widgets.Button(name="Clear", button_type="default", width=90,
                              on_click=self._clear_filters)
        )

        card_styles = {
            'background': 'white',
            'border': '1px solid #ddd',
            'border-radius': '5px',
            'padding': '8px'
        }
        children = [
            heading,
            self.search_input,
            self.doc_type_select,
            self.status_select,
            self.department_select,
            self.date_from_picker,
            self.date_to_picker,
            button_row,
        ]
        return pn.Column(
            *children,
            width=290,
            margin=(0, 0, 10, 0),
            styles=card_styles
        )
    except Exception as e:
        logger.error(f"Error creating filters section: {e}")
        return pn.pane.Markdown("*Error loading filters*")
def _create_tasks_section(self):
    """Build the fixed-size, scrollable "My Tasks" card for the sidebar.

    Returns:
        A ``pn.Column`` with the tasks panel view (or a log-in hint), or a
        Markdown error pane if construction fails.
    """
    try:
        has_tasks_panel = hasattr(self, 'tasks_panel') and self.tasks_panel
        if self.user and has_tasks_panel:
            tasks_view = self.tasks_panel.get_panel()
        else:
            tasks_view = pn.pane.Markdown("*Please log in to view tasks*")

        card_styles = {
            'background': 'white',
            'border': '1px solid #ddd',
            'border-radius': '5px',
            'padding': '5px'
        }
        # Fixed width/height with scrolling so long task lists stay contained.
        return pn.Column(
            pn.pane.Markdown("### 📋 My Tasks"),
            tasks_view,
            width=280,
            height=400,
            scroll=True,
            styles=card_styles
        )
    except Exception as e:
        logger.error(f"Error creating tasks section: {e}")
        return pn.pane.Markdown("*Error loading tasks*")
def _create_header_with_notifications(self):
    """Build the dashboard header: title plus unread badge and toggle button.

    Replaces ``self.notification_count_badge`` with a fresh HTML pane
    showing ``0`` and wires the toggle button to ``_toggle_notifications``.

    Returns:
        A ``pn.Row`` header, or a plain Markdown title if construction fails.
    """
    try:
        no_margin = (0, 0, 0, 0)

        # Unread counter; starts at zero with a gray background.
        badge_markup = """<span style="background-color: gray; color: white;
border-radius: 50%; padding: 4px 8px; font-size: 12px; font-weight: bold;">
0</span>"""
        self.notification_count_badge = pn.pane.HTML(badge_markup, margin=(0, 5, 0, 0))

        # Button that shows/hides the notification panel.
        notification_toggle = pn.widgets.Button(
            name="🔔 Notifications",
            button_type="default",
            width=140,
            height=40,
            margin=no_margin
        )
        notification_toggle.on_click(self._toggle_notifications)

        # Label, counter and button grouped in one bordered box.
        unread_label = pn.pane.HTML(
            "<span style='font-size: 14px; color: #666; margin-right: 8px;'>Unread:</span>",
            margin=(12, 0, 0, 0)  # top margin lines the text up with the button
        )
        box_styles = {
            'background': '#f8f9fa',
            'border': '1px solid #dee2e6',
            'border-radius': '8px',
            'padding': '8px 12px'
        }
        notification_controls = pn.Row(
            unread_label,
            self.notification_count_badge,
            notification_toggle,
            align='center',
            margin=no_margin,
            styles=box_styles
        )

        # Title on the left, notification controls on the right.
        title = pn.pane.Markdown(
            "# Document Dashboard",
            margin=no_margin,
            styles={'margin-bottom': '0px'}
        )
        return pn.Row(
            title,
            pn.Spacer(),
            notification_controls,
            align='center',
            sizing_mode='stretch_width',
            margin=(10, 0, 10, 0)
        )
    except Exception as e:
        logger.error(f"Error creating header: {e}")
        return pn.pane.Markdown("# Document Dashboard")
def _create_notifications_section(self):
    """Build the initially hidden notification panel and load its content.

    Creates ``notification_items_container``, ``show_read_toggle`` and
    ``notification_panel`` on the instance, wires the refresh and
    mark-all-read controls, then performs a first refresh.

    Returns:
        The notification panel, or a Markdown error pane on failure.
    """
    try:
        def refresh_widgets(event):
            # Shared by the refresh button and the show-read toggle watcher.
            return self._refresh_notifications_with_widgets()

        # Container that receives one widget per notification.
        self.notification_items_container = pn.Column(sizing_mode='stretch_width')

        # Off by default, i.e. only unread notifications are listed.
        self.show_read_toggle = pn.widgets.Toggle(
            name="Show Read",
            value=False,
            width=80,
            height=30
        )

        mark_all_read_btn = pn.widgets.Button(
            name="Mark All Read",
            button_type="default",
            width=90,
            height=30
        )
        mark_all_read_btn.on_click(lambda event: self._mark_all_notifications_read())

        refresh_btn = pn.widgets.Button(
            name="Refresh",
            button_type="default",
            width=70,
            height=30
        )
        refresh_btn.on_click(refresh_widgets)

        # Re-render the list whenever the toggle value changes.
        self.show_read_toggle.param.watch(refresh_widgets, 'value')

        header_row = pn.Row(
            pn.pane.Markdown("### 📋 Notification Details", margin=(0, 0, 0, 0)),
            pn.Spacer(),
            sizing_mode='stretch_width'
        )

        # Controls on one line, followed by a short usage hint.
        button_line = pn.Row(
            self.show_read_toggle,
            pn.Spacer(width=5),
            refresh_btn,
            pn.Spacer(width=5),
            mark_all_read_btn,
            sizing_mode='stretch_width',
            margin=(5, 0, 5, 0)
        )
        hint = pn.pane.HTML(
            "<small style='color: #666; font-style: italic;'>"
            "Click the 🔔 Notifications button above to show/hide this panel"
            "</small>",
            margin=(0, 0, 5, 0)
        )
        controls_row = pn.Column(button_line, hint)

        panel_styles = {
            'background': 'white',
            'border': '1px solid #ddd',
            'border-radius': '5px',
            'padding': '8px'
        }
        self.notification_panel = pn.Column(
            header_row,
            controls_row,
            pn.layout.Divider(margin=(5, 0, 5, 0)),
            self.notification_items_container,
            width=280,
            height=350,
            scroll=True,
            styles=panel_styles,
            visible=False  # shown only after the header toggle is clicked
        )

        # Populate the list right away.
        self._refresh_notifications_with_widgets()
        return self.notification_panel
    except Exception as e:
        logger.error(f"Error creating notifications section: {e}")
        return pn.pane.Markdown("*Error loading notifications*")
def _create_notification_widget(self, notification: Dict[str, Any]):
    """Create a Panel widget for a single notification.

    Args:
        notification: Notification record. Keys read here are
            ``created_date``, ``read``, ``priority``, ``message``,
            ``notification_type`` and one of ``uid`` / ``UID`` / ``id``.

    Returns:
        A ``pn.Column`` showing the message, its age and type, and either a
        "Mark Read" button, a "Read" label or a "No UID" marker. A Markdown
        error pane is returned if building the widget fails.
    """
    try:
        # Get notification details
        created_date = notification.get('created_date', datetime.now())
        if isinstance(created_date, str):
            try:
                created_date = datetime.fromisoformat(created_date.replace('Z', '+00:00'))
            except ValueError:
                # Unparseable timestamp: fall back to "now" rather than failing.
                created_date = datetime.now()
        time_ago = self._format_time_ago(created_date)

        # Determine notification style based on priority and read status
        is_read = notification.get('read', False)
        priority = notification.get('priority', 'INFO')
        bg_color = '#f8f9fa' if is_read else '#e3f2fd'
        border_color = {
            'HIGH': '#dc3545',
            'MEDIUM': '#ffc107',
            'INFO': '#007bff',
            'LOW': '#6c757d'
        }.get(priority, '#007bff')

        # A missing, None or non-string message must not break the slicing below.
        message = str(notification.get('message') or 'No message')
        notification_type = notification.get('notification_type', 'SYSTEM')

        # Get notification UID - check multiple possible field names
        notification_uid = notification.get('uid') or notification.get('UID') or notification.get('id') or ''
        logger.debug(f"Creating notification widget with UID: '{notification_uid}' for message: '{message[:50]}'")

        no_margin = (0, 0, 0, 0)
        if notification_uid and not is_read:
            action_btn = pn.widgets.Button(
                name="Mark Read",
                button_type="primary",
                width=75,
                height=22,
                margin=no_margin
            )

            # The default argument binds this notification's UID to the handler.
            def mark_read_handler(event, captured_uid=notification_uid):
                logger.debug(f"Mark read handler called with UID: '{captured_uid}'")
                try:
                    # mark_notification_read_callback refreshes the widget
                    # list itself on success, so no second refresh is needed.
                    if self.mark_notification_read_callback(captured_uid):
                        logger.info(f"Successfully marked notification {captured_uid} as read")
                    else:
                        logger.error(f"Failed to mark notification {captured_uid} as read")
                except Exception as e:
                    logger.error(f"Error in mark_read_handler for UID {captured_uid}: {e}")

            action_btn.on_click(mark_read_handler)
        elif is_read:
            # Show smaller "Read" indicator for read notifications
            action_btn = pn.pane.HTML(
                '<span style="color: #6c757d; font-size: 11px; font-style: italic;">Read</span>',
                width=75,
                height=22,
                margin=no_margin
            )
        else:
            # No valid UID - show small error indicator
            action_btn = pn.pane.HTML(
                '<span style="color: #dc3545; font-size: 11px;">No UID</span>',
                width=75,
                height=22,
                margin=no_margin
            )
            logger.warning(f"Notification missing UID: {notification}")

        # Unread messages are rendered in bold.
        message_content = pn.pane.Markdown(
            f"**{message}**" if not is_read else message,
            styles={
                'font-size': '12px',
                'margin': '0',
                'word-wrap': 'break-word',
                'overflow-wrap': 'break-word'
            },
            width=180,  # Leave space for button
            margin=no_margin
        )

        # Create time/type info
        time_info = pn.pane.Markdown(
            f"*{time_ago} • {notification_type}*",
            styles={
                'font-size': '10px',
                'color': '#666',
                'margin': '0',
                'margin-top': '2px'
            },
            width=260,
            margin=no_margin
        )

        # Message and action side by side, time/type line underneath.
        notification_widget = pn.Column(
            pn.Row(
                message_content,
                pn.Spacer(width=5),
                action_btn,
                sizing_mode='stretch_width',
                margin=no_margin
            ),
            time_info,
            styles={
                'background-color': bg_color,
                'border-left': f'3px solid {border_color}',
                'padding': '6px',
                'margin': '2px 0',
                'border-radius': '3px'
            },
            margin=no_margin,
            sizing_mode='stretch_width'
        )
        return notification_widget
    except Exception as e:
        logger.error(f"Error creating notification widget: {e}")
        return pn.pane.Markdown("*Error loading notification*")
def mark_notification_read_callback(self, notification_uid: str):
    """Mark one notification as read in response to a UI action.

    Args:
        notification_uid: UID of the notification; empty or whitespace-only
            values are rejected.

    Returns:
        ``True`` when the notification manager reports success (the widget
        list is refreshed first), ``False`` otherwise, including on any error.
    """
    try:
        # Reject missing or blank UIDs before touching the backend.
        if not (notification_uid and notification_uid.strip()):
            logger.error("mark_notification_read_callback called with empty or None notification_uid")
            return False

        if not (self.user and hasattr(self.user, 'uid')):
            logger.warning("Cannot mark notification as read: no user")
            return False

        logger.info(f"Attempting to mark notification '{notification_uid}' as read for user {self.user.uid}")
        marked = self.notification_manager.mark_notification_read(
            notification_uid=notification_uid,
            read=True
        )

        if not marked:
            logger.error(f"Failed to mark notification {notification_uid} as read")
            return False

        # Show the new state immediately.
        self._refresh_notifications_with_widgets()
        logger.info(f"Marked notification {notification_uid} as read")
        return True
    except Exception as e:
        logger.error(f"Error in mark_notification_read_callback: {e}")
        return False
def _refresh_notifications_with_widgets(self):
"""Refresh notifications using Panel widgets instead of HTML."""
try:
if not self.user or not hasattr(self.user, 'uid'):
return
# Get show_read preference from toggle
show_read = getattr(self, 'show_read_toggle', None)
unread_only = not (show_read and show_read.value)
# Get notifications from database
notifications = self.notification_manager.get_user_notifications(
user_uid=self.user.uid,
unread_only=unread_only,
limit=20
)
# Debug the notification data structure
logger.debug(f"Retrieved {len(notifications)} notifications (unread_only={unread_only})")
for i, notif in enumerate(notifications[:3]): # Log first 3 notifications
logger.debug(f"Notification {i}: keys={list(notif.keys())}, uid='{notif.get('uid', 'MISSING')}', read={notif.get('read', False)}")
# Count unread notifications for badge
unread_count = self.notification_manager.count_user_notifications(
user_uid=self.user.uid,
unread_only=True
)
# Update notification count badge with improved styling
if hasattr(self, 'notification_count_badge'):
if unread_count > 0:
badge_color = "#dc3545" # Bootstrap danger red
badge_text = str(unread_count) if unread_count < 100 else "99+"
pulse_animation = "animation: pulse 2s infinite;"
else:
badge_color = "#6c757d" # Bootstrap secondary gray
badge_text = "0"
pulse_animation = ""
self.notification_count_badge.object = f"""
<span style="background-color: {badge_color}; color: white;
border-radius: 50%; padding: 4px 8px; font-size: 12px; font-weight: bold;
{pulse_animation}">
{badge_text}
</span>
"""
# Clear notification items container
if hasattr(self, 'notification_items_container'):
self.notification_items_container.clear()
# Create notification widgets
if not notifications:
no_notifications_text = "No unread notifications" if unread_only else "No notifications"
self.notification_items_container.append(
pn.pane.Markdown(f"*{no_notifications_text}*",
styles={'text-align': 'center', 'color': '#666', 'padding': '10px'})
)
else:
for notification in notifications:
notification_widget = self._create_notification_widget(notification)
self.notification_items_container.append(notification_widget)
logger.debug(f"Updated notification widgets with {len(notifications)} notifications")
except Exception as e:
logger.error(f"Error refreshing notifications with widgets: {e}")
def _refresh_notifications(self):
"""Refresh notifications from database."""
try:
if not self.user or not hasattr(self.user, 'uid'):
return
# Get notifications from database
notifications = self.notification_manager.get_user_notifications(
user_uid=self.user.uid,
unread_only=False,
limit=20
)
# Count unread notifications
unread_count = self.notification_manager.count_user_notifications(
user_uid=self.user.uid,
unread_only=True
)
# Update notification count badge
badge_color = "red" if unread_count > 0 else "gray"
self.notification_count_badge.object = f"""
<span style="background-color: {badge_color}; color: white;
border-radius: 50%; padding: 2px 6px; font-size: 12px;">
{unread_count}
</span>
"""
# Update notification panel content
if hasattr(self, 'notification_content') and self.notification_content:
notification_html = self._format_notifications_html(notifications)
self.notification_content.object = notification_html
logger.debug(f"Updated notification panel with {len(notifications)} notifications")
except Exception as e:
logger.error(f"Error refreshing notifications: {e}")
def _format_notifications_html(self, notifications: List[Dict[str, Any]]) -> str:
"""Format notifications as HTML with proper click handling."""
try:
if not notifications:
return "<div style='padding: 10px; text-align: center; color: #666;'>No notifications</div>"
html_parts = []
for notification in notifications:
# Get notification details
created_date = notification.get('created_date', datetime.now())
if isinstance(created_date, str):
try:
created_date = datetime.fromisoformat(created_date.replace('Z', '+00:00'))
except:
created_date = datetime.now()
time_ago = self._format_time_ago(created_date)
# Determine notification style based on priority and read status
is_read = notification.get('read', False)
priority = notification.get('priority', 'INFO')
bg_color = '#f9f9f9' if is_read else '#e1f5fe'
border_color = {
'HIGH': '#d32f2f',
'MEDIUM': '#ffa000',
'INFO': '#1976d2',
'LOW': '#757575'
}.get(priority, '#1976d2')
# Get notification UID for click handling
notification_uid = notification.get('uid', '')
# Create notification HTML with data attributes for Panel to handle
notification_html = f"""
<div class="notification-item"
data-notification-uid="{notification_uid}"
style="border-left: 3px solid {border_color}; background-color: {bg_color};
padding: 8px; margin: 5px 0; border-radius: 3px; cursor: pointer;
transition: background-color 0.2s;"
onmouseover="this.style.backgroundColor='#e8f5fd'"
onmouseout="this.style.backgroundColor='{bg_color}'"
onclick="markNotificationRead('{notification_uid}')">
<div style="font-weight: {'normal' if is_read else 'bold'}; font-size: 13px;">
{notification.get('message', 'No message')}
</div>
<div style="font-size: 11px; color: #666; margin-top: 4px;">
{time_ago} • {notification.get('notification_type', 'SYSTEM')}
</div>
</div>
"""
html_parts.append(notification_html)
# Add JavaScript function to handle clicks
javascript = """
<script>
function markNotificationRead(notificationUid) {
// Send POST request to mark notification as read
fetch('/mark_notification_read', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
notification_uid: notificationUid
})
})
.then(response => response.json())
.then(data => {
if (data.success) {
// Find the notification element and update its style
const notificationEl = document.querySelector(`[data-notification-uid="${notificationUid}"]`);
if (notificationEl) {
notificationEl.style.backgroundColor = '#f9f9f9';
notificationEl.style.opacity = '1';
notificationEl.style.cursor = 'pointer';
const messageEl = notificationEl.querySelector('div:first-child');
if (messageEl) {
messageEl.style.fontWeight = 'normal';
}
}
// Trigger refresh of notifications
setTimeout(() => {
if (window.dashboardApp && window.dashboardApp.refreshNotifications) {
window.dashboardApp.refreshNotifications();
}
}, 500);
}
})
.catch(error => {
console.error('Error marking notification as read:', error);
});
}
</script>
"""
return "".join(html_parts) + javascript
except Exception as e:
logger.error(f"Error formatting notifications: {e}")
return f"<div style='color: red;'>Error loading notifications: {str(e)}</div>"
def _format_time_ago(self, created_date: datetime) -> str:
"""Format datetime as 'time ago' string."""
try:
now = datetime.now()
if created_date.tzinfo:
now = now.replace(tzinfo=created_date.tzinfo)
diff = now - created_date
if diff.days > 0:
return f"{diff.days}d ago"
elif diff.seconds > 3600:
hours = diff.seconds // 3600
return f"{hours}h ago"
elif diff.seconds > 60:
minutes = diff.seconds // 60
return f"{minutes}m ago"
else:
return "Just now"
except Exception as e:
logger.error(f"Error formatting time: {e}")
return "Unknown"
def _toggle_notifications(self, event=None):
"""Toggle notification panel visibility with better user feedback."""
try:
if hasattr(self, 'notification_panel') and self.notification_panel:
# Toggle visibility
current_visible = getattr(self.notification_panel, 'visible', False)
self.notification_panel.visible = not current_visible
# Update button text to reflect current state
if hasattr(event, 'obj'): # Check if event has the button object
button = event.obj
if not current_visible: # Panel is now visible
button.name = "🔔 Hide Notifications"
# Refresh notifications when opening
self._refresh_notifications_with_widgets()
else: # Panel is now hidden
button.name = "🔔 Notifications"
# Provide user feedback
if not current_visible:
# Panel is now visible
if hasattr(self, 'notification_area'):
self.notification_area.object = "💡 Notification panel opened. Use the toggle below to show/hide read notifications."
# Clear the message after 3 seconds
pn.state.add_periodic_callback(
lambda: setattr(self.notification_area, 'object', '') if hasattr(self, 'notification_area') else None,
period=3000,
count=1
)
else:
# Panel is now hidden
if hasattr(self, 'notification_area'):
self.notification_area.object = ""
except Exception as e:
logger.error(f"Error toggling notifications: {e}")
def _mark_all_notifications_read(self):
    """Mark the current user's unread notifications as read.

    Fetches up to 50 unread notifications from ``notification_manager``,
    marks each one individually (a failure on one entry is logged and does
    not stop the rest), refreshes the notification widgets and, when at
    least one entry was marked, shows a confirmation in
    ``notification_area`` that is cleared by a one-shot timer.
    """
    try:
        if not (self.user and hasattr(self.user, 'uid')):
            logger.warning("Cannot mark notifications as read: no user")
            return

        unread = self.notification_manager.get_user_notifications(
            user_uid=self.user.uid,
            unread_only=True,
            limit=50
        )

        marked_count = 0
        for entry in unread:
            try:
                self.notification_manager.mark_notification_read(
                    notification_uid=entry.get('uid'),
                    read=True
                )
            except Exception as mark_error:
                logger.error(f"Error marking notification {entry.get('uid')} as read: {mark_error}")
            else:
                marked_count += 1

        logger.info(f"Marked {marked_count} notifications as read")

        # Redraw right away so the list reflects the new read state.
        self._refresh_notifications_with_widgets()

        if marked_count > 0 and hasattr(self, 'notification_area'):
            self.notification_area.object = f"✓ Marked {marked_count} notifications as read"

            def clear_message():
                if hasattr(self, 'notification_area'):
                    self.notification_area.object = ""

            # One-shot timer: remove the confirmation after three seconds.
            pn.state.add_periodic_callback(clear_message, 3000, count=1)
    except Exception as e:
        logger.error(f"Error marking notifications as read: {e}")
def _create_document_list(self):
    """Create the (initially empty) document table and store it on ``self``.

    On success ``self.document_list`` is a paginated ``Tabulator`` whose
    row clicks are routed to ``document_selected``. If the table cannot be
    built, the error is logged and ``self.document_list`` becomes a
    Markdown pane showing an empty placeholder table instead.
    """
    try:
        import pandas as pd

        # NOTE: ``pn`` is deliberately NOT re-imported here. A local
        # ``import panel as pn`` makes ``pn`` a function-local name, so a
        # failing import would leave it unbound and the fallback in the
        # ``except`` branch would die with UnboundLocalError.
        empty_df = pd.DataFrame(columns=[
            'doc_number', 'title', 'doc_type', 'status', 'created_date', 'action'
        ])

        self.document_list = pn.widgets.Tabulator(
            empty_df,
            formatters={'action': 'html'},
            pagination="local",  # string mode; a bare True is not accepted here
            page_size=10,
            sizing_mode="stretch_width",
            height=400,
            selectable=True
        )

        # Route row clicks to the selection handler.
        self.document_list.on_click(self.document_selected)
    except Exception as e:
        logger.error(f"Error creating document list: {e}")
        import traceback
        logger.error(traceback.format_exc())

        # Reset first so a failure while building the fallback pane does
        # not leave a half-initialised table behind.
        self.document_list = None

        md_table = (
            "\n"
            "| Document Number | Title | Type | Status | Created |\n"
            "|---|---|---|---|---|\n"
            "| No documents found | | | | |\n"
        )
        self.document_list = pn.pane.Markdown(md_table)
# Reload the tasks panel on demand (also invoked by update_document_list)
def refresh_tasks(self):
    """Reload the user tasks panel, if one has been created."""
    panel = getattr(self, 'tasks_panel', None)
    if not panel:
        return
    panel.load_tasks()
#@guard_execution(cooldown_ms=1000)
def update_document_list(self, event=None):
    """Rebuild the document table from the backend using the current filters.

    Reads the search/filter widgets, converts the selected display names to
    backend codes via ``settings``, queries ``get_documents`` and replaces
    the content of ``document_list_container`` with a fresh ``Tabulator``
    plus a summary line. The tasks panel is refreshed as well. On success
    the notification area is left empty; on failure it shows the error and
    the container shows a retry hint.

    Args:
        event: Ignored; present so the method can be used directly as a
            widget callback.
    """
    logger.debug("Updating document list")
    try:
        self.notification_area.object = "Loading documents..."

        # Keep the tasks panel in step with the document list.
        self.refresh_tasks()

        if not hasattr(self, 'document_list_container'):
            logger.debug("Creating document_list_container")
            self.document_list_container = pn.Column(sizing_mode='stretch_width')
        self.document_list_container.clear()
        self.document_list_container.append(pn.pane.Markdown("## Document List"))

        from CDocs.config import settings

        # Raw widget values; empty selections are normalised to None.
        search_text = self.search_input.value.strip() if hasattr(self, 'search_input') and self.search_input.value else None
        doc_type_full = self.doc_type_select.value if hasattr(self, 'doc_type_select') and self.doc_type_select.value else None
        status_full = self.status_select.value if hasattr(self, 'status_select') and self.status_select.value else None
        department_full = self.department_select.value if hasattr(self, 'department_select') and self.department_select.value else None
        date_from = self.date_from_picker.value if hasattr(self, 'date_from_picker') else None
        date_to = self.date_to_picker.value if hasattr(self, 'date_to_picker') else None

        # The dropdowns show full names; the backend query expects codes.
        doc_type_code = None
        if doc_type_full:
            doc_type_code = settings.get_document_type_code(doc_type_full)
            logger.debug(f"Converted doc type '{doc_type_full}' to code '{doc_type_code}'")

        status_code = None
        if status_full:
            status_code = settings.get_document_status_code(status_full)
            logger.debug(f"Converted status '{status_full}' to code '{status_code}'")

        department_code = None
        if department_full:
            department_code = settings.get_department_code(department_full)
            logger.debug(f"Converted department '{department_full}' to code '{department_code}'")

        logger.debug(f"Final filter codes - Type: {doc_type_code}, Status: {status_code}, Dept: {department_code}, Search: {search_text}")

        from CDocs.controllers.document_controller import get_documents
        result = get_documents(
            user=self.user,
            doc_type=doc_type_code,
            status=status_code,
            department=department_code,
            date_from=date_from.isoformat() if date_from else None,
            date_to=date_to.isoformat() if date_to else None,
            search_text=search_text
        )

        documents = result.get('documents', [])
        logger.debug(f"Retrieved {len(documents)} documents with filters applied")

        if not documents:
            self.document_list_container.append(pn.pane.Markdown("No documents found matching your criteria."))
            self.notification_area.object = ""
            return

        logger.debug(f"Sample document data: {documents[0]}")

        # Static access badge per (lower-cased) status name; anything not
        # listed is shown as read-only.
        access_badges = {
            'draft': '<span class="badge bg-success">Editable</span>',
            'in review': '<span class="badge bg-warning">Review</span>',
            'in approval': '<span class="badge bg-secondary">Approval</span>',
        }
        read_only_badge = '<span class="badge bg-secondary">Read-only</span>'

        # Translate backend records (several possible key spellings) into
        # the flat rows shown in the table.
        data = []
        for doc in documents:
            doc_data = {
                'UID': self._get_field_case_insensitive(doc, ['UID', 'uid']),
                'doc_number': self._get_field_case_insensitive(doc, ['docNumber', 'doc_number']),
                'title': doc.get('title', ''),
                'revision': doc.get('revision', ''),
                # Codes are converted back to full names for display.
                'status': settings.get_document_status_name(
                    self._get_field_case_insensitive(doc, ['status'])
                ),
                'doc_type': settings.get_document_type_name(
                    self._get_field_case_insensitive(doc, ['docType', 'doc_type'])
                ),
                'department': settings.get_department_name(doc.get('department', '')),
                # First available of modified/created date.
                'last_updated': self._get_field_case_insensitive(
                    doc, ['modifiedDate', 'modified_date', 'last_modified_date', 'createdDate', 'created_date']
                )
            }

            # Purely visual button; the row click handler does the work.
            doc_data['action'] = '<button class="btn btn-sm btn-primary view-doc-btn">View</button>'

            # A missing/unknown status name must not abort the whole
            # listing, so normalise before lower-casing.
            status_key = str(doc_data['status'] or '').lower()
            doc_data['access'] = access_badges.get(status_key, read_only_badge)

            data.append(doc_data)

        import pandas as pd
        df = pd.DataFrame(data)

        # Render dates as YYYY-MM-DD; unparseable values become NaN.
        for col in ('last_updated',):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')

        # Badge styling travels with the table through ``stylesheets``.
        # Calling pn.extension(raw_css=...) on every refresh re-registers
        # the CSS each time, and the previous text wrapped the rules in
        # <style> tags, which is not valid inside a raw CSS string.
        badge_css = """
        .badge {
            display: inline-block;
            padding: 0.35em 0.65em;
            font-size: 0.75em;
            font-weight: 700;
            line-height: 1;
            text-align: center;
            white-space: nowrap;
            vertical-align: baseline;
            border-radius: 0.25rem;
        }
        .bg-success { background-color: #198754; color: white; }
        .bg-warning { background-color: #ffc107; color: black; }
        .bg-secondary { background-color: #6c757d; color: white; }
        """

        doc_table = pn.widgets.Tabulator(
            df,
            formatters={
                'action': {
                    'type': 'html',
                    'label': 'Action'
                },
                'access': {
                    'type': 'html',
                    'label': 'Access'
                }
            },
            selectable=True,
            pagination='local',
            page_size=10,
            sizing_mode='stretch_width',
            hidden_columns=['UID'],
            selection=[],
            stylesheets=[badge_css]
        )

        # A new table is built on every refresh, so the click handler is
        # registered exactly once per table instance.
        self.document_list = doc_table
        self.document_list.on_click(self.document_selected)
        self.document_list_container.append(doc_table)

        # Summary line: document count plus the filters in effect.
        filter_info = []
        if search_text:
            filter_info.append(f"Search: '{search_text}'")
        if doc_type_full:
            filter_info.append(f"Type: {doc_type_full}")
        if status_full:
            filter_info.append(f"Status: {status_full}")
        if department_full:
            filter_info.append(f"Department: {department_full}")
        if date_from:
            filter_info.append(f"From: {date_from}")
        if date_to:
            filter_info.append(f"To: {date_to}")

        if filter_info:
            filter_text = " | ".join(filter_info)
            self.document_list_container.append(
                pn.pane.Markdown(f"*Showing {len(documents)} document(s) with filters: {filter_text}*")
            )
        else:
            self.document_list_container.append(
                pn.pane.Markdown(f"*Showing {len(documents)} document(s) (no filters applied)*")
            )

        # Loading finished: remove the "Loading documents..." message.
        self.notification_area.object = ""
    except Exception as e:
        logger.error(f"Error updating document list: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        self.notification_area.object = f"**Error loading documents:** {str(e)}"

        # The container may not exist yet if the failure happened early.
        if not hasattr(self, 'document_list_container'):
            self.document_list_container = pn.Column(sizing_mode='stretch_width')
        else:
            self.document_list_container.clear()
        self.document_list_container.append(pn.pane.Markdown("Error loading documents. Please try again."))
def _show_sample_documents(self):
    """Fill the document list with built-in demo rows.

    Intended as a stand-in when database access fails. The rows are pushed
    into the existing ``Tabulator`` (or rendered as a markdown table when
    the list is the Markdown fallback pane). The list is created first if
    it does not exist yet. Errors are logged and echoed to the notification
    area.
    """
    try:
        view_button = "<button class='btn btn-sm btn-primary'>View</button>"
        field_names = ("doc_number", "title", "doc_type", "status", "created_date")
        sample_rows = (
            ("SOP-QA-001", "Document Control Procedure", "SOP", "Published", "2025-01-15"),
            ("POL-QA-001", "Quality Management System Manual", "Policy", "Published", "2025-02-01"),
            ("FORM-QA-001", "Change Control Request Form", "Form", "Draft", "2025-03-10"),
            ("WI-MFG-001", "Product Assembly Instructions", "Work Instruction", "In Review", "2025-03-05"),
            ("TMPL-RES-001", "Research Report Template", "Template", "Approved", "2025-02-20"),
        )
        # Every sample row carries the same (inert) "View" button.
        sample_data = [
            {**dict(zip(field_names, values)), "action": view_button}
            for values in sample_rows
        ]

        if not self.document_list:
            self._create_document_list()

        try:
            import pandas as pd

            sample_df = pd.DataFrame(sample_data)
            target = self.document_list
            if isinstance(target, pn.widgets.Tabulator):
                target.value = sample_df
            elif isinstance(target, pn.pane.Markdown):
                # Markdown fallback pane: render the rows as a text table.
                target.object = self._format_documents_as_markdown(sample_data)
        except ImportError:
            # Without pandas there is no Tabulator; fall back to markdown.
            self.document_list = pn.pane.Markdown(self._format_documents_as_markdown(sample_data))
    except Exception as e:
        logger.error(f"Error in _show_sample_documents: {e}")
        if hasattr(self, 'notification_area'):
            self.notification_area.object = f"Error showing sample documents: {str(e)}"
def _format_documents_as_markdown(self, documents):
"""Format documents as markdown table for fallback display."""
header = """
| Document Number | Title | Type | Status | Created |
|---|---|---|---|---|
"""
if not documents:
return header + "| No documents found | | | | |"
rows = []
for doc in documents:
row = f"| {doc.get('doc_number', 'N/A')} | {doc.get('title', 'Untitled')} | " \
f"{doc.get('doc_type', 'General')} | {doc.get('status', 'Draft')} | " \
f"{doc.get('created_date', '')} |"
rows.append(row)
return header + "\n".join(rows)
#@guard_execution()
def document_selected(self, event):
    """Open the document that belongs to the clicked/selected table row.

    The row index is taken from the event (``row`` for cell clicks, else
    ``selected`` or ``new``) and, failing that, from the table's current
    selection. The document UID is read from the row's ``UID`` column; when
    that cell is missing or empty the document is looked up by
    ``doc_number`` instead. The UID is then handed to
    ``parent_app.load_document``. Problems are logged and reported in the
    notification area.

    Args:
        event: Click or selection event emitted by the document table.
    """
    logger.debug("Document selection event triggered")
    try:
        import numbers

        logger.debug(f"Event type: {type(event).__name__}")

        # Always work on the table currently stored on the dashboard; the
        # event itself does not reliably carry it.
        table = getattr(self, 'document_list', None)
        if table is None:
            logger.error("No document list available")
            self.notification_area.object = "**Error:** Document list not initialized"
            return

        def first_index(candidate):
            """Return the first integer row index in ``candidate`` or None."""
            if isinstance(candidate, (list, tuple)):
                candidate = candidate[0] if candidate else None
            # Row 0 is a valid index, so test the type instead of truthiness.
            if isinstance(candidate, numbers.Integral) and not isinstance(candidate, bool):
                return int(candidate)
            return None

        def open_document(doc_uid):
            """Hand the UID to the parent application, or report why not."""
            parent = getattr(self, 'parent_app', None)
            if parent and hasattr(parent, 'load_document'):
                logger.debug(f"Calling parent_app.load_document with UID: {doc_uid}")
                parent.load_document(doc_uid)
            else:
                logger.error("No parent_app reference available to load document")
                self.notification_area.object = "**Error:** Cannot load document (missing parent app reference)"

        row_index = first_index(getattr(event, 'row', None))
        if row_index is not None:
            logger.debug(f"Using row from cell click: {row_index}")
            if getattr(event, 'column', None) == 'action':
                logger.debug("Click detected on action (View) button")
        else:
            for attr_name, label in (('selected', 'selection event'), ('new', 'event.new')):
                row_index = first_index(getattr(event, attr_name, None))
                if row_index is not None:
                    logger.debug(f"Using row from {label}: {row_index}")
                    break

        if row_index is None:
            logger.warning("Could not determine row index from event")
            # Last resort: whatever is currently selected in the table.
            row_index = first_index(getattr(table, 'selection', None))
            if row_index is None:
                self.notification_area.object = "**Error:** Could not determine which document was selected"
                return
            logger.debug(f"Using row from table selection: {row_index}")

        # The Markdown fallback pane has no ``value``.
        if not hasattr(table, 'value'):
            logger.error("Table has no value attribute")
            self.notification_area.object = "**Error:** Could not access table data"
            return
        df = table.value

        if df is None or len(df) == 0 or not 0 <= row_index < len(df):
            logger.error(f"Invalid DataFrame or row index: df={df is not None}, len(df)={len(df) if df is not None else 0}, idx={row_index}")
            self.notification_area.object = "**Error:** Could not access selected document data"
            return
        logger.debug(f"Got DataFrame from table.value with {len(df)} rows")

        row = df.iloc[row_index]
        logger.debug(f"DataFrame columns: {df.columns.tolist()}")
        logger.debug(f"Selected row data: {row.to_dict()}")

        if 'UID' in df.columns:
            import pandas as pd

            doc_uid = row['UID']
            # Only trust the cell when it actually holds a value; an empty
            # or NaN UID falls through to the doc_number lookup below.
            if doc_uid is not None and not pd.isna(doc_uid) and str(doc_uid).strip():
                logger.debug(f"Found document UID in 'UID' column: {doc_uid}")
                open_document(doc_uid)
                return
            logger.warning("Selected row has an empty UID; falling back to doc_number lookup")

        if 'doc_number' in df.columns:
            doc_number = row['doc_number']
            logger.debug(f"Looking up document by doc_number: {doc_number}")

            # Imported here to avoid circular imports.
            from CDocs.controllers.document_controller import get_documents
            result = get_documents(
                user=self.user,
                doc_number=doc_number
            )
            documents = result.get('documents', [])
            if documents:
                # Same key-tolerant lookup as used when building the table.
                doc_uid = self._get_field_case_insensitive(documents[0], ['UID', 'uid'])
                if doc_uid:
                    logger.debug(f"Found UID by doc_number lookup: {doc_uid}")
                    open_document(doc_uid)
                    return

        logger.error("Could not determine document UID from row data")
        self.notification_area.object = "**Error:** Could not determine document ID"
    except Exception as e:
        logger.error(f"Error selecting document: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        self.notification_area.object = f"**Error:** {str(e)}"
def show_create_document_form(self, event=None):
    """Replace the document list with the "Create New Document" form.

    The form collects title, type and department, an optional custom
    document number (validated live and again on submit), an optional
    custom storage path, and the content either as text or as an uploaded
    file. On submit the document is created; when a file was uploaded an
    initial version is created and uploaded to FileCloud, then sharing
    permissions are updated and the document list is shown again with a
    result message. Cancel simply returns to the document list.

    Args:
        event: Ignored; present so the method can be used directly as a
            button callback.
    """
    logger.debug("Showing create document form")
    try:
        # The form takes the place of the document list.
        self.document_list_container.clear()
        self.notification_area.object = "Creating new document"

        title_input = pn.widgets.TextInput(name="Title", placeholder="Enter document title", width=400)

        from CDocs.config import settings

        # Dropdowns show full names; codes are resolved on submit.
        doc_type_options = list(settings.DOCUMENT_TYPES.keys())
        logger.debug(f"Document type dropdown options: {doc_type_options}")
        doc_type_select = pn.widgets.Select(
            name="Document Type",
            options=doc_type_options,
            width=200
        )
        dept_options = list(settings.DEPARTMENTS.keys())
        department_select = pn.widgets.Select(name="Department", options=dept_options, width=200)

        # --- Custom document number (hidden until the switch is on) ---
        custom_number_toggle = pn.widgets.Switch(name="Use Custom Document Number", value=False)
        custom_number_input = pn.widgets.TextInput(
            name="Document Number",
            placeholder="e.g., CUSTOM-DOC-001",
            width=300,
            visible=False
        )
        validation_status = pn.pane.Markdown("", width=400, visible=False)

        def toggle_custom_number(event):
            custom_number_input.visible = event.new
            validation_status.visible = event.new
            if not event.new:
                # Switching off discards whatever had been typed.
                custom_number_input.value = ""
                validation_status.object = ""

        custom_number_toggle.param.watch(toggle_custom_number, 'value')

        def validate_number(event):
            """Validate the custom number as it changes and show the result."""
            if event.new and len(event.new.strip()) > 0:
                try:
                    from CDocs.controllers.document_controller import validate_document_number
                    result = validate_document_number(event.new.strip().upper())
                    if result.get('valid', False):
                        validation_status.object = f"✅ {result.get('message', 'Valid')}"
                        validation_status.styles = {'color': 'green'}
                    else:
                        validation_status.object = f"❌ {result.get('message', 'Invalid')}"
                        validation_status.styles = {'color': 'red'}
                except Exception as e:
                    validation_status.object = f"❌ Error validating: {str(e)}"
                    validation_status.styles = {'color': 'red'}
            else:
                validation_status.object = ""

        custom_number_input.param.watch(validate_number, 'value')

        # --- Custom storage path (hidden until the switch is on) ---
        custom_path_toggle = pn.widgets.Switch(name="Custom File Location", value=False)
        custom_path_input = pn.widgets.TextInput(
            name="Custom Path",
            placeholder="/custom/path/for/document",
            width=400,
            visible=False
        )
        custom_path_help = pn.pane.Markdown(
            "\n"
            "*Custom path will store:*\n"
            "- Editable versions in a folder named after the document number\n"
            "- Published PDF version directly in the specified path\n",
            visible=False,
            styles={'font-style': 'italic', 'color': '#6c757d', 'font-size': '0.9em'}
        )

        content_area = pn.widgets.TextAreaInput(
            name="Content",
            placeholder="Enter document content or upload a file",
            rows=10,
            width=600
        )
        file_upload = pn.widgets.FileInput(name="Upload File", accept=".docx,.pdf,.txt,.md", width=300)

        create_btn = pn.widgets.Button(name="Create Document", button_type="success", width=150)
        cancel_btn = pn.widgets.Button(name="Cancel", button_type="default", width=100)

        def toggle_custom_path(event):
            custom_path_input.visible = event.new
            custom_path_help.visible = event.new

        custom_path_toggle.param.watch(toggle_custom_path, 'value')

        create_form = pn.Column(
            pn.pane.Markdown("# Create New Document"),
            title_input,
            pn.Row(doc_type_select, department_select),
            pn.Spacer(height=10),
            pn.pane.Markdown("### Document Number"),
            custom_number_toggle,
            custom_number_input,
            validation_status,
            pn.Spacer(height=10),
            pn.pane.Markdown("### Storage Location"),
            custom_path_toggle,
            custom_path_input,
            custom_path_help,
            pn.Spacer(height=10),
            pn.Row(pn.Column(pn.pane.Markdown("**Upload File:**"), file_upload),
                   pn.Column(pn.pane.Markdown("**Or enter content directly:**"))),
            content_area,
            pn.Row(
                cancel_btn,
                create_btn,
                align='end'
            ),
            width=700,
            styles={'background': '#f8f9fa'},
            css_classes=['p-4', 'border', 'rounded']
        )

        def handle_file_upload(event):
            """Preview text uploads in the content area; note other files."""
            if file_upload.value is None:
                return
            # The filename may not be populated yet when the value arrives.
            filename = file_upload.filename or ""
            if filename.lower().endswith(('.txt', '.md')):
                try:
                    content_area.value = file_upload.value.decode('utf-8')
                except (UnicodeDecodeError, AttributeError):
                    self.notification_area.object = "**Error:** Failed to read file as text"
            else:
                # Binary formats are stored as-is; show a placeholder only.
                content_area.value = f"[File content from: {filename}]"

        file_upload.param.watch(handle_file_upload, 'value')

        def refresh_then_notify(message):
            # update_document_list() resets the notification area, so the
            # final message has to be written AFTER the refresh or the
            # user never sees it.
            self.update_document_list()
            self.notification_area.object = message

        def handle_create(event):
            """Validate the form, create the document and report the result."""
            logger.debug("Create button clicked")

            if not title_input.value:
                self.notification_area.object = "**Error:** Document title is required"
                return
            if not doc_type_select.value:
                self.notification_area.object = "**Error:** Document type is required"
                return

            # Re-validate the custom number; the live check is advisory.
            custom_doc_number = None
            if custom_number_toggle.value and custom_number_input.value:
                custom_doc_number = custom_number_input.value.strip().upper()
                try:
                    from CDocs.controllers.document_controller import validate_document_number
                    validation_result = validate_document_number(custom_doc_number)
                    if not validation_result.get('valid', False):
                        self.notification_area.object = f"**Error:** {validation_result.get('message', 'Invalid document number')}"
                        return
                except Exception as e:
                    self.notification_area.object = f"**Error:** Failed to validate document number: {str(e)}"
                    return

            custom_path = None
            if custom_path_toggle.value:
                if not custom_path_input.value:
                    self.notification_area.object = "**Error:** Custom path is required when custom file location is enabled"
                    return
                # Normalise: drop trailing slashes, require a leading one.
                custom_path = custom_path_input.value.rstrip('/')
                if not custom_path.startswith('/'):
                    self.notification_area.object = "**Error:** Custom path must start with a slash (/)"
                    return

            try:
                from CDocs.controllers.document_controller import create_document, create_document_version

                content = content_area.value or ""
                file_content = None
                file_name = None

                if file_upload.value:
                    file_content = file_upload.value  # raw bytes of the upload
                    file_name = file_upload.filename

                    # With no typed content, use the file text as the
                    # document text, or a placeholder for binary files.
                    if not content_area.value:
                        try:
                            content = file_upload.value.decode('utf-8')
                        except (UnicodeDecodeError, AttributeError):
                            content = f"[File content from: {file_upload.filename}]"

                # The backend stores codes, not the displayed full names.
                doc_type_full_name = doc_type_select.value
                doc_type_code = settings.get_document_type_code(doc_type_full_name)
                dept_full_name = department_select.value
                dept_code = settings.get_department_code(dept_full_name)

                logger.info(f"Creating document with type: {doc_type_full_name} (code: {doc_type_code})")
                logger.info(f"Department: {dept_full_name} (code: {dept_code})")
                if custom_doc_number:
                    logger.info(f"Using custom document number: {custom_doc_number}")

                properties = {}
                if custom_path:
                    properties["custom_path"] = custom_path
                    logger.info(f"Using custom path for document: {custom_path}")

                self.notification_area.object = "**Creating document...**"

                result = create_document(
                    user=self.user,
                    title=title_input.value,
                    doc_text=content,
                    doc_type=doc_type_code,
                    department=dept_code,
                    status='DRAFT',
                    doc_number=custom_doc_number,  # None lets the backend assign one
                    properties=properties
                )

                if not result or not isinstance(result, dict) or 'document' not in result:
                    self.notification_area.object = "**Error:** Document creation failed - invalid result"
                    return

                document_uid = result['document'].get('uid')
                doc_number = result['document'].get('doc_number', 'Unknown')

                # An uploaded file becomes the document's initial version.
                if file_content and file_name:
                    self.notification_area.object = f"**Creating initial version for document {doc_number}...**"

                    version_result = create_document_version(
                        user=self.user,
                        document_uid=document_uid,
                        file_content=file_content,
                        file_name=file_name,
                        comment="Initial document version"
                    )

                    if not version_result or not version_result.get('UID'):
                        error_msg = version_result.get('message', 'Unknown error') if version_result else 'Failed to create version'
                        logger.error(f"Failed to create version: {error_msg}")
                        refresh_then_notify(f"**Warning:** Document {doc_number} created but initial version failed: {error_msg}")
                        return

                    try:
                        from CDocs.controllers.filecloud_controller import upload_document_to_filecloud
                        from CDocs.models.document import ControlledDocument

                        document = ControlledDocument(uid=document_uid)
                        if document:
                            self.notification_area.object = f"**Uploading file to FileCloud for document {doc_number}...**"

                            metadata = {
                                "doc_uid": document_uid,
                                "doc_number": doc_number,
                                "version_uid": version_result.get('UID'),
                                "version_number": "0.1",  # initial version
                                "title": title_input.value,
                                "status": "DRAFT"
                            }

                            filecloud_result = upload_document_to_filecloud(
                                user=self.user,
                                document=document,
                                file_content=file_content,
                                version_comment="Initial document version",
                                metadata=metadata
                            )

                            if not filecloud_result or not filecloud_result.get('success'):
                                error_msg = filecloud_result.get('message', 'Unknown error') if filecloud_result else 'Failed to upload to FileCloud'
                                logger.error(f"Failed to upload to FileCloud: {error_msg}")
                                refresh_then_notify(f"**Warning:** Document {doc_number} created but FileCloud upload failed: {error_msg}")
                                return
                    except Exception as fc_error:
                        logger.error(f"Error uploading to FileCloud: {fc_error}")
                        refresh_then_notify(f"**Warning:** Document {doc_number} created but FileCloud upload failed: {str(fc_error)}")
                        return

                success_msg = f"**Success:** Document {doc_number} created"
                if custom_doc_number:
                    success_msg += " with custom number"

                # Permission problems are logged but never fail the creation.
                try:
                    from CDocs.controllers.share_controller import manage_document_permissions
                    from CDocs.models.document import ControlledDocument
                    document = ControlledDocument(uid=document_uid)
                    permission_result = manage_document_permissions(document)
                    logger.debug(f"Permission management result: {permission_result}")
                except Exception as perm_error:
                    logger.warning(f"Error managing document permissions: {perm_error}")

                refresh_then_notify(success_msg)

            except Exception as e:
                logger.error(f"Error creating document: {e}")
                import traceback
                logger.error(traceback.format_exc())
                self.notification_area.object = f"**Error creating document:** {str(e)}"

        def handle_cancel(event):
            # Drop the form and go back to the document list.
            self.notification_area.object = ""
            self.update_document_list()

        create_btn.on_click(handle_create)
        cancel_btn.on_click(handle_cancel)

        self.document_list_container.append(create_form)

    except Exception as e:
        logger.error(f"Error showing create document form: {e}")
        import traceback
        logger.error(traceback.format_exc())
        # Restore the list first; the refresh resets the notification
        # area, so the error text is written afterwards.
        self.update_document_list()
        self.notification_area.object = f"**Error showing create document form:** {str(e)}"
def _apply_filters(self, event=None):
    """Re-query the document list using the current filter widgets.

    The filter values themselves are read and converted by
    ``update_document_list``; this handler only shows a progress message,
    logs the selection for debugging and triggers the refresh.

    Args:
        event: Ignored; present so the method can be used directly as a
            button callback.
    """
    try:
        self.notification_area.object = "Applying filters..."

        # Values are gathered for the debug log only. The date pickers are
        # read with getattr because they are treated as optional everywhere
        # else; a missing picker must not abort filtering.
        search_text = self.search_input.value.strip() if self.search_input.value else None
        doc_type = self.doc_type_select.value if self.doc_type_select.value else None
        status = self.status_select.value if self.status_select.value else None
        department = self.department_select.value if self.department_select.value else None
        date_from = getattr(getattr(self, 'date_from_picker', None), 'value', None)
        date_to = getattr(getattr(self, 'date_to_picker', None), 'value', None)

        logger.debug(f"Applying filters - Search: '{search_text}', Type: '{doc_type}', Status: '{status}', Dept: '{department}'")
        logger.debug(f"Applying date filters - From: '{date_from}', To: '{date_to}'")

        self.update_document_list()
    except Exception as e:
        logger.error(f"Error applying filters: {e}")
        import traceback
        logger.error(traceback.format_exc())
        self.notification_area.object = f"Error applying filters: {str(e)}"
def _clear_filters(self, event=None):
"""Clear all filters and reset the document list."""
try:
# Reset filter values
self.search_input.value = ""
self.doc_type_select.value = ""
self.status_select.value = ""
self.department_select.value = ""
if hasattr(self, 'date_from_picker'):
self.date_from_picker.value = None
if hasattr(self, 'date_to_picker'):
self.date_to_picker.value = None
# Update document list with no filters
self.update_document_list()
except Exception as e:
logger.error(f"Error clearing filters: {e}")
self.notification_area.object = f"Error clearing filters: {str(e)}"
def _get_valid_button_type(self, button_type: str) -> str:
"""Get a valid button type for Panel 1.6.1 compatibility."""
# Valid button types in Panel 1.6.1
valid_types = ['default', 'primary', 'success', 'warning', 'danger', 'light']
# Map deprecated/invalid types to valid ones
type_mapping = {
'info': 'primary', # Map info to primary
'secondary': 'light', # Map secondary to light
}
# Return mapped type if it exists, otherwise check if original is valid
if button_type in type_mapping:
return type_mapping[button_type]
elif button_type in valid_types:
return button_type
else:
logger.warning(f"Invalid button type '{button_type}', defaulting to 'default'")
return 'default'
def _create_action_buttons(self):
    """Build the row of dashboard action buttons.

    Creates the "Create Document", "Training Dashboard" and "Refresh"
    buttons, stores each one on the instance, attaches its click handler,
    and returns them in a stretch-width ``pn.Row`` with an ``HSpacer``
    between the training and refresh buttons.

    Returns:
        The populated ``pn.Row``, or an empty ``pn.Row`` if construction
        fails (the error is logged).
    """
    try:
        button_cls = pn.widgets.Button

        # "Create Document" opens the create-document form.
        self.create_doc_btn = button_cls(
            name="Create Document",
            button_type=self._get_valid_button_type("success"),
            width=150,
        )
        self.create_doc_btn.on_click(self.show_create_document_form)

        # "Training Dashboard" - button type goes through the compatibility helper.
        self.training_btn = button_cls(
            name="Training Dashboard",
            button_type=self._get_valid_button_type("primary"),
            width=150,
        )

        def open_training(event):
            # Forward the click event to the navigation method.
            return self._navigate_to_training(event)

        self.training_btn.on_click(open_training)

        # "Refresh" triggers refresh_data.
        self.refresh_btn = button_cls(
            name="Refresh",
            button_type=self._get_valid_button_type("default"),
            width=100,
        )
        self.refresh_btn.on_click(self.refresh_data)

        # Row children, left to right.
        row_children = [
            self.create_doc_btn,
            self.training_btn,
            pn.layout.HSpacer(),
            self.refresh_btn,
        ]
        return pn.Row(*row_children, sizing_mode='stretch_width')
    except Exception as e:
        logger.error(f"Error creating action buttons: {e}")
        return pn.Row()
def _navigate_to_training(self, event=None):
"""Navigate to training dashboard."""
try:
if self.parent_app and hasattr(self.parent_app, 'navigate_to_training_dashboard'):
# Call without passing the event since the method doesn't expect it
self.parent_app.navigate_to_training_dashboard()
else:
logger.warning("Parent app does not have training navigation method")
except Exception as e:
logger.error(f"Error navigating to training: {e}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | param.Parameterized | - | |
Parameter Details
bases: Parameter of type param.Parameterized
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, parent_app)
Purpose: Internal method: init
Parameters:
parent_app: Parameter
Returns: None
_setup_notification_callback(self)
Purpose: Set up notification callback timer.
Returns: None
_refresh_notifications(self)
Purpose: Refresh notifications from the server.
Returns: None
_update_notification_display(self, notifications)
Purpose: Update the notification display area.
Parameters:
notifications: Parameter
Returns: None
_get_field_case_insensitive(self, data_dict, field_names)
Purpose: Helper method to get field value from dictionary with case-insensitive field name matching.
Parameters:
data_dict: Parameter
field_names: Parameter
Returns: None
__del__(self)
Purpose: Cleanup when dashboard is destroyed.
Returns: None
stop_notifications(self)
Purpose: Stop notification polling - useful when switching views.
Returns: None
restart_notifications(self)
Purpose: Restart notification polling - useful when returning to dashboard.
Returns: See docstring for return details
set_user(self, user)
Purpose: Set the current user from parent application
Parameters:
user: Parameter
Returns: None
_update_ui_for_user(self)
Purpose: Update UI components based on current user permissions
Returns: None
_setup_sidebar(self)
Purpose: Set up the sidebar filters and actions.
Returns: None
get_dashboard_view(self)
Purpose: Get the dashboard view with document list, notifications, and actions.
Returns: None
_create_tasks_section_full_width(self)
Purpose: Create the tasks section for full-width display above document list.
Returns: None
_create_tasks_content(self)
Purpose: Create the tasks content without duplicate headers.
Returns: None
_create_filters_section(self)
Purpose: Create the filters section for the sidebar.
Returns: None
_create_tasks_section(self)
Purpose: Create the tasks section for the sidebar.
Returns: None
_create_header_with_notifications(self)
Purpose: Create header with integrated notification controls.
Returns: None
_create_notifications_section(self)
Purpose: Create the notifications section for the sidebar using Panel widgets.
Returns: None
_create_notification_widget(self, notification)
Purpose: Create a Panel widget for a single notification.
Parameters:
notification: Type: Dict[str, Any]
Returns: None
mark_notification_read_callback(self, notification_uid)
Purpose: Callback method to mark notification as read from UI interactions.
Parameters:
notification_uid: Type: str
Returns: None
_refresh_notifications_with_widgets(self)
Purpose: Refresh notifications using Panel widgets instead of HTML.
Returns: None
_refresh_notifications(self)
Purpose: Refresh notifications from database.
Returns: None
_format_notifications_html(self, notifications) -> str
Purpose: Format notifications as HTML with proper click handling.
Parameters:
notifications: Type: List[Dict[str, Any]]
Returns: Returns str
_format_time_ago(self, created_date) -> str
Purpose: Format datetime as 'time ago' string.
Parameters:
created_date: Type: datetime
Returns: Returns str
_toggle_notifications(self, event)
Purpose: Toggle notification panel visibility with better user feedback.
Parameters:
event: Parameter
Returns: None
_mark_all_notifications_read(self)
Purpose: Mark all user notifications as read.
Returns: None
_create_document_list(self)
Purpose: Create the document list component.
Returns: None
refresh_tasks(self)
Purpose: Refresh the tasks panel
Returns: None
update_document_list(self, event)
Purpose: Update the document list with current filters.
Parameters:
event: Parameter
Returns: None
_show_sample_documents(self)
Purpose: Show sample documents when database access fails.
Returns: None
_format_documents_as_markdown(self, documents)
Purpose: Format documents as markdown table for fallback display.
Parameters:
documents: Parameter
Returns: None
document_selected(self, event)
Purpose: Handle document selection from table
Parameters:
event: Parameter
Returns: None
show_create_document_form(self, event)
Purpose: Show the create document form with custom document number option
Parameters:
event: Parameter
Returns: None
_apply_filters(self, event)
Purpose: Apply the selected filters to the document list.
Parameters:
event: Parameter
Returns: None
_clear_filters(self, event)
Purpose: Clear all filters and reset the document list.
Parameters:
event: Parameter
Returns: None
_get_valid_button_type(self, button_type) -> str
Purpose: Get a valid button type for Panel 1.6.1 compatibility.
Parameters:
button_type: Type: str
Returns: Returns str
_create_action_buttons(self)
Purpose: Create action buttons for the dashboard.
Returns: None
_navigate_to_training(self, event)
Purpose: Navigate to training dashboard.
Parameters:
event: Parameter
Returns: None
Required Imports
import logging
from typing import Dict
from typing import Any
from typing import List
from typing import Optional
Usage Example
# Example usage (the constructor takes the parent application, not `bases`):
# dashboard = DocumentDashboard(parent_app=app)
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class ControlledDocument 70.1% similar
- class ControlledDocApp 64.8% similar
- function controlled_docs_navigation 63.4% similar
- class DocumentAccessControls 62.8% similar
- function create_document 62.8% similar