function process_chat_background
Processes chat requests asynchronously in a background thread, managing RAG engine interactions, progress updates, and session state for various query modes including basic, extensive, full_reading, and deep_reflection.
/tf/active/vicechatdev/docchat/app.py
517 - 628
complex
Purpose
This function serves as the main background worker for handling chat queries in a RAG (Retrieval-Augmented Generation) system. It configures the RAG engine with user-specified parameters, processes queries through different search modes, manages progress callbacks, handles document filtering and keyword searches, and stores results in session state. It's designed to run asynchronously to prevent blocking the main application thread during potentially long-running document retrieval and LLM generation operations.
Source Code
def process_chat_background(task_id, query, mode, model, chat_history, session_id, config_opts, source_filters, manual_keywords=None):
    """Process a chat request in a background thread with detailed progress updates.

    Runs the full RAG pipeline for a single query: configures the engine
    (model, web search, memory), emits mode-specific progress messages,
    invokes ``rag_engine.chat``, appends the assistant's answer to the
    user's session, and records the final result in the task registry.

    Args:
        task_id: Unique id used with update_task_progress/complete_task/fail_task.
        query: The user's natural-language question.
        mode: Retrieval strategy ('basic', 'extensive', 'full_reading',
            'deep_reflection'; any other value takes the default path).
        model: LLM model identifier passed to rag_engine.set_model().
        chat_history: Prior conversation messages for context.
        session_id: Session the assistant message is appended to.
        config_opts: Dict of tuning options (web search, memory, top_k,
            reflection settings, ...); missing keys fall back to defaults.
        source_filters: Optional collection restricting which documents
            are searched; may be None for no filtering.
        manual_keywords: Optional explicit keywords overriding automatic
            keyword extraction.

    Returns:
        None. Results are communicated via side effects only:
        complete_task()/fail_task() and add_message_to_session().
    """
    try:
        logger.info(f"🚀 Starting background task {task_id} for mode: {mode}")
        logger.info(f"📁 SOURCE FILTERS RECEIVED: {source_filters}")
        logger.info(f"📁 SOURCE FILTERS TYPE: {type(source_filters)}")
        logger.info(f"📁 SOURCE FILTERS LENGTH: {len(source_filters) if source_filters else 0}")
        logger.info(f"🔍 MANUAL KEYWORDS: {manual_keywords}")

        update_task_progress(task_id, "Initializing chat engine...")
        if not rag_engine:
            fail_task(task_id, "RAG engine not available")
            return

        # Configure RAG engine
        update_task_progress(task_id, "Configuring search parameters...")
        try:
            rag_engine.set_model(model)
            logger.info(f"Background task: Using model {model}")
        except Exception as e:
            # Model selection failure is non-fatal: continue with the engine's current model.
            logger.warning(f"Background task: Failed to set model: {e}")

        # Set web search option (hasattr guard: engine may predate this attribute)
        if hasattr(rag_engine, 'enable_web_search'):
            rag_engine.enable_web_search = config_opts.get('enable_web_search', False)
            logger.info(f"Background task: Web search = {rag_engine.enable_web_search}")

        # Set memory option (same guard as above)
        if hasattr(rag_engine, 'enable_memory'):
            rag_engine.enable_memory = config_opts.get('enable_memory', True)
            logger.info(f"Background task: Memory = {rag_engine.enable_memory}")

        # Mode-specific progress message so the UI reflects the chosen strategy
        if mode == 'basic':
            update_task_progress(task_id, "Expanding query variants...")
        elif mode == 'extensive':
            update_task_progress(task_id, "Retrieving relevant documents (extensive search)...")
        elif mode == 'full_reading':
            update_task_progress(task_id, "Reading all documents...")
        elif mode == 'deep_reflection':
            base_mode = config_opts.get('base_mode', 'extensive')
            logger.info(f"🔧 Deep Reflection Config - iterations: {config_opts.get('reflection_iterations', 2)}, base_mode: {base_mode}, override_sources: {config_opts.get('override_sources', False)}")
            # +1: the initial pass plus the configured reflection iterations
            update_task_progress(task_id, f"Starting deep reflection ({config_opts.get('reflection_iterations', 2) + 1} iterations, base={base_mode})...")
        else:
            update_task_progress(task_id, "Searching documents and generating response...")

        # Process query with RAG engine; the lambda forwards engine progress
        # messages to this task's progress channel.
        result = rag_engine.chat(
            query=query,
            mode=mode,
            chat_history=chat_history,
            progress_callback=lambda msg: update_task_progress(task_id, msg),
            source_filters=source_filters,
            manual_keywords=manual_keywords,
            enable_web_search=config_opts.get('enable_web_search', False),
            enable_memory=config_opts.get('enable_memory', True),
            enable_reference_filtering=config_opts.get('enable_reference_filtering', True),
            reference_threshold=config_opts.get('reference_threshold', 0.3),
            custom_instructions=config_opts.get('custom_instructions', ''),
            output_language=config_opts.get('output_language', 'en'),
            top_k=config_opts.get('top_k', config.EXTENSIVE_MODE_TOP_K if mode == 'extensive' else config.DEFAULT_TOP_K),
            use_reranking=config_opts.get('use_reranking', True),
            skip_extraction=config_opts.get('skip_extraction', False),
            reflection_iterations=config_opts.get('reflection_iterations', 2),
            base_mode=config_opts.get('base_mode', 'extensive'),
            override_sources=config_opts.get('override_sources', False)
        )

        update_task_progress(task_id, "Formatting response...")

        # Extract answer and references from result
        answer = result.get('response', '')
        references = result.get('references', [])
        sources = [ref.get('file_name', 'Unknown') for ref in references] if references else []

        # Prepare metadata. Counts are recorded only when truthy, so
        # zero/None values are deliberately omitted from the payload.
        metadata = {
            'mode': mode,
            'model': model
        }
        for count_key in ('num_chunks', 'num_documents', 'num_relevant_documents'):
            if result.get(count_key):
                metadata[count_key] = result.get(count_key)

        # Add assistant message to session with metadata and references
        add_message_to_session(session_id, 'assistant', answer, metadata=metadata, references=references)

        # Complete task with result
        complete_task(task_id, {
            'answer': answer,
            'sources': sources,
            'references': references,
            'metadata': metadata,
            'mode': mode,
            'model': model
        })
        logger.info(f"✅ Background task {task_id} completed successfully")
    except Exception as e:
        logger.error(f"❌ Background task {task_id} failed: {e}")
        try:
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
        except Exception:
            # Was a bare `except:`; narrowed so diagnostics can never
            # swallow SystemExit/KeyboardInterrupt. Failure path must
            # still reach fail_task below regardless.
            logger.error("Could not format traceback")
        fail_task(task_id, str(e))
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| task_id | - | - | positional_or_keyword |
| query | - | - | positional_or_keyword |
| mode | - | - | positional_or_keyword |
| model | - | - | positional_or_keyword |
| chat_history | - | - | positional_or_keyword |
| session_id | - | - | positional_or_keyword |
| config_opts | - | - | positional_or_keyword |
| source_filters | - | - | positional_or_keyword |
| manual_keywords | - | None | positional_or_keyword |
Parameter Details
task_id: Unique identifier for tracking this background task's progress and completion status. Used to update task state via update_task_progress, complete_task, and fail_task functions.
query: The user's natural language question or prompt to be processed by the RAG system. This is the main input that drives document retrieval and response generation.
mode: Search/processing mode that determines retrieval strategy. Valid values: 'basic' (simple query expansion), 'extensive' (comprehensive document search), 'full_reading' (reads all documents), 'deep_reflection' (iterative refinement with multiple passes).
model: The LLM model identifier to use for generation (e.g., 'gpt-4', 'gpt-3.5-turbo'). Passed to rag_engine.set_model().
chat_history: List of previous conversation messages for context-aware responses. Format depends on RAG engine implementation, typically list of dicts with 'role' and 'content' keys.
session_id: Unique session identifier for storing conversation history and associating messages with a specific user session.
config_opts: Dictionary of configuration options including: 'enable_web_search' (bool), 'enable_memory' (bool), 'enable_reference_filtering' (bool), 'reference_threshold' (float), 'custom_instructions' (str), 'output_language' (str), 'top_k' (int), 'use_reranking' (bool), 'skip_extraction' (bool), 'reflection_iterations' (int), 'base_mode' (str), 'override_sources' (bool).
source_filters: List or collection of source document identifiers to restrict search scope. Only documents matching these filters will be considered during retrieval. Can be None for no filtering.
manual_keywords: Optional list of specific keywords to use for document retrieval, overriding automatic keyword extraction. Defaults to None for automatic keyword generation.
Return Value
This function does not return a value (returns None implicitly). Instead, it communicates results through side effects: updates task progress via update_task_progress(), stores the final result via complete_task() with a dict containing 'answer', 'sources', 'references', 'metadata', 'mode', and 'model' keys, adds messages to session via add_message_to_session(), or signals failure via fail_task() if an exception occurs.
Dependencies
flask, logging, uuid, pathlib, threading, werkzeug, config, rag_engine, document_indexer, auth.azure_auth, python-docx, reportlab, traceback
Required Imports
import logging
import config
from rag_engine import DocChatRAG
Conditional/Optional Imports
These imports are only needed under specific conditions:
import traceback
Condition: only used in exception handling to format detailed error traces
Optional

