class WorkflowPanelBase
Base class for workflow panels (review and approval) that provides common functionality and UI components.
/tf/active/vicechatdev/CDocs single class/ui/workflow_panel_base.py
39 - 1557
moderate
Purpose
Base class for workflow panels (review and approval) that provides common functionality and UI components.
Source Code
class WorkflowPanelBase:
"""
Base class for workflow panels (review and approval) that provides
common functionality and UI components.
"""
def __init__(self, template,
             session_manager=None,
             parent_app=None,
             embedded=False,
             workflow_type=None,
             **params):
    """
    Initialize a workflow panel.

    Args:
        template: Panel template for displaying the UI
        session_manager: Authentication session manager; a new
            SessionManager is created when none is supplied
        parent_app: Parent application reference for navigation
        embedded: Whether this panel is embedded in another UI
        workflow_type: Type of workflow ('REVIEW' or 'APPROVAL');
            falls back to 'REVIEW' when omitted
        **params: Accepted for call compatibility; not read here
    """
    self.template = template
    self.session_manager = session_manager or SessionManager()
    self.parent_app = parent_app
    # parent_app and session_manager must be set before this lookup
    self.user = self._get_current_user()
    self.notification_area = pn.pane.Markdown("")
    self.workflow_data = None
    self.document_data = None
    self.embedded = embedded
    self.workflow_type = workflow_type or 'REVIEW'

    # Controller for this workflow type (set by subclasses)
    self.controller = None

    # Generic display names, used for any unrecognised workflow type
    self.workflow_name = "Workflow"
    self.participant_name = "Participant"
    self.cycle_name = "Cycle"
    self.assignment_name = "Assignment"

    # Branch on the *effective* type so that the 'REVIEW' fallback above
    # also receives the review labels (the raw argument may be None).
    if self.workflow_type == 'REVIEW':
        self.workflow_name = "Review"
        self.participant_name = "Reviewer"
        self.cycle_name = "Review Cycle"
        self.assignment_name = "Review Assignment"
    elif self.workflow_type == 'APPROVAL':
        self.workflow_name = "Approval"
        self.participant_name = "Approver"
        self.cycle_name = "Approval Cycle"
        self.assignment_name = "Approval Assignment"

    # Create container for main content
    self.main_content = pn.Column(sizing_mode='stretch_width')

    # Initialize important containers
    self.stats_area = pn.Column(sizing_mode='stretch_width')
    self.workflow_list_area = pn.Column(sizing_mode='stretch_width')
    self.workflow_detail_area = pn.Column(sizing_mode='stretch_width')
    self.document_detail_area = pn.Column(sizing_mode='stretch_width')

    # Set up basic UI structure
    self._setup_ui()

    # Set current tab/view state
    self.current_tab = 'my_assignments'
    self.cycle_uid = ''
    self.document_uid = ''
def _setup_ui(self):
"""Set up the basic UI structure based on embedded mode"""
# Set up the user interface
if not self.embedded:
# Only set up template components in standalone mode
self._setup_header()
self._setup_sidebar()
self.template.main.append(self.notification_area)
self.template.main.append(self.main_content)
# Always set up main content components
self.main_content.append(self.stats_area)
self.main_content.append(self.workflow_list_area)
self.main_content.append(self.workflow_detail_area)
self.main_content.append(self.document_detail_area)
# Hide detail areas initially
self.workflow_detail_area.visible = False
self.document_detail_area.visible = False
def _get_current_user(self) -> Optional['DocUser']:
"""Get the current user from session or parent app"""
# First check if parent app has current_user
if hasattr(self, 'parent_app') and self.parent_app and hasattr(self.parent_app, 'current_user'):
return self.parent_app.current_user
# Otherwise try session manager
if self.session_manager:
user_id = self.session_manager.get_user_id()
if user_id:
return DocUser.get_by_uid(user_id)
return None
def set_user(self, user):
    """
    Replace the current user and refresh user-dependent views.

    Needed for compatibility with the main app. Any error raised while
    refreshing is logged and not propagated.

    Args:
        user: The user object to store on the panel
    """
    self.user = user

    # Refresh statistics first, then the pending-assignment list.
    # Methods are resolved one at a time so a failure stops the sequence
    # exactly where it occurs.
    refresh_steps = ('_update_workflow_statistics', '_load_pending_assignments')
    try:
        for step_name in refresh_steps:
            getattr(self, step_name)()
    except Exception as exc:
        logger.error(f"Error in set_user: {str(exc)}")
def _setup_header(self):
    """
    Build the template header: a title followed by the Refresh and
    Back to Dashboard buttons.
    """
    # Refresh re-runs whichever view is currently active
    refresh_btn = Button(name='Refresh', button_type='default', width=100)
    refresh_btn.on_click(self._refresh_current_view)

    # Back returns to the dashboard
    back_btn = Button(name='Back to Dashboard', button_type='default', width=150)
    back_btn.on_click(self._navigate_back)

    title = pn.pane.Markdown(f"# {self.workflow_name} Management")
    self.template.header.append(
        Row(
            title,
            refresh_btn,
            back_btn,
            sizing_mode='stretch_width',
            align='end',
        )
    )
def _setup_sidebar(self):
    """
    Build the sidebar: navigation buttons plus a statistics box.

    The "All ..." button is only offered to users holding the
    ``MANAGE_<workflow_type>S`` permission. ``self.stats_area`` is replaced
    by the sidebar statistics box and then populated.
    """
    pending_btn = Button(
        name=f'My Pending {self.workflow_name}s',
        button_type='primary',
        width=200,
    )
    pending_btn.on_click(self._load_pending_assignments)

    completed_btn = Button(
        name=f'My Completed {self.workflow_name}s',
        button_type='default',
        width=200,
    )
    completed_btn.on_click(self._load_completed_assignments)

    nav_items = [Markdown("## Navigation"), pending_btn, completed_btn]

    # Management view is permission-gated per workflow type
    manage_perm = f"MANAGE_{self.workflow_type}S"
    if self.user and permissions.user_has_permission(self.user, manage_perm):
        all_btn = Button(
            name=f'All {self.workflow_name}s',
            button_type='default',
            width=200,
        )
        all_btn.on_click(self._load_all_workflows)
        nav_items.append(all_btn)

    self.template.sidebar.append(Column(*nav_items, sizing_mode='fixed'))

    # Statistics box; starts with a placeholder until the update below runs
    self.stats_area = Column(
        Markdown(f"## {self.workflow_name} Statistics"),
        Markdown("*Loading statistics...*"),
        sizing_mode='fixed',
        styles={'background': '#f8f9fa'},
        css_classes=['p-3', 'border', 'rounded'],
    )
    self.template.sidebar.append(self.stats_area)
    self._update_workflow_statistics()
def _refresh_current_view(self, event=None):
"""Refresh the current view"""
if self.current_tab == 'my_assignments':
self._load_pending_assignments()
elif self.current_tab == 'completed_assignments':
self._load_completed_assignments()
elif self.current_tab == 'all_workflows':
self._load_all_workflows()
elif self.current_tab == 'workflow_detail':
self._load_cycle(self.cycle_uid)
def _navigate_back(self, event=None):
"""Navigate back to dashboard"""
# Check if we have a parent app reference
if hasattr(self, 'parent_app') and self.parent_app is not None:
# Use parent app's load_dashboard method
try:
if hasattr(self.parent_app, 'load_dashboard'):
self.parent_app.load_dashboard()
return
except Exception as e:
logger.error(f"Error navigating back via parent app: {e}")
# Fallback to direct state manipulation
try:
if pn.state.curdoc:
pn.state.curdoc.clear()
# Reload dashboard page
from CDocs.ui.dashboard_panel import create_dashboard_panel
dashboard = create_dashboard_panel(self.session_manager)
dashboard.servable()
else:
# Last resort - use JavaScript
pn.state.execute("window.location.href = '/'")
except Exception as e:
logger.error(f"Error navigating back to dashboard: {e}")
def _update_workflow_statistics(self):
    """
    Update the workflow statistics display.

    Rebuilds ``self.stats_area`` from ``self.controller.get_workflow_statistics()``.
    Without a user or controller a "please log in" placeholder is shown.
    Any failure is logged and replaced by an error message in the stats
    area; nothing is raised to the caller.
    """
    def show_message(text):
        # Replace the stats area with the heading plus a single message line
        self.stats_area.clear()
        self.stats_area.append(pn.pane.Markdown(f"# {self.workflow_name} Statistics"))
        self.stats_area.append(pn.pane.Markdown(text))

    def make_card(title, body, extra_styles=None):
        # One fixed-width statistics tile; all tiles share the same chrome
        styles = {'background': '#f8f9fa'}
        if extra_styles:
            styles.update(extra_styles)
        return pn.Column(
            pn.pane.Markdown(title),
            pn.pane.Markdown(body),
            width=150,
            styles=styles,
            css_classes=['p-2', 'border', 'rounded']
        )

    try:
        if not self.user or not self.controller:
            show_message("*Please log in to view statistics*")
            return

        try:
            # A None result is treated as "no statistics available"
            stats = self.controller.get_workflow_statistics() or {}

            # `or 0` also covers keys that are present but explicitly None,
            # which a .get() default alone would let through to the
            # comparisons below.
            total_count = stats.get('total_cycles') or 0
            pending_count = stats.get('pending_cycles') or 0
            in_progress_count = stats.get('in_progress_cycles') or 0
            completed_count = stats.get('completed_cycles') or 0
            overdue_count = stats.get('overdue_count') or 0

            def percent_of_total(count):
                # Guard against division by zero when there are no cycles
                return (count / total_count) * 100 if total_count > 0 else 0

            avg_duration = stats.get('average_duration_days')
            avg_duration_str = f"{avg_duration:.1f} days" if avg_duration is not None else "N/A"

            stats_cards = [
                make_card(f"### Total {self.workflow_name}s", f"**{total_count}**"),
                make_card(
                    "### Pending",
                    f"**{pending_count}** ({percent_of_total(pending_count):.0f}%)"
                ),
                make_card(
                    "### In Progress",
                    f"**{in_progress_count}** ({percent_of_total(in_progress_count):.0f}%)"
                ),
                make_card(
                    "### Completed",
                    f"**{completed_count}** ({percent_of_total(completed_count):.0f}%)"
                ),
                make_card("### Avg Duration", f"**{avg_duration_str}**"),
                # Overdue tile turns red only when something is overdue
                make_card(
                    "### Overdue",
                    f"**{overdue_count}** ({percent_of_total(overdue_count):.0f}%)",
                    extra_styles={'color': '#dc3545' if overdue_count > 0 else ''}
                ),
            ]

            # Clear stats area and add cards
            self.stats_area.clear()
            self.stats_area.append(pn.Row(*stats_cards, sizing_mode='stretch_width'))
        except Exception as e:
            logger.error(f"Error fetching workflow statistics: {e}")
            show_message(f"*Error loading statistics: {str(e)}*")
    except Exception as e:
        logger.error(f"Error updating workflow statistics: {e}")
        show_message(f"*Error loading statistics*")
def _load_pending_assignments(self, event=None):
    """
    Show the current user's pending assignments.

    Resets the list/detail areas, refreshes statistics, then renders a
    Tabulator table of pending reviews or approvals (chosen by
    ``self.workflow_type``). Errors are logged and rendered in the list
    area instead of being raised.

    Args:
        event: Optional click event from the navigation button (unused)
    """
    try:
        self.current_tab = 'my_assignments'
        wf_name = self.workflow_name
        wf_lower = wf_name.lower()
        self.notification_area.object = f"Loading your pending {wf_lower}s..."

        # Reset list and detail areas
        for area in (self.workflow_list_area,
                     self.workflow_detail_area,
                     self.document_detail_area):
            area.clear()
        self.workflow_detail_area.visible = False
        self.document_detail_area.visible = False

        # Rebuild main content in a known order, with fresh statistics
        self.main_content.clear()
        self._update_workflow_statistics()
        for area in (self.stats_area,
                     self.workflow_list_area,
                     self.workflow_detail_area,
                     self.document_detail_area):
            self.main_content.append(area)

        if not self.user:
            self.notification_area.object = "Please log in to view your assignments"
            self.workflow_list_area.clear()
            self.workflow_list_area.append(Markdown(f"# My Pending {wf_name}s"))
            self.workflow_list_area.append(Markdown("*Please log in to view assignments*"))
            return

        # Container holding title, notification and the table
        container = pn.Column(sizing_mode='stretch_width')
        container.append(pn.pane.Markdown(f"# My Pending {wf_name}s"))
        container.append(self.notification_area)

        try:
            # Review and approval controllers use different function names
            # and different result keys
            if self.workflow_type == 'REVIEW':
                from CDocs.controllers.review_controller import get_user_pending_reviews
                assignments = get_user_pending_reviews(self.user).get('reviews', [])
            else:
                from CDocs.controllers.approval_controller import get_user_pending_approvals
                assignments = get_user_pending_approvals(self.user).get('approvals', [])

            def to_day(value):
                # Neo4j DateTime objects expose to_native(); ISO strings are
                # cut at the 'T'; anything else passes through unchanged
                if hasattr(value, 'to_native'):
                    return value.to_native().strftime('%Y-%m-%d')
                if isinstance(value, str) and 'T' in value:
                    return value.split('T')[0]
                return value

            rows = []
            for item in assignments or []:
                if not item:
                    continue

                doc = item.get('document') or {}
                doc_title = doc.get('title', 'Unknown Document')
                doc_number = doc.get('doc_number', doc.get('docNumber', 'N/A'))

                # Rows without a cycle UID cannot be opened, so drop them
                uid = item.get('UID', '')
                if not uid:
                    continue

                state = item.get('status', 'PENDING')
                started = item.get('startDate', item.get('start_date', ''))
                due = item.get('dueDate', item.get('due_date', ''))
                started = to_day(started)
                due = to_day(due)

                rows.append({
                    'Document': f"{doc_title} ({doc_number})",
                    'Status': state,
                    'Started': started,
                    'Due': due,
                    'Action': f'<button class="view-{wf_lower}-btn" data-uid="{uid}">View</button>',
                    # Hidden column, kept for reference on row selection
                    'UID': uid
                })

            if rows:
                import pandas as pd
                table = pn.widgets.Tabulator(
                    pd.DataFrame(rows),
                    formatters={'Action': {'type': 'html'}},
                    selectable=True,
                    pagination='local',
                    page_size=10,
                    sizing_mode='stretch_width',
                    hidden_columns=['UID']
                )
                table.on_click(self._workflow_selected)
                container.append(table)

                # Browser-side wiring for the View buttons. The script text
                # is kept verbatim, hence the unindented lines.
                js_code = f"""
function setupViewButtons() {{
// Set timeout to ensure DOM is ready
setTimeout(function() {{
const buttons = document.querySelectorAll('.view-{wf_lower}-btn');
buttons.forEach(button => {{
button.addEventListener('click', function(e) {{
const uid = e.target.getAttribute('data-uid');
// Send message to Python
if (uid) {{
console.log('View {wf_lower} clicked:', uid);
comm.send({{event_type: 'view_{wf_lower}', uid: uid}});
}}
}});
}});
}}, 500);
}}
// Initial setup
setupViewButtons();
// Setup observer for pagination changes
const observer = new MutationObserver(function(mutations) {{
setupViewButtons();
}});
// Start observing
const tabulator = document.querySelector('.tabulator');
if (tabulator) {{
observer.observe(tabulator, {{ childList: true, subtree: true }});
}}
"""
                container.append(
                    pn.pane.HTML(f"<script>{js_code}</script>", width=0, height=0)
                )

                def on_js_message(message):
                    # Open the cycle named in a matching "view" message
                    if message.get('event_type') == f'view_{self.workflow_name.lower()}':
                        requested_uid = message.get('uid')
                        if requested_uid:
                            self._load_cycle(requested_uid)

                if pn.state.curdoc:
                    pn.state.curdoc.on_message(on_js_message)

                container.append(
                    pn.pane.Markdown(f"*Showing {len(rows)} pending {wf_lower}(s)*")
                )
            else:
                # Nothing returned, or nothing usable after filtering
                container.append(pn.pane.Markdown(f"*No pending {wf_lower}s found*"))

            self.workflow_list_area.append(container)
            self.notification_area.object = ""
        except Exception as exc:
            logger.error(f"Error loading pending assignments: {exc}")
            logger.error(traceback.format_exc())
            self.notification_area.object = f"**Error:** {str(exc)}"
            container.append(
                pn.pane.Markdown(f"*Error loading pending {wf_lower}s: {str(exc)}*")
            )
            self.workflow_list_area.append(container)
    except Exception as exc:
        logger.error(f"Error loading pending assignments: {exc}")
        logger.error(traceback.format_exc())
        self.notification_area.object = f"**Error:** {str(exc)}"
        # Fall back to a minimal layout: statistics plus an error message
        self.main_content.clear()
        self.main_content.append(self.stats_area)
        self.workflow_list_area.clear()
        self.workflow_list_area.append(Markdown(f"# My Pending {self.workflow_name}s"))
        self.workflow_list_area.append(
            Markdown(f"*Error loading pending {self.workflow_name.lower()}s: {str(exc)}*")
        )
        self.main_content.append(self.workflow_list_area)
def _load_completed_assignments(self, event=None):
    """
    Show the current user's completed assignments.

    Resets the list/detail areas, refreshes statistics, then renders a
    Tabulator table of assignments with status COMPLETED, REJECTED or
    SKIPPED, fetched through ``self.controller``. Errors are logged and
    rendered in the list area instead of being raised.

    Args:
        event: Optional click event from the navigation button (unused)
    """
    try:
        self.current_tab = 'completed_assignments'
        wf_name = self.workflow_name
        wf_lower = wf_name.lower()
        self.notification_area.object = f"Loading your completed {wf_lower}s..."

        # Reset list and detail areas
        for area in (self.workflow_list_area,
                     self.workflow_detail_area,
                     self.document_detail_area):
            area.clear()
        self.workflow_detail_area.visible = False
        self.document_detail_area.visible = False

        # Rebuild main content in a known order, with fresh statistics
        self.main_content.clear()
        self._update_workflow_statistics()
        for area in (self.stats_area,
                     self.workflow_list_area,
                     self.workflow_detail_area,
                     self.document_detail_area):
            self.main_content.append(area)

        if not self.user:
            self.notification_area.object = "Please log in to view your assignments"
            self.workflow_list_area.clear()
            self.workflow_list_area.append(Markdown(f"# My Completed {wf_name}s"))
            self.workflow_list_area.append(Markdown("*Please log in to view assignments*"))
            return

        # Container holding title, notification and the table
        container = pn.Column(sizing_mode='stretch_width')
        container.append(pn.pane.Markdown(f"# My Completed {wf_name}s"))
        container.append(self.notification_area)

        try:
            # Review and approval controllers use different method names
            if self.workflow_type == 'REVIEW':
                result = self.controller.get_user_assigned_reviews(
                    user=self.user,
                    status_filter=["COMPLETED", "REJECTED", "SKIPPED"],
                    include_completed=True
                )
            else:
                result = self.controller.get_user_assigned_approvals(
                    user=self.user,
                    status_filter=["COMPLETED", "REJECTED", "SKIPPED"],
                    include_completed=True
                )
            assignments = result.get('assignments', [])

            rows = []
            for item in assignments or []:
                if not item:
                    continue

                doc = item.get('document') or {}
                doc_title = doc.get('title', 'Unknown Document')
                doc_number = doc.get('doc_number', doc.get('docNumber', 'N/A'))

                # Rows without a cycle UID cannot be opened, so drop them
                uid = item.get('cycle_uid', item.get('UID', ''))
                if not uid:
                    continue

                state = item.get('status', 'COMPLETED')
                outcome = item.get('decision', '')

                # Neo4j DateTime objects expose to_native(); ISO strings are
                # cut at the 'T'; anything else passes through unchanged
                finished = item.get('decision_date', item.get('completed_at', ''))
                if hasattr(finished, 'to_native'):
                    finished = finished.to_native().strftime('%Y-%m-%d')
                elif isinstance(finished, str) and 'T' in finished:
                    finished = finished.split('T')[0]

                rows.append({
                    'Document': f"{doc_title} ({doc_number})",
                    'Status': state,
                    'Decision': outcome,
                    'Completed': finished,
                    'Action': f'<button class="view-{wf_lower}-btn" data-uid="{uid}">View</button>',
                    # Hidden column, kept for reference on row selection
                    'UID': uid
                })

            if rows:
                import pandas as pd
                table = pn.widgets.Tabulator(
                    pd.DataFrame(rows),
                    formatters={'Action': {'type': 'html'}},
                    selectable=True,
                    pagination='local',
                    page_size=10,
                    sizing_mode='stretch_width',
                    hidden_columns=['UID']
                )
                table.on_click(self._workflow_selected)
                container.append(table)

                # Browser-side wiring for the View buttons. The script text
                # is kept verbatim, hence the unindented lines.
                js_code = f"""
function setupViewButtons() {{
// Set timeout to ensure DOM is ready
setTimeout(function() {{
const buttons = document.querySelectorAll('.view-{wf_lower}-btn');
buttons.forEach(button => {{
button.addEventListener('click', function(e) {{
const uid = e.target.getAttribute('data-uid');
// Send message to Python
if (uid) {{
console.log('View {wf_lower} clicked:', uid);
comm.send({{event_type: 'view_{wf_lower}', uid: uid}});
}}
}});
}});
}}, 500);
}}
// Initial setup
setupViewButtons();
// Setup observer for pagination changes
const observer = new MutationObserver(function(mutations) {{
setupViewButtons();
}});
// Start observing
const tabulator = document.querySelector('.tabulator');
if (tabulator) {{
observer.observe(tabulator, {{ childList: true, subtree: true }});
}}
"""
                container.append(
                    pn.pane.HTML(f"<script>{js_code}</script>", width=0, height=0)
                )

                def on_js_message(message):
                    # Open the cycle named in a matching "view" message
                    if message.get('event_type') == f'view_{self.workflow_name.lower()}':
                        requested_uid = message.get('uid')
                        if requested_uid:
                            self._load_cycle(requested_uid)

                if pn.state.curdoc:
                    pn.state.curdoc.on_message(on_js_message)

                container.append(
                    pn.pane.Markdown(f"*Showing {len(rows)} completed {wf_lower}(s)*")
                )
            else:
                # Nothing returned, or nothing usable after filtering
                container.append(pn.pane.Markdown(f"*No completed {wf_lower}s found*"))

            self.workflow_list_area.append(container)
            self.notification_area.object = ""
        except Exception as exc:
            logger.error(f"Error loading completed assignments: {exc}")
            logger.error(traceback.format_exc())
            self.notification_area.object = f"**Error:** {str(exc)}"
            container.append(
                pn.pane.Markdown(f"*Error loading completed {wf_lower}s: {str(exc)}*")
            )
            self.workflow_list_area.append(container)
    except Exception as exc:
        logger.error(f"Error loading completed assignments: {exc}")
        logger.error(traceback.format_exc())
        self.notification_area.object = f"**Error:** {str(exc)}"
        # Fall back to a minimal layout: statistics plus an error message
        self.main_content.clear()
        self.main_content.append(self.stats_area)
        self.workflow_list_area.clear()
        self.workflow_list_area.append(Markdown(f"# My Completed {self.workflow_name}s"))
        self.workflow_list_area.append(
            Markdown(f"*Error loading completed {self.workflow_name.lower()}s: {str(exc)}*")
        )
        self.main_content.append(self.workflow_list_area)
def _load_all_workflows(self, event=None):
    """
    Load all workflow cycles (admin view).

    Requires a logged-in user holding the ``MANAGE_<workflow_type>S``
    permission; otherwise an explanatory message is shown. Builds a filter
    panel and a table area inside an accordion, then performs an initial
    load using the same defaults the filter widgets start with. Errors are
    logged and rendered in the list area instead of being raised.

    Args:
        event: Optional click event from the navigation button (unused)
    """
    try:
        self.current_tab = 'all_workflows'
        self.notification_area.object = f"Loading all {self.workflow_name.lower()} cycles..."

        # Clear areas and prepare main content
        self.workflow_list_area.clear()
        self.workflow_detail_area.clear()
        self.document_detail_area.clear()
        self.workflow_detail_area.visible = False
        self.document_detail_area.visible = False

        # Rebuild main content in a known order, with fresh statistics
        self.main_content.clear()
        self._update_workflow_statistics()
        self.main_content.append(self.stats_area)
        self.main_content.append(self.workflow_list_area)
        self.main_content.append(self.workflow_detail_area)
        self.main_content.append(self.document_detail_area)

        # Check for user and permission
        manage_perm = f"MANAGE_{self.workflow_type}S"
        if not self.user:
            self.notification_area.object = "Please log in to access this feature"
            self.workflow_list_area.clear()
            self.workflow_list_area.append(Markdown(f"# All {self.workflow_name}s"))
            self.workflow_list_area.append(Markdown("*Please log in to view this page*"))
            return
        if not permissions.user_has_permission(self.user, manage_perm):
            self.notification_area.object = "You don't have permission to access this feature"
            self.workflow_list_area.clear()
            self.workflow_list_area.append(Markdown(f"# All {self.workflow_name}s"))
            self.workflow_list_area.append(Markdown("*You don't have permission to view this page*"))
            return

        # Single source of truth for the default filters. One timestamp is
        # used throughout so the slider's initial value never exceeds its
        # own `end`, and the initial load matches what the widgets show.
        now = datetime.now()
        default_statuses = ['PENDING', 'IN_PROGRESS']
        default_date_from = now - timedelta(days=30)

        all_workflows_container = pn.Column(sizing_mode='stretch_width')
        all_workflows_container.append(pn.pane.Markdown(f"# All {self.workflow_name}s"))

        # Accordion keeps filters and results in separate collapsible panels
        accordion = pn.Accordion(sizing_mode='stretch_width')

        status_filter = pn.widgets.MultiChoice(
            name='Status Filter',
            options=['PENDING', 'IN_PROGRESS', 'COMPLETED', 'APPROVED', 'REJECTED', 'CANCELED'],
            value=list(default_statuses),
            width=300
        )

        # Type options depend on the workflow kind; '' means "no filter"
        if self.workflow_type == 'REVIEW':
            configured_types = settings.REVIEW_TYPES
        else:
            configured_types = settings.APPROVAL_WORKFLOW_TYPES
        workflow_type_filter = pn.widgets.Select(
            name=f"{self.workflow_name} Type",
            options=[''] + list(configured_types),
            width=300
        )

        doc_type_filter = pn.widgets.Select(
            name='Document Type',
            options=[''] + list(settings.DOCUMENT_TYPES.keys()),
            width=300
        )

        date_range = pn.widgets.DateRangeSlider(
            name='Date Range',
            start=now - timedelta(days=90),
            end=now,
            value=(default_date_from, now),
            width=300
        )

        filter_btn = pn.widgets.Button(
            name='Apply Filters',
            button_type='primary',
            width=150
        )

        filters_panel = pn.Column(
            pn.Row(
                pn.Column(status_filter, workflow_type_filter, width=350),
                pn.Column(doc_type_filter, date_range, width=350),
                pn.Column(filter_btn, pn.layout.VSpacer(), width=150),
                sizing_mode='stretch_width'
            ),
            margin=(10, 10, 10, 10),
            sizing_mode='stretch_width',
            height=200
        )
        accordion.append((f"{self.workflow_name} Filters", filters_panel))

        # Table area is stored on self so _load_filtered_workflows can fill it
        self._workflows_table_area = pn.Column(
            pn.pane.Markdown(f"*Loading {self.workflow_name.lower()} cycles...*"),
            margin=(10, 10, 10, 10),
            sizing_mode='stretch_width'
        )
        accordion.append((f"{self.workflow_name} List", self._workflows_table_area))

        # Open both panels
        accordion.active = [0, 1]

        all_workflows_container.append(accordion)
        self.workflow_list_area.append(all_workflows_container)

        def filter_workflows(event):
            # Read the current widget state; empty selections mean "any"
            date_from, date_to = date_range.value[0], date_range.value[1]
            self._load_filtered_workflows(
                status_filter=status_filter.value,
                date_from=date_from,
                date_to=date_to,
                workflow_type=workflow_type_filter.value or None,
                doc_type=doc_type_filter.value or None
            )

        filter_btn.on_click(filter_workflows)

        # Initial load of workflows with the shared default filters
        self._load_filtered_workflows(
            status_filter=list(default_statuses),
            date_from=default_date_from,
            date_to=now
        )

        # Clear notification
        self.notification_area.object = ""
    except Exception as e:
        logger.error(f"Error loading all workflow cycles: {e}")
        logger.error(traceback.format_exc())
        self.notification_area.object = f"**Error:** {str(e)}"
        # Fall back to a minimal layout: statistics plus an error message
        self.main_content.clear()
        self.main_content.append(self.stats_area)
        self.workflow_list_area.clear()
        self.workflow_list_area.append(pn.pane.Markdown(f"# All {self.workflow_name}s"))
        self.workflow_list_area.append(pn.pane.Markdown(f"*Error loading all {self.workflow_name.lower()} cycles: {str(e)}*"))
        self.main_content.append(self.workflow_list_area)
def _load_filtered_workflows(self, status_filter=None, date_from=None, date_to=None, workflow_type=None, doc_type=None):
    """
    Query workflow cycles with the given filters and render the result.

    Only filters with a truthy value are sent to the controller. The table
    area shows a loading note, then either the table, a "none found"
    message, or an error message. Nothing is raised to the caller.

    Args:
        status_filter: Iterable of status strings, joined with commas
        date_from: Lower bound passed through as ``date_from``
        date_to: Upper bound passed through as ``date_to``
        workflow_type: Sent as ``review_type`` or ``approval_type``
            depending on ``self.workflow_type``
        doc_type: Document type passed through as ``doc_type``
    """
    try:
        # Show a loading note, creating the table area on first use
        loading_note = pn.pane.Markdown(f"*Loading {self.workflow_name.lower()} cycles...*")
        if hasattr(self, '_workflows_table_area'):
            self._workflows_table_area.clear()
            self._workflows_table_area.append(loading_note)
        else:
            self._workflows_table_area = pn.Column(
                loading_note,
                sizing_mode='stretch_width'
            )

        # Build the query from whichever filters were supplied
        criteria = {}
        if status_filter:
            criteria['status'] = ','.join(status_filter)
        if date_from:
            criteria['date_from'] = date_from
        if date_to:
            criteria['date_to'] = date_to
        if workflow_type:
            type_key = 'review_type' if self.workflow_type == 'REVIEW' else 'approval_type'
            criteria[type_key] = workflow_type
        if doc_type:
            criteria['doc_type'] = doc_type

        try:
            # The controller returns (cycles, total); only cycles are used
            found_cycles, _total = self.controller.search_cycles(query=criteria)
            if not found_cycles or len(found_cycles) == 0:
                self._workflows_table_area.clear()
                self._workflows_table_area.append(
                    pn.pane.Markdown(f"*No {self.workflow_name.lower()} cycles found matching the criteria*")
                )
                return
            self._create_workflows_table(found_cycles)
        except Exception as db_error:
            logger.error(f"Database error in _load_filtered_workflows: {db_error}")
            self._workflows_table_area.clear()
            self._workflows_table_area.append(
                pn.pane.Markdown(f"*Error loading {self.workflow_name.lower()} cycles: {str(db_error)}*")
            )
    except Exception as exc:
        logger.error(f"Error in _load_filtered_workflows: {exc}")
        logger.error(traceback.format_exc())
        if hasattr(self, '_workflows_table_area'):
            self._workflows_table_area.clear()
            self._workflows_table_area.append(
                pn.pane.Markdown(f"*Error loading {self.workflow_name.lower()} cycles: {str(exc)}*")
            )
def _create_workflows_table(self, cycles):
    """Render workflow cycles as a Tabulator table in ``self._workflows_table_area``.

    Every cycle dict is flattened into one row (document label, status badge,
    formatted dates, participant count, initiator). The table area is then
    cleared and filled with a count line plus the table. Any failure is
    logged and shown in the table area instead of being raised.

    Args:
        cycles: Iterable of cycle dicts. Both camelCase and snake_case
            spellings of the date, UID and document-number keys are read.
    """
    try:
        # The first of these keys found in a cycle supplies the participant count
        participant_keys = ('reviewer_assignments', 'approver_assignments', 'reviewers', 'approvers')

        table_data = []
        for cycle in cycles:
            # Handle different naming conventions for the date fields
            start_date = cycle.get('startDate') or cycle.get('started_at')
            due_date = cycle.get('dueDate') or cycle.get('due_date')
            completion_date = cycle.get('completionDate') or cycle.get('completed_at')

            formatted_start = self._format_date(start_date)
            formatted_due = self._format_date(due_date)
            formatted_completion = self._format_date(completion_date)

            status = cycle.get('status', 'UNKNOWN')
            status_html = self._format_status_html(status)

            # `or {}` also covers a 'document' key that is present but None,
            # so one incomplete cycle cannot abort the whole table
            doc = cycle.get('document') or {}
            doc_number = doc.get('doc_number') or doc.get('docNumber', '')
            doc_title = doc.get('title', '')
            document = f"{doc_number}: {doc_title}" if doc_number and doc_title else "Unknown"

            participant_count = 0
            for key in participant_keys:
                if key in cycle:
                    # Tolerate a key that is present but holds None
                    participant_count = len(cycle[key] or [])
                    break

            table_data.append({
                'uid': cycle.get('UID') or cycle.get('uid', ''),
                'document': document,
                'document_uid': doc.get('uid', ''),
                'status': status,
                'status_html': status_html,
                'start_date': formatted_start,
                'due_date': formatted_due,
                'completion_date': formatted_completion,
                'participant_count': participant_count,
                'initiated_by': cycle.get('initiated_by_name', '')
            })

        # Convert to DataFrame for Tabulator
        df = pd.DataFrame(table_data)

        # The status badge is pre-rendered HTML, so it needs the html formatter
        formatters = {
            'status_html': {'type': 'html'}
        }

        # NOTE(review): confirm the installed Panel version honours a
        # `columns` mapping; the options I know for this are `titles`,
        # `widths` and `hidden_columns` - verify against the Tabulator docs.
        table = pn.widgets.Tabulator(
            df,
            formatters=formatters,
            selectable=True,
            header_filters=True,
            show_index=False,
            layout='fit_columns',
            height=600,
            sizing_mode='stretch_width',
            columns={
                'uid': {'visible': False},
                'document_uid': {'visible': False},
                'status': {'visible': False},
                'status_html': {'title': 'Status'},
                'document': {'title': 'Document', 'width': 300},
                'start_date': {'title': 'Started', 'width': 100},
                'due_date': {'title': 'Due Date', 'width': 100},
                'completion_date': {'title': 'Completed', 'width': 100},
                'participant_count': {'title': f"{self.participant_name}s", 'width': 80},
                'initiated_by': {'title': 'Initiated By', 'width': 150}
            }
        )

        # Clicking a row opens the corresponding cycle
        table.on_click(self._workflow_selected)

        # Replace the loading message with the count line and the table
        self._workflows_table_area.clear()
        self._workflows_table_area.append(
            pn.pane.Markdown(f"**{len(table_data)} {self.workflow_name} cycles found**")
        )
        self._workflows_table_area.append(table)
    except Exception as e:
        logger.error(f"Error creating workflows table: {e}")
        logger.error(traceback.format_exc())
        self._workflows_table_area.clear()
        self._workflows_table_area.append(
            pn.pane.Markdown(f"*Error creating {self.workflow_name.lower()} cycles table: {str(e)}*")
        )
def _format_status_html(self, status):
"""Format workflow status as HTML with color-coding"""
if not status:
return '<span>Unknown</span>'
# Define color mapping - first use workflow-specific mapping if available
status_colors = {}
if self.workflow_type == 'REVIEW' and hasattr(settings, 'REVIEW_STATUSES'):
for key, val in settings.REVIEW_STATUSES.items():
status_colors[key] = {'bg': val.get('color', '#e9ecef'), 'text': '#212529'}
elif self.workflow_type == 'APPROVAL' and hasattr(settings, 'APPROVAL_STATUSES'):
for key, val in settings.APPROVAL_STATUSES.items():
status_colors[key] = {'bg': val.get('color', '#e9ecef'), 'text': '#212529'}
else:
# Default colors
status_colors = {
'PENDING': {'bg': '#e9ecef', 'text': '#212529'},
'IN_PROGRESS': {'bg': '#fff3cd', 'text': '#664d03'},
'COMPLETED': {'bg': '#d1e7dd', 'text': '#0f5132'},
'APPROVED': {'bg': '#d1e7dd', 'text': '#0f5132'},
'REJECTED': {'bg': '#f8d7da', 'text': '#842029'},
'CANCELED': {'bg': '#f5f5f5', 'text': '#6c757d'}
}
# Get color for status or use default gray
color_info = status_colors.get(status, {'bg': '#f8f9fa', 'text': '#212529'})
# Return HTML with inline styling for the status badge
return f'''<span style="display: inline-block; padding: 0.25rem 0.5rem;
font-weight: bold; background-color: {color_info['bg']};
color: {color_info['text']}; border-radius: 0.25rem;
text-align: center; min-width: 80px;">{status}</span>'''
def _convert_neo4j_datetime(self, dt_value):
"""Convert Neo4j DateTime to Python datetime"""
if not dt_value:
return None
# Check if it's a string
if isinstance(dt_value, str):
try:
return datetime.fromisoformat(dt_value)
except ValueError:
pass
# Handle Neo4j DateTime objects
try:
if hasattr(dt_value, '__class__') and dt_value.__class__.__name__ == 'DateTime':
return datetime(
year=dt_value.year,
month=dt_value.month,
day=dt_value.day,
hour=dt_value.hour,
minute=dt_value.minute,
second=dt_value.second
)
except Exception:
pass
# Return as-is if we can't convert
return dt_value
def _workflow_selected(self, event):
"""Handle workflow selection from table"""
try:
# Handle different event types
row_index = None
cycle_uid = None
# Check if this is a CellClickEvent
if hasattr(event, 'row') and event.row is not None:
row_index = event.row
elif hasattr(event, 'new') and event.new is not None:
# Handle Tabulator selection_event
row_index = event.new[0] if len(event.new) > 0 else None
# Exit if we couldn't determine the row
if row_index is None:
return
# Get the data from the table
table = event.obj
if not hasattr(table, 'value') or not hasattr(table.value, 'iloc'):
return
# Get cycle UID from the table
cycle_uid = table.value.iloc[row_index].get('uid')
# Exit if we couldn't determine the cycle UID
if not cycle_uid:
self.notification_area.object = "Could not determine selected workflow cycle"
return
# Log found UID
logger.info(f"Loading workflow cycle with UID: {cycle_uid}")
# Load the selected cycle
self._load_cycle(cycle_uid)
except Exception as e:
logger.error(f"Error selecting workflow cycle: {e}")
logger.error(traceback.format_exc())
self.notification_area.object = f"**Error:** {str(e)}"
def _load_cycle(self, cycle_uid):
    """Switch the panel to the detail view of one workflow cycle.

    Resets the list/detail/document areas, rebuilds ``main_content`` with
    the statistics and detail areas, then (for a logged-in user) fetches
    the cycle through ``self.controller.get_cycle_by_uid``, loads the linked
    document via ``get_document`` when the cycle references one, and
    delegates rendering to ``self._create_workflow_detail_view``. Failures
    are reported in the notification and detail areas rather than raised.

    Args:
        cycle_uid: UID of the cycle to display.
    """
    def show_placeholder(message):
        # Reset the detail area to its heading plus a single status line
        self.workflow_detail_area.clear()
        self.workflow_detail_area.append(Markdown(f"# {self.workflow_name} Details"))
        self.workflow_detail_area.append(Markdown(message))

    try:
        self.current_tab = 'workflow_detail'
        self.cycle_uid = cycle_uid
        self.notification_area.object = f"Loading {self.workflow_name.lower()} details..."

        # Start from empty areas with the detail area visible
        for area in (self.workflow_list_area, self.workflow_detail_area, self.document_detail_area):
            area.clear()
        self.workflow_detail_area.visible = True
        self.main_content.clear()
        self._update_workflow_statistics()

        # Re-attach the areas this view uses
        self.main_content.append(self.stats_area)
        self.main_content.append(self.workflow_detail_area)

        # Anonymous visitors only get a login prompt
        if not self.user:
            self.notification_area.object = "Please log in to view this page"
            show_placeholder("*Please log in to view this information*")
            return

        try:
            cycle = self.controller.get_cycle_by_uid(cycle_uid)
            self.workflow_data = cycle
            if not cycle:
                raise ResourceNotFoundError(f"{self.workflow_name} cycle not found")

            # Load the linked document only when the cycle names one
            linked_uid = cycle['document'].get('uid') if 'document' in cycle else None
            if linked_uid:
                self.document_uid = linked_uid
                self.document_data = get_document(linked_uid)

            self._create_workflow_detail_view()

            # Success: drop the "Loading..." message
            self.notification_area.object = ""
        except ResourceNotFoundError as e:
            self.notification_area.object = f"**Not Found:** {str(e)}"
            show_placeholder(f"*{self.workflow_name} cycle not found*")
        except Exception as e:
            logger.error(f"Error loading {self.workflow_name.lower()} cycle: {e}")
            self.notification_area.object = f"**Error:** {str(e)}"
            show_placeholder(f"*Error loading {self.workflow_name.lower()} details: {str(e)}*")
    except Exception as e:
        logger.error(f"Error in _load_cycle: {e}")
        self.notification_area.object = f"**Error:** {str(e)}"
def _create_assignment_list(self, title, assignments):
    """Fill ``self.workflow_list_area`` with a table of the user's assignments.

    The area is cleared and given a heading. An empty or missing assignment
    list produces an explanatory message; otherwise each assignment becomes
    one Tabulator row whose click handler is ``self._assignment_selected``.
    Any failure is logged and shown in the list area instead of being raised.

    Args:
        title: Heading shown above the list.
        assignments: Iterable of assignment dicts, or a falsy value.
    """
    try:
        self.workflow_list_area.clear()
        self.workflow_list_area.append(Markdown(f"# {title}"))

        # Nothing to list
        if not assignments:
            self.workflow_list_area.append(
                Markdown(f"*You have no {self.workflow_name.lower()} assignments in this category.*")
            )
            return

        table_data = []
        for assignment in assignments:
            # `or {}` tolerates a 'document' key that is present but None,
            # so one incomplete record cannot abort the whole list
            doc = assignment.get("document") or {}
            doc_number = doc.get("doc_number", doc.get("docNumber", ""))
            doc_title = doc.get("title", "")
            # Stays None when the assignment carries no 'document' key at all
            document_uid = doc.get("uid", "") if "document" in assignment else None

            # Handle both possible cycle field names; the trailing `or {}`
            # keeps the lookups below safe when the last candidate is None.
            # NOTE(review): only 'cycle' and 'review_cycle' are consulted -
            # confirm whether approval assignments use another key.
            cycle = assignment.get("cycle") or assignment.get("review_cycle") or {}

            status = assignment.get("status", "PENDING")
            status_html = self._format_status_html(status)

            # The assignment's own due date wins over the cycle's
            due_date = assignment.get("due_date") or cycle.get("due_date") or cycle.get("dueDate")

            table_data.append({
                "cycle_uid": assignment.get("cycle_uid") or cycle.get("uid", ""),
                "assignment_uid": assignment.get("uid", ""),
                "document": f"{doc_number}: {doc_title}" if doc_number and doc_title else "Unknown Document",
                "document_uid": document_uid,
                "status": status,
                "status_html": status_html,
                "due_date": self._format_date(due_date),
                "assignment_type": assignment.get("role", self.participant_name),
                "date_assigned": self._format_date(assignment.get("assigned_date") or assignment.get("assigned_at", ""))
            })

        # Convert to DataFrame
        df = pd.DataFrame(table_data)

        # The status badge is pre-rendered HTML, so it needs the html formatter
        formatters = {
            'status_html': {'type': 'html'}
        }

        table = pn.widgets.Tabulator(
            df,
            formatters=formatters,
            selectable=True,
            show_index=False,
            layout='fit_columns',
            height=400,
            sizing_mode='stretch_width',
            columns={
                'cycle_uid': {'visible': False},
                'assignment_uid': {'visible': False},
                'document_uid': {'visible': False},
                'status': {'visible': False},
                'status_html': {'title': 'Status'},
                'document': {'title': 'Document', 'width': 300},
                'due_date': {'title': 'Due Date', 'width': 100},
                'assignment_type': {'title': 'Role', 'width': 80},
                'date_assigned': {'title': 'Assigned', 'width': 100}
            }
        )

        # Clicking a row opens the cycle the assignment belongs to
        table.on_click(self._assignment_selected)

        self.workflow_list_area.append(
            pn.pane.Markdown(f"**{len(table_data)} assignments found**")
        )
        self.workflow_list_area.append(table)
    except Exception as e:
        logger.error(f"Error creating assignments list: {e}")
        self.workflow_list_area.clear()
        self.workflow_list_area.append(Markdown(f"# {title}"))
        self.workflow_list_area.append(Markdown(f"*Error creating assignments list: {str(e)}*"))
def _assignment_selected(self, event):
"""Handle selection of an assignment from the table"""
try:
# Handle different event types
row_index = None
cycle_uid = None
# Check if this is a CellClickEvent
if hasattr(event, 'row') and event.row is not None:
row_index = event.row
elif hasattr(event, 'new') and event.new is not None:
# Handle Tabulator selection_event
row_index = event.new[0] if len(event.new) > 0 else None
# Exit if we couldn't determine the row
if row_index is None:
return
# Get the data from the table
table = event.obj
if not hasattr(table, 'value') or not hasattr(table.value, 'iloc'):
return
# Get cycle UID from the table
cycle_uid = table.value.iloc[row_index].get('cycle_uid')
# Exit if we couldn't determine the cycle UID
if not cycle_uid:
self.notification_area.object = "Could not determine selected workflow cycle"
return
# Load the selected cycle
self._load_cycle(cycle_uid)
except Exception as e:
logger.error(f"Error selecting assignment: {e}")
self.notification_area.object = f"**Error:** {str(e)}"
def _create_workflow_detail_view(self):
"""Create the workflow detail view - must be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement _create_workflow_detail_view")
def _create_participants_dataframe(self, assignments):
"""Create a DataFrame for the participants table - must be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement _create_participants_dataframe")
def _create_comments_area(self, comments):
    """Render workflow comments as a stack of HTML cards, newest first.

    Each card shows the commenter's name, an optional resolution badge, the
    formatted date and the comment text; resolved comments with a resolution
    text get an extra resolution section.

    All comment-derived text is HTML-escaped before being placed in the
    markup, since it is user-entered and the result is rendered as raw HTML.

    Args:
        comments: Iterable of comment dicts, or a falsy value.

    Returns:
        A Markdown pane with a placeholder message when there are no
        comments, otherwise an HTML pane containing the cards.
    """
    from html import escape  # function-scope import keeps this block self-contained

    if not comments:
        return pn.pane.Markdown("*No comments have been added yet.*")

    def sort_key(entry):
        # Compare as strings so a None or non-string timestamp cannot make
        # sorted() raise TypeError on mixed types
        stamp = entry.get('timestamp', entry.get('created_at', ''))
        return str(stamp) if stamp else ''

    sorted_comments = sorted(comments, key=sort_key, reverse=True)

    # Collect fragments and join once at the end
    parts = ["<div class='p-3'>"]
    for comment in sorted_comments:
        # Either key may be missing or None; fall back to an empty dict
        commenter = comment.get('commenter') or comment.get('approver') or {}
        commenter_name = escape(str(commenter.get('name', 'Unknown')))

        comment_text = escape(str(comment.get('text', '') or ''))
        timestamp = comment.get('timestamp', comment.get('created_at', ''))
        formatted_date = escape(str(self._format_date(timestamp)))

        is_resolved = comment.get('is_resolved', False)
        requires_resolution = comment.get('requires_resolution', False)

        # Badge only for comments that require a resolution
        resolution_badge = ""
        if requires_resolution:
            badge_color = "#dc3545" if not is_resolved else "#28a745"
            badge_text = "Needs Resolution" if not is_resolved else "Resolved"
            resolution_badge = f"""
                <span style="background: {badge_color}; color: white; padding: 2px 5px;
                border-radius: 3px; font-size: 0.8em; margin-left: 5px;">
                {badge_text}
                </span>
                """

        # The body div uses pre-wrap, so the text stays on the same line
        # as the tags to avoid stray leading whitespace
        parts.append(f"""
            <div class="comment-card" style="border: 1px solid #e0e0e0; border-radius: 5px;
            margin-bottom: 15px; background: #f8f9fa;">
            <div class="comment-header" style="padding: 10px; border-bottom: 1px solid #e0e0e0;
            background: #f0f0f0; font-weight: bold; display: flex; justify-content: space-between;">
            <div>{commenter_name} {resolution_badge}</div>
            <div style="font-size: 0.9em; color: #666;">{formatted_date}</div>
            </div>
            <div class="comment-body" style="padding: 10px; white-space: pre-wrap;">{comment_text}</div>
            """)

        # Resolution section only when resolved and a resolution text exists
        resolution = comment.get('resolution')
        if is_resolved and resolution:
            resolution_text = escape(str(resolution))
            resolution_date = escape(str(self._format_date(comment.get('resolution_date'))))
            parts.append(f"""
                <div class="resolution" style="background: #e8f4f8; padding: 10px; border-top: 1px dashed #ccc;">
                <div><strong>Resolution:</strong> {resolution_text}</div>
                <div style="font-size: 0.9em; color: #666;">Resolved on {resolution_date}</div>
                </div>
                """)

        parts.append("</div>")

    parts.append("</div>")
    return pn.pane.HTML("".join(parts))
def _create_submit_workflow_actions(self, my_assignment):
"""Create the workflow submission actions form - must be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement _create_submit_workflow_actions")
def _submit_workflow_action(self, decision, comment, section):
"""Submit a workflow decision and comment - must be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement _submit_workflow_action")
def _view_document(self, event=None):
"""Navigate to document view"""
if not self.document_uid:
self.notification_area.object = "Document not found"
return
# Use panel state to navigate to document page
return pn.state.execute(f"window.location.href = '/document/{self.document_uid}'")
def _format_date(self, date_str):
"""Format a date string"""
if not date_str:
return "-"
try:
# Convert to datetime if it's a string
if isinstance(date_str, str):
dt = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
else:
dt = self._convert_neo4j_datetime(date_str)
if dt:
return dt.strftime("%Y-%m-%d")
return "-"
except Exception:
return str(date_str)
def get_main_content(self):
    """Return the embeddable part of the panel.

    Returns:
        A new ``pn.Column`` stacking the notification area above the main
        content, stretched to the available width.
    """
    # The notification area goes first so messages stay visible to the host
    return pn.Column(
        self.notification_area,
        self.main_content,
        sizing_mode='stretch_width',
    )
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Parameter of unspecified type.
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self, template, session_manager, parent_app, embedded, workflow_type)
Purpose: Initialize a workflow panel Args: template: Panel template for displaying the UI session_manager: Authentication session manager parent_app: Parent application reference for navigation embedded: Whether this panel is embedded in another UI workflow_type: Type of workflow ('REVIEW' or 'APPROVAL')
Parameters:
template: Parameter; session_manager: Parameter; parent_app: Parameter; embedded: Parameter; workflow_type: Parameter
Returns: None
_setup_ui(self)
Purpose: Set up the basic UI structure based on embedded mode
Returns: None
_get_current_user(self) -> Optional['DocUser']
Purpose: Get the current user from session or parent app
Returns: Returns Optional['DocUser']
set_user(self, user)
Purpose: Set the current user - needed for compatibility with main app
Parameters:
user: Parameter
Returns: None
_setup_header(self)
Purpose: Set up the header with title and actions
Returns: None
_setup_sidebar(self)
Purpose: Set up the sidebar with navigation options
Returns: None
_refresh_current_view(self, event)
Purpose: Refresh the current view
Parameters:
event: Parameter
Returns: None
_navigate_back(self, event)
Purpose: Navigate back to dashboard
Parameters:
event: Parameter
Returns: None
_update_workflow_statistics(self)
Purpose: Update the workflow statistics display.
Returns: None
_load_pending_assignments(self, event)
Purpose: Load pending assignments for the current user
Parameters:
event: Parameter
Returns: None
_load_completed_assignments(self, event)
Purpose: Load completed assignments for the current user
Parameters:
event: Parameter
Returns: None
_load_all_workflows(self, event)
Purpose: Load all workflow cycles (admin view)
Parameters:
event: Parameter
Returns: None
_load_filtered_workflows(self, status_filter, date_from, date_to, workflow_type, doc_type)
Purpose: Load workflows based on filters
Parameters:
status_filter: Parameter; date_from: Parameter; date_to: Parameter; workflow_type: Parameter; doc_type: Parameter
Returns: None
_create_workflows_table(self, cycles)
Purpose: Create a Tabulator table for workflow cycles
Parameters:
cycles: Parameter
Returns: None
_format_status_html(self, status)
Purpose: Format workflow status as HTML with color-coding
Parameters:
status: Parameter
Returns: An HTML `<span>` string styled as a color-coded status badge
_convert_neo4j_datetime(self, dt_value)
Purpose: Convert Neo4j DateTime to Python datetime
Parameters:
dt_value: Parameter
Returns: A Python datetime when conversion succeeds, None for empty input, otherwise the input value unchanged
_workflow_selected(self, event)
Purpose: Handle workflow selection from table
Parameters:
event: Parameter
Returns: None
_load_cycle(self, cycle_uid)
Purpose: Load and display a specific workflow cycle
Parameters:
cycle_uid: Parameter
Returns: None
_create_assignment_list(self, title, assignments)
Purpose: Create a list of user assignments
Parameters:
title: Parameter; assignments: Parameter
Returns: None
_assignment_selected(self, event)
Purpose: Handle selection of an assignment from the table
Parameters:
event: Parameter
Returns: None
_create_workflow_detail_view(self)
Purpose: Create the workflow detail view - must be implemented by subclasses
Returns: None
_create_participants_dataframe(self, assignments)
Purpose: Create a DataFrame for the participants table - must be implemented by subclasses
Parameters:
assignments: Parameter
Returns: None
_create_comments_area(self, comments)
Purpose: Create the comments area
Parameters:
comments: Parameter
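Returns: A Panel Markdown pane when there are no comments, otherwise a Panel HTML pane containing the comment cards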
_create_submit_workflow_actions(self, my_assignment)
Purpose: Create the workflow submission actions form - must be implemented by subclasses
Parameters:
my_assignment: Parameter
Returns: None
_submit_workflow_action(self, decision, comment, section)
Purpose: Submit a workflow decision and comment - must be implemented by subclasses
Parameters:
decision: Parameter; comment: Parameter; section: Parameter
Returns: None
_view_document(self, event)
Purpose: Navigate to document view
Parameters:
event: Parameter
Returns: None
_format_date(self, date_str)
Purpose: Format a date string
Parameters:
date_str: Parameter
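Returns: The date formatted as YYYY-MM-DD, "-" for empty input, or the input's string form when it cannot be parsed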
get_main_content(self)
Purpose: Return just the main content for embedding in other panels
Returns: A Panel Column containing the notification area followed by the main content
Required Imports
import logging
import uuid
import types
import traceback
from typing import Dict
Usage Example
# Example usage:
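# panel = WorkflowPanelBase(template, session_manager=None, parent_app=None, embedded=False, workflow_type='REVIEW')
# (the controller is set by subclasses, so the class is normally used through one)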
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class ApprovalPanel 79.7% similar
-
class ReviewPanel 76.8% similar
-
function create_workflow_panel 74.0% similar
-
class WorkflowControllerBase 73.6% similar
-
class WorkflowCycleBase 68.7% similar