function fail_task
Marks a task as failed by updating its status, recording the error message, and setting the completion timestamp in a thread-safe manner.
/tf/active/vicechatdev/vice_ai/app.py
68 - 74
simple
Purpose
This function is used in a task management system to handle task failures. It updates the task's metadata in the active_tasks dictionary to reflect a failed state, stores the error information for debugging/logging purposes, and records when the failure occurred. The function uses a lock to ensure thread-safe access to the shared task dictionary, making it suitable for concurrent/multi-threaded environments.
Source Code
def fail_task(task_id, error):
    """Mark task as failed with error"""
    with task_lock:
        if task_id in active_tasks:
            active_tasks[task_id]['status'] = 'failed'
            active_tasks[task_id]['error'] = str(error)
            active_tasks[task_id]['completed_at'] = datetime.now()
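As a rough illustration of the locking behaviour, the sketch below calls fail_task from several threads at once. It repeats the function so that it runs on its own. The task ids and the RuntimeError messages are invented for the demonstration and do not come from the application.

```python
from datetime import datetime
from threading import Lock, Thread

# Module-level state that fail_task expects (same names as in the source).
active_tasks = {}
task_lock = Lock()

def fail_task(task_id, error):
    """Mark task as failed with error"""
    with task_lock:
        if task_id in active_tasks:
            active_tasks[task_id]['status'] = 'failed'
            active_tasks[task_id]['error'] = str(error)
            active_tasks[task_id]['completed_at'] = datetime.now()

# Hypothetical task ids, registered as running before any thread starts.
task_ids = [f'task_{i}' for i in range(8)]
for tid in task_ids:
    active_tasks[tid] = {'status': 'running'}

# One worker thread per task, each reporting a failure for its own task.
workers = [
    Thread(target=fail_task, args=(tid, RuntimeError(f'{tid} crashed')))
    for tid in task_ids
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

# After all threads finish, every record has been marked as failed.
all_failed = all(active_tasks[tid]['status'] == 'failed' for tid in task_ids)
print(all_failed)
```

Because the three assignments happen inside a single with task_lock block, another thread that also takes task_lock never observes a record with the new status but a stale error or timestamp.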
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| task_id | - | - | positional_or_keyword |
| error | - | - | positional_or_keyword |
Parameter Details
task_id: Unique identifier for the task to be marked as failed. Expected to be a key that exists in the active_tasks dictionary. Typically a string (UUID) or integer that was assigned when the task was created.
error: The error object or message describing why the task failed. Can be an Exception object, error string, or any object that can be converted to a string via str(). This will be stored in the task's metadata for later retrieval and debugging.
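Since the error is stored via str(), the exception class is not kept, and some exceptions produce an empty or oddly quoted message. The sketch below shows what str() yields for a few built-in exceptions. It also shows one possible way to keep the class name: describe_error is a hypothetical helper and is not part of the application.

```python
def describe_error(error):
    """Hypothetical helper: keep the exception class name alongside its message."""
    if isinstance(error, BaseException):
        name = type(error).__name__
        message = str(error)
        return f"{name}: {message}" if message else name
    return str(error)

# What fail_task would store for a few different inputs.
plain = str(ValueError('Database connection failed'))  # message only, class name lost
empty = str(TimeoutError())                            # no message at all
quoted = str(KeyError('user_id'))                      # KeyError adds quotes around the key

# What the hypothetical helper would produce instead.
labelled = describe_error(TimeoutError())
labelled_plain = describe_error(ValueError('Database connection failed'))

print(repr(plain), repr(empty), repr(quoted))
print(labelled, '|', labelled_plain)
```

A caller could pass describe_error(e) as the error argument without changing fail_task, because a string passed through str() is returned unchanged.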
Return Value
This function does not return any value (implicitly returns None). It performs an in-place update of the active_tasks dictionary.
Dependencies
datetime, threading
Required Imports
from datetime import datetime
from threading import Lock
Usage Example
from datetime import datetime
from threading import Lock

# Required global setup (fail_task itself is assumed to be defined as in Source Code)
active_tasks = {}
task_lock = Lock()

# Create a sample task
task_id = 'task_123'
active_tasks[task_id] = {
    'status': 'running',
    'created_at': datetime.now(),
    'error': None,
    'completed_at': None
}

# Mark the task as failed
try:
    # Some operation that might fail
    raise ValueError('Database connection failed')
except Exception as e:
    fail_task(task_id, e)

# Check the updated task status
print(active_tasks[task_id]['status'])  # Output: failed
print(active_tasks[task_id]['error'])  # Output: Database connection failed
print(active_tasks[task_id]['completed_at'])  # Output: the failure timestamp (a datetime)
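The record produced above holds a datetime in completed_at, which json.dumps rejects by default. If task records need to be returned over HTTP or written to disk, one option is a small encoder like the sketch below. task_to_json is a hypothetical helper, and the fixed timestamp is chosen only to make the example deterministic.

```python
import json
from datetime import datetime

def task_to_json(task):
    """Hypothetical helper: serialize one task record, datetimes as ISO 8601 strings."""
    def encode(value):
        # json.dumps calls this only for values it cannot serialize itself.
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f'Cannot serialize {type(value).__name__}')
    return json.dumps(task, default=encode)

# Illustrative record shaped like the one fail_task leaves behind.
record = {
    'status': 'failed',
    'error': 'Database connection failed',
    'completed_at': datetime(2024, 1, 15, 10, 30, 0),
}
payload = task_to_json(record)
print(payload)
```

The error field needs no special handling here, because fail_task has already converted it to a string.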
Best Practices
- Always ensure that task_lock and active_tasks are properly initialized as global variables before calling this function
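- Calling with an unknown task_id does not raise KeyError, because the function checks membership under the lock before writing; a caller-side pre-check is only needed if the caller wants to know whether the task existed
- Call this function from the except block that catches a task's failure so the caught exception is recorded on the task, and re-raise afterwards if code further up the stack also needs to see it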
- Consider adding logging statements to track task failures for monitoring and debugging purposes
- The function silently does nothing if task_id is not in active_tasks; consider adding validation or logging for this case
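- The error is always stored as a string via str(error), so it is safe to persist or transmit; completed_at, however, is a datetime object and must be converted (for example with isoformat()) before JSON serialization
- The update is thread-safe because it runs under task_lock, which makes the function suitable for multi-threaded Flask applications or background task processors, provided every other reader and writer of active_tasks also holds task_lock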
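The logging and validation suggestions above could be combined roughly as follows. This is a sketch of one possible variant and not the application's code: fail_task_reporting, its boolean return value, and the log messages are all assumptions made for illustration.

```python
import logging
from datetime import datetime
from threading import Lock

logger = logging.getLogger(__name__)

# Same module-level state that fail_task relies on.
active_tasks = {}
task_lock = Lock()

def fail_task_reporting(task_id, error):
    """Hypothetical variant of fail_task: logs the outcome and
    returns True if a task was updated, False if task_id was unknown."""
    with task_lock:
        found = task_id in active_tasks
        if found:
            task = active_tasks[task_id]
            task['status'] = 'failed'
            task['error'] = str(error)
            task['completed_at'] = datetime.now()
    # Log after releasing the lock so slow log handlers do not block other threads.
    if found:
        logger.error('Task %s failed: %s', task_id, error)
    else:
        logger.warning('fail_task called for unknown task_id %r', task_id)
    return found

# Usage with made-up task ids.
active_tasks['t1'] = {'status': 'running'}
known = fail_task_reporting('t1', ValueError('boom'))
unknown = fail_task_reporting('missing', 'never registered')
print(known, unknown)
```

Returning a flag keeps the original no-op behaviour for unknown ids while letting callers decide whether a missing task is an error in their context.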
Similar Components
AI-powered semantic similarity - components with related functionality:
- function fail_task_v1 98.5% similar
- function complete_task_v1 76.6% similar
- function complete_task 76.0% similar
- function update_task_progress 66.6% similar
- function create_task 65.0% similar