Usage Example
import threading
import logging
from rag_engine import DocChatRAG
import config

# Initialize logger and RAG engine
logger = logging.getLogger(__name__)
rag_engine = DocChatRAG()

# Define required helper functions
def update_task_progress(task_id, message):
    print(f"Task {task_id}: {message}")

def complete_task(task_id, result):
    print(f"Task {task_id} completed: {result}")

def fail_task(task_id, error):
    print(f"Task {task_id} failed: {error}")

def add_message_to_session(session_id, role, content, metadata=None, references=None):
    print(f"Session {session_id}: {role} - {content[:50]}...")

# Run background task
task_id = "task_123"
query = "What are the key findings in the research papers?"
mode = "extensive"
model = "gpt-4"
chat_history = [{"role": "user", "content": "Hello"}]
session_id = "session_456"
config_opts = {
    "enable_web_search": False,
    "enable_memory": True,
    "top_k": 10,
    "use_reranking": True
}
source_filters = ["research_paper_1.pdf", "research_paper_2.pdf"]

# Execute in background thread
thread = threading.Thread(
    target=process_chat_background,
    args=(task_id, query, mode, model, chat_history, session_id, config_opts, source_filters, None)
)
thread.start()
Best Practices
- Always run this function in a separate thread to avoid blocking the main application
- Ensure all required global functions (update_task_progress, complete_task, fail_task, add_message_to_session) are properly defined before calling
- Verify that rag_engine is initialized and not None before starting background tasks
- Use unique task_id values to prevent conflicts when tracking multiple concurrent tasks
- Handle the case where rag_engine.set_model() may fail gracefully (function logs warning but continues)
- Provide appropriate config_opts defaults to ensure backward compatibility if new options are added
- Monitor logs for detailed execution flow, especially for debugging mode-specific behavior
- Consider implementing task timeout mechanisms externally since this function doesn't have built-in timeout handling
- Ensure source_filters format matches what rag_engine.chat() expects (function logs type and length for debugging)
- For deep_reflection mode, carefully configure reflection_iterations, base_mode, and override_sources to control computational cost
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function process_full_reading_background 82.1% similar
-
function api_chat 76.4% similar
-
function process_chat_request_background 73.9% similar
-
function chat 72.9% similar
-
function basic_rag_example 70.9% similar