function process_full_reading_background
Asynchronous background task processor that executes a full reading mode RAG (Retrieval-Augmented Generation) query, tracks progress, and stores results in session history.
/tf/active/vicechatdev/docchat/app.py
1038 - 1072
moderate
Purpose
This function is designed to run as a background thread/task to process computationally intensive full reading mode queries without blocking the main application. It interfaces with a RAG engine to retrieve and process documents, provides real-time progress updates via callbacks, handles error scenarios, and persists conversation history with metadata and references to a session store.
Source Code
def process_full_reading_background(task_id, query, chat_history, session_id):
    """Process full reading mode in background"""
    try:
        def progress_callback(message):
            update_task_progress(task_id, message)

        result = rag_engine.chat(
            query=query,
            mode='full_reading',
            chat_history=chat_history,
            progress_callback=progress_callback
        )

        # Prepare metadata
        metadata = {
            'mode': 'full_reading',
            'num_documents': result.get('num_documents'),
            'num_relevant_documents': result.get('num_relevant_documents')
        }
        references = result.get('references', [])

        # Add to session history with metadata and references
        add_message_to_session(session_id, 'assistant', result['response'], metadata=metadata, references=references)

        complete_task(task_id, {
            'response': result['response'],
            'mode': result['mode'],
            'context': result.get('context', []),
            'references': references,
            'metadata': metadata
        })
    except Exception as e:
        logger.error(f"Background task error: {e}")
        fail_task(task_id, str(e))
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| task_id | - | - | positional_or_keyword |
| query | - | - | positional_or_keyword |
| chat_history | - | - | positional_or_keyword |
| session_id | - | - | positional_or_keyword |
Parameter Details
task_id: Unique identifier for the background task, used to track and update task progress and completion status. Expected to be a string or UUID that can be used with update_task_progress, complete_task, and fail_task functions.
query: The user's question or prompt string that will be processed by the RAG engine in full reading mode. Should be a non-empty string containing the natural language query.
chat_history: List or array of previous conversation messages to provide context for the current query. Expected format depends on rag_engine.chat requirements, typically a list of dictionaries with 'role' and 'content' keys.
session_id: Unique identifier for the user session, used to associate the assistant's response with the correct conversation thread in persistent storage. Should be a string or UUID.
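The source does not confirm the exact shape of chat_history. The following is a minimal sketch, assuming the role/content dictionary format mentioned above. The helper name build_chat_history and the max_messages limit are hypothetical and only illustrate how a caller might prepare the argument before starting the background task.

```python
def build_chat_history(messages, max_messages=10):
    """Return the most recent messages as a list of role/content dicts.

    Assumption: rag_engine.chat accepts dicts with 'role' and 'content'
    keys. Check the DocChatRAG implementation before relying on this.
    """
    history = []
    for message in messages:
        role = message.get('role')
        content = message.get('content')
        # Skip entries that lack either key or have empty content.
        if not role or not content:
            continue
        # Drop extra keys (metadata, references) that belong to the session store.
        history.append({'role': role, 'content': content})
    if max_messages <= 0:
        return []
    # Keep only the tail so long sessions do not inflate the prompt.
    return history[-max_messages:]


session_messages = [
    {'role': 'user', 'content': 'Summarize the report.'},
    {'role': 'assistant', 'content': 'The report covers three studies.',
     'metadata': {'mode': 'full_reading'}},
    {'role': 'user', 'content': ''},
]
chat_history = build_chat_history(session_messages)
```

Trimming the history is a design choice for this sketch, not something the documented function does; it passes chat_history to rag_engine.chat unchanged.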
Return Value
This function does not return a value (implicit None). Instead, it produces side effects: (1) Updates task progress via update_task_progress callback, (2) Adds assistant message to session via add_message_to_session, (3) Marks task as complete via complete_task with result dictionary containing 'response', 'mode', 'context', 'references', and 'metadata' keys, or (4) Marks task as failed via fail_task with error message string if an exception occurs.
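The side effects listed above can be observed without a real RAG engine. The sketch below copies the documented function and runs it against a stub; StubEngine, the dictionary-based task_store and session_store, and the helper bodies are all hypothetical stand-ins, since the real implementations in app.py are not shown here.

```python
import logging

logger = logging.getLogger(__name__)
task_store = {'ok': {}, 'bad': {}}
session_store = {}


class StubEngine:
    """Hypothetical stand-in for DocChatRAG; not the real engine."""

    def __init__(self):
        self.should_fail = False

    def chat(self, query, mode, chat_history, progress_callback):
        progress_callback('Reading documents')
        if self.should_fail:
            raise RuntimeError('engine unavailable')
        return {'response': f'Answer to: {query}', 'mode': mode,
                'num_documents': 3, 'num_relevant_documents': 2}


def update_task_progress(task_id, message):
    task_store[task_id]['progress'] = message


def complete_task(task_id, result):
    task_store[task_id].update(status='completed', result=result)


def fail_task(task_id, error):
    task_store[task_id].update(status='failed', error=error)


def add_message_to_session(session_id, role, content, metadata=None, references=None):
    session_store.setdefault(session_id, []).append(
        {'role': role, 'content': content,
         'metadata': metadata, 'references': references})


def process_full_reading_background(task_id, query, chat_history, session_id):
    """Copy of the documented function, with the same behavior."""
    try:
        def progress_callback(message):
            update_task_progress(task_id, message)

        result = rag_engine.chat(query=query, mode='full_reading',
                                 chat_history=chat_history,
                                 progress_callback=progress_callback)
        metadata = {
            'mode': 'full_reading',
            'num_documents': result.get('num_documents'),
            'num_relevant_documents': result.get('num_relevant_documents')
        }
        references = result.get('references', [])
        add_message_to_session(session_id, 'assistant', result['response'],
                               metadata=metadata, references=references)
        complete_task(task_id, {
            'response': result['response'],
            'mode': result['mode'],
            'context': result.get('context', []),
            'references': references,
            'metadata': metadata
        })
    except Exception as e:
        logger.error(f"Background task error: {e}")
        fail_task(task_id, str(e))


rag_engine = StubEngine()

# Success path: the session gains one message and the task is completed.
return_value = process_full_reading_background('ok', 'What changed?', [], 's1')

# Failure path: the exception is caught and recorded with fail_task.
rag_engine.should_fail = True
process_full_reading_background('bad', 'What changed?', [], 's1')
```

In this run the failing call adds nothing to the session history, because the exception is raised before add_message_to_session is reached.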
Dependencies
flask, logging, threading, uuid, pathlib, datetime, werkzeug, functools, json, os, time, python-docx, reportlab, traceback
Required Imports
import logging
from rag_engine import DocChatRAG
import config
Usage Example
import threading
import logging
from rag_engine import DocChatRAG

# Setup required globals
logger = logging.getLogger(__name__)
rag_engine = DocChatRAG()
task_store = {}
session_store = {}

def update_task_progress(task_id, message):
    task_store[task_id]['progress'] = message

def complete_task(task_id, result):
    task_store[task_id]['status'] = 'completed'
    task_store[task_id]['result'] = result

def fail_task(task_id, error):
    task_store[task_id]['status'] = 'failed'
    task_store[task_id]['error'] = error

def add_message_to_session(session_id, role, content, metadata=None, references=None):
    if session_id not in session_store:
        session_store[session_id] = []
    session_store[session_id].append({
        'role': role,
        'content': content,
        'metadata': metadata,
        'references': references
    })

# Execute background task
task_id = 'task_123'
task_store[task_id] = {'status': 'running'}
thread = threading.Thread(
    target=process_full_reading_background,
    args=(task_id, 'What are the key findings?', [], 'session_456')
)
thread.start()

# Check task status
thread.join()
print(task_store[task_id]['status'])
print(task_store[task_id].get('result', {}).get('response'))
Best Practices
- Always run this function in a separate thread or background worker to avoid blocking the main application thread
- Ensure all required global functions (update_task_progress, complete_task, fail_task, add_message_to_session) are properly defined before calling
- The rag_engine instance must be thread-safe or properly synchronized if shared across multiple background tasks
- Implement proper task_id generation and tracking mechanism to avoid collisions in concurrent scenarios
- Consider adding timeout mechanisms to prevent indefinitely running tasks
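- Log exceptions with full stack traces for debugging; the current code calls logger.error with only the exception message, so consider logger.exception(...) or logger.error(..., exc_info=True)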
- Validate that session_id exists before processing to avoid orphaned results
- The progress_callback mechanism allows for real-time UI updates in web applications
- Ensure the task_store or equivalent mechanism is thread-safe when accessed from multiple threads
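Several of the practices above (collision-free task IDs, a thread-safe task store, and bounded waiting) can be combined in a small registry. This is a minimal sketch under stated assumptions: the TaskStore class and its methods are hypothetical and are not the application's actual task tracking, and the worker function is a stand-in for process_full_reading_background.

```python
import threading
import uuid


class TaskStore:
    """Hypothetical in-memory task registry guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = {}

    def create(self):
        # uuid4 makes ID collisions between concurrent tasks very unlikely.
        task_id = uuid.uuid4().hex
        with self._lock:
            self._tasks[task_id] = {'status': 'running', 'progress': None}
        return task_id

    def update(self, task_id, **fields):
        with self._lock:
            self._tasks[task_id].update(fields)

    def snapshot(self, task_id):
        # Return a shallow copy so callers never mutate the shared record.
        with self._lock:
            return dict(self._tasks[task_id])


store = TaskStore()


def worker(task_id):
    # Stand-in for the background function: report progress, then complete.
    store.update(task_id, progress='Reading documents')
    store.update(task_id, status='completed', result={'response': 'done'})


task_ids = [store.create() for _ in range(5)]
threads = [threading.Thread(target=worker, args=(tid,), daemon=True)
           for tid in task_ids]
for thread in threads:
    thread.start()
for thread in threads:
    # The timeout bounds how long the caller waits; it does not stop the thread.
    thread.join(timeout=5)

statuses = [store.snapshot(tid)['status'] for tid in task_ids]
```

A join timeout only limits the caller's wait. Cancelling a long-running rag_engine.chat call would need cooperation from the engine itself, which the source does not describe.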
Similar Components
AI-powered semantic similarity - components with related functionality:
- function process_chat_background (82.1% similar)
- function full_reading_example (78.8% similar)
- class DocChatRAG (66.5% similar)
- function basic_rag_example (62.6% similar)
- function init_engines (60.2% similar)