🔍 Code Extractor

function process_chat_background

Maturity: 51

Processes chat requests asynchronously in a background thread, managing RAG engine interactions, progress updates, and session state for various query modes including basic, extensive, full_reading, and deep_reflection.

File: /tf/active/vicechatdev/docchat/app.py
Lines: 517 - 628
Complexity: complex

Purpose

This function serves as the main background worker for handling chat queries in a RAG (Retrieval-Augmented Generation) system. It configures the RAG engine with user-specified parameters, processes queries through different search modes, manages progress callbacks, handles document filtering and keyword searches, and stores results in session state. It's designed to run asynchronously to prevent blocking the main application thread during potentially long-running document retrieval and LLM generation operations.
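
The task-tracking helpers it calls (update_task_progress, complete_task, fail_task) are module-level functions in app.py whose implementations are not shown here. A minimal sketch of what they could look like, assuming a thread-safe in-memory dict as the task store (the names TASKS and TASK_LOCK are hypothetical):

import threading
import time

# Hypothetical in-memory task store; the real app.py may persist tasks differently
TASKS = {}
TASK_LOCK = threading.Lock()

def update_task_progress(task_id, message):
    """Record the latest progress message for a running task."""
    with TASK_LOCK:
        task = TASKS.setdefault(task_id, {'status': 'running'})
        task['progress'] = message
        task['updated_at'] = time.time()

def complete_task(task_id, result):
    """Mark a task finished and attach its result payload."""
    with TASK_LOCK:
        TASKS[task_id] = {'status': 'complete', 'result': result,
                          'updated_at': time.time()}

def fail_task(task_id, error):
    """Mark a task failed with an error message."""
    with TASK_LOCK:
        TASKS[task_id] = {'status': 'failed', 'error': error,
                          'updated_at': time.time()}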

Source Code

def process_chat_background(task_id, query, mode, model, chat_history, session_id, config_opts, source_filters, manual_keywords=None):
    """Process chat request in background thread with detailed progress updates"""
    try:
        logger.info(f"🚀 Starting background task {task_id} for mode: {mode}")
        logger.info(f"📁 SOURCE FILTERS RECEIVED: {source_filters}")
        logger.info(f"📁 SOURCE FILTERS TYPE: {type(source_filters)}")
        logger.info(f"📁 SOURCE FILTERS LENGTH: {len(source_filters) if source_filters else 0}")
        logger.info(f"🔍 MANUAL KEYWORDS: {manual_keywords}")
        
        update_task_progress(task_id, "Initializing chat engine...")
        
        if not rag_engine:
            fail_task(task_id, "RAG engine not available")
            return
        
        # Configure RAG engine
        update_task_progress(task_id, "Configuring search parameters...")
        
        try:
            rag_engine.set_model(model)
            logger.info(f"Background task: Using model {model}")
        except Exception as e:
            logger.warning(f"Background task: Failed to set model: {e}")
        
        # Set web search option
        if hasattr(rag_engine, 'enable_web_search'):
            rag_engine.enable_web_search = config_opts.get('enable_web_search', False)
            logger.info(f"Background task: Web search = {rag_engine.enable_web_search}")
        
        # Set memory option
        if hasattr(rag_engine, 'enable_memory'):
            rag_engine.enable_memory = config_opts.get('enable_memory', True)
            logger.info(f"Background task: Memory = {rag_engine.enable_memory}")
        
        # Progress updates based on mode
        if mode == 'basic':
            update_task_progress(task_id, "Expanding query variants...")
        elif mode == 'extensive':
            update_task_progress(task_id, "Retrieving relevant documents (extensive search)...")
        elif mode == 'full_reading':
            update_task_progress(task_id, "Reading all documents...")
        elif mode == 'deep_reflection':
            base_mode = config_opts.get('base_mode', 'extensive')
            logger.info(f"🔧 Deep Reflection Config - iterations: {config_opts.get('reflection_iterations', 2)}, base_mode: {base_mode}, override_sources: {config_opts.get('override_sources', False)}")
            update_task_progress(task_id, f"Starting deep reflection ({config_opts.get('reflection_iterations', 2) + 1} iterations, base={base_mode})...")
        else:
            update_task_progress(task_id, "Searching documents and generating response...")
        
        # Process query with RAG engine
        result = rag_engine.chat(
            query=query,
            mode=mode,
            chat_history=chat_history,
            progress_callback=lambda msg: update_task_progress(task_id, msg),
            source_filters=source_filters,
            manual_keywords=manual_keywords,
            enable_web_search=config_opts.get('enable_web_search', False),
            enable_memory=config_opts.get('enable_memory', True),
            enable_reference_filtering=config_opts.get('enable_reference_filtering', True),
            reference_threshold=config_opts.get('reference_threshold', 0.3),
            custom_instructions=config_opts.get('custom_instructions', ''),
            output_language=config_opts.get('output_language', 'en'),
            top_k=config_opts.get('top_k', config.EXTENSIVE_MODE_TOP_K if mode == 'extensive' else config.DEFAULT_TOP_K),
            use_reranking=config_opts.get('use_reranking', True),
            skip_extraction=config_opts.get('skip_extraction', False),
            reflection_iterations=config_opts.get('reflection_iterations', 2),
            base_mode=config_opts.get('base_mode', 'extensive'),
            override_sources=config_opts.get('override_sources', False)
        )
        
        update_task_progress(task_id, "Formatting response...")
        
        # Extract answer and references from result
        answer = result.get('response', '')
        references = result.get('references', [])
        sources = [ref.get('file_name', 'Unknown') for ref in references] if references else []
        
        # Prepare metadata
        metadata = {
            'mode': mode,
            'model': model
        }
        if result.get('num_chunks'):
            metadata['num_chunks'] = result.get('num_chunks')
        if result.get('num_documents'):
            metadata['num_documents'] = result.get('num_documents')
        if result.get('num_relevant_documents'):
            metadata['num_relevant_documents'] = result.get('num_relevant_documents')
        
        # Add assistant message to session with metadata and references
        add_message_to_session(session_id, 'assistant', answer, metadata=metadata, references=references)
        
        # Complete task with result
        complete_task(task_id, {
            'answer': answer,
            'sources': sources,
            'references': references,
            'metadata': metadata,
            'mode': mode,
            'model': model
        })
        
        logger.info(f"✅ Background task {task_id} completed successfully")
        
    except Exception as e:
        logger.error(f"❌ Background task {task_id} failed: {e}")
        try:
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
        except Exception:
            logger.error("Could not format traceback")
        fail_task(task_id, str(e))
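
Note that the progress callback handed to rag_engine.chat() is a lambda closing over task_id, so the engine can emit progress messages without knowing anything about the task store; each message is forwarded straight to update_task_progress.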

Parameters

Name             Type  Default  Kind
task_id          -     -        positional_or_keyword
query            -     -        positional_or_keyword
mode             -     -        positional_or_keyword
model            -     -        positional_or_keyword
chat_history     -     -        positional_or_keyword
session_id       -     -        positional_or_keyword
config_opts      -     -        positional_or_keyword
source_filters   -     -        positional_or_keyword
manual_keywords  -     None     positional_or_keyword

Parameter Details

task_id: Unique identifier for tracking this background task's progress and completion status. Used to update task state via update_task_progress, complete_task, and fail_task functions.

query: The user's natural language question or prompt to be processed by the RAG system. This is the main input that drives document retrieval and response generation.

mode: Search/processing mode that determines retrieval strategy. Valid values: 'basic' (simple query expansion), 'extensive' (comprehensive document search), 'full_reading' (reads all documents), 'deep_reflection' (iterative refinement with multiple passes).

model: The LLM model identifier to use for generation (e.g., 'gpt-4', 'gpt-3.5-turbo'). Passed to rag_engine.set_model().

chat_history: List of previous conversation messages for context-aware responses. Format depends on RAG engine implementation, typically list of dicts with 'role' and 'content' keys.
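
A typical shape, assuming the common role/content convention (confirm against your RAG engine's expectations):

chat_history = [
    {'role': 'user', 'content': 'Summarize the quarterly report.'},
    {'role': 'assistant', 'content': 'The report covers revenue growth and ...'},
]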

session_id: Unique session identifier for storing conversation history and associating messages with a specific user session.

config_opts: Dictionary of configuration options including: 'enable_web_search' (bool), 'enable_memory' (bool), 'enable_reference_filtering' (bool), 'reference_threshold' (float), 'custom_instructions' (str), 'output_language' (str), 'top_k' (int), 'use_reranking' (bool), 'skip_extraction' (bool), 'reflection_iterations' (int), 'base_mode' (str), 'override_sources' (bool).
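
All keys are optional; the fallbacks below are the ones the function itself applies in its rag_engine.chat() call:

config_opts = {
    'enable_web_search': False,
    'enable_memory': True,
    'enable_reference_filtering': True,
    'reference_threshold': 0.3,
    'custom_instructions': '',
    'output_language': 'en',
    'top_k': 10,                 # example value; the default is
                                 # config.EXTENSIVE_MODE_TOP_K in extensive
                                 # mode, config.DEFAULT_TOP_K otherwise
    'use_reranking': True,
    'skip_extraction': False,
    'reflection_iterations': 2,  # deep_reflection mode only
    'base_mode': 'extensive',    # deep_reflection mode only
    'override_sources': False,   # deep_reflection mode only
}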

source_filters: List or collection of source document identifiers to restrict search scope. Only documents matching these filters will be considered during retrieval. Can be None for no filtering.

manual_keywords: Optional list of specific keywords to use for document retrieval, overriding automatic keyword extraction. Defaults to None for automatic keyword generation.

Return Value

This function returns None; it communicates entirely through side effects. It reports progress via update_task_progress(), stores the final result via complete_task() as a dict with 'answer', 'sources', 'references', 'metadata', 'mode', and 'model' keys, appends the assistant message to the session via add_message_to_session(), and signals failure via fail_task() if an exception occurs.
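
Because the result is delivered via complete_task() rather than a return value, a caller typically polls the task store. A minimal polling sketch, reusing the hypothetical TASKS/TASK_LOCK store from the earlier sketch:

import time

def wait_for_task(task_id, poll_interval=0.5, timeout=300):
    """Block until the task completes, fails, or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        with TASK_LOCK:
            task = dict(TASKS.get(task_id, {}))
        if task.get('status') == 'complete':
            return task['result']  # dict with 'answer', 'sources', 'references', ...
        if task.get('status') == 'failed':
            raise RuntimeError(task.get('error', 'unknown error'))
        time.sleep(poll_interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")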

Dependencies

  • flask
  • logging
  • uuid
  • pathlib
  • threading
  • werkzeug
  • config
  • rag_engine
  • document_indexer
  • auth.azure_auth
  • python-docx
  • reportlab
  • traceback

Required Imports

import logging
import config
from rag_engine import DocChatRAG

Conditional/Optional Imports

These imports are only needed under specific conditions:

import traceback

Condition: optional; only used in exception handling to format detailed error traces

Usage Example

import threading
import logging
from rag_engine import DocChatRAG
import config

# Initialize logger and RAG engine
logger = logging.getLogger(__name__)
rag_engine = DocChatRAG()

# Stub helpers for illustration; in the real app these live in app.py
# alongside process_chat_background, which resolves them as module globals
def update_task_progress(task_id, message):
    print(f"Task {task_id}: {message}")

def complete_task(task_id, result):
    print(f"Task {task_id} completed: {result}")

def fail_task(task_id, error):
    print(f"Task {task_id} failed: {error}")

def add_message_to_session(session_id, role, content, metadata=None, references=None):
    print(f"Session {session_id}: {role} - {content[:50]}...")

# Run background task
task_id = "task_123"
query = "What are the key findings in the research papers?"
mode = "extensive"
model = "gpt-4"
chat_history = [{"role": "user", "content": "Hello"}]
session_id = "session_456"
config_opts = {
    "enable_web_search": False,
    "enable_memory": True,
    "top_k": 10,
    "use_reranking": True
}
source_filters = ["research_paper_1.pdf", "research_paper_2.pdf"]

# Execute in background thread
thread = threading.Thread(
    target=process_chat_background,
    args=(task_id, query, mode, model, chat_history, session_id, config_opts, source_filters, None)
)
thread.start()
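
The thread is started but deliberately not joined: the caller is expected to poll the task store for the result instead of blocking. In production, generate a fresh task_id per request to avoid collisions, for example:

import uuid

task_id = f"task_{uuid.uuid4().hex}"  # unique per request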

Best Practices

  • Always run this function in a separate thread to avoid blocking the main application
  • Ensure all required global functions (update_task_progress, complete_task, fail_task, add_message_to_session) are properly defined before calling
  • Verify that rag_engine is initialized and not None before starting background tasks
  • Use unique task_id values to prevent conflicts when tracking multiple concurrent tasks
  • Handle the case where rag_engine.set_model() may fail gracefully (function logs warning but continues)
  • Provide appropriate config_opts defaults to ensure backward compatibility if new options are added
  • Monitor logs for detailed execution flow, especially for debugging mode-specific behavior
  • Consider implementing task timeout mechanisms externally since this function doesn't have built-in timeout handling (see the watchdog sketch after this list)
  • Ensure source_filters format matches what rag_engine.chat() expects (function logs type and length for debugging)
  • For deep_reflection mode, carefully configure reflection_iterations, base_mode, and override_sources to control computational cost
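
A minimal external-timeout sketch for the point above, assuming the thread-based dispatch from the usage example and the fail_task helper; Python threads cannot be forcibly killed, so the watchdog can only flag the task while the worker runs on in the background:

import threading

def start_with_watchdog(task_id, args, timeout=600):
    """Dispatch process_chat_background with an external timeout watchdog."""
    worker = threading.Thread(target=process_chat_background, args=args, daemon=True)
    worker.start()

    def watchdog():
        worker.join(timeout)
        if worker.is_alive():
            # The worker keeps running; we can only record the failure.
            fail_task(task_id, f"Timed out after {timeout}s")

    threading.Thread(target=watchdog, daemon=True).start()
    return worker

Note the race at the boundary: if the worker completes just as the watchdog fires, the failure record can overwrite the completed result, so a production task store should guard its status transitions.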

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function process_full_reading_background 82.1% similar

    Asynchronous background task processor that executes a full reading mode RAG (Retrieval-Augmented Generation) query, tracks progress, and stores results in session history.

    From: /tf/active/vicechatdev/docchat/app.py
  • function api_chat 76.4% similar

    Flask API endpoint that handles chat requests asynchronously, processing user queries through a RAG (Retrieval-Augmented Generation) engine with support for multiple modes, memory, web search, and custom configurations.

    From: /tf/active/vicechatdev/docchat/app.py
  • function process_chat_request_background 73.9% similar

    Process chat request in background thread

    From: /tf/active/vicechatdev/vice_ai/app.py
  • function chat 72.9% similar

    Flask route handler that processes chat requests with RAG (Retrieval-Augmented Generation) capabilities, managing conversation sessions, chat history, and document-based question answering.

    From: /tf/active/vicechatdev/docchat/blueprint.py
  • function basic_rag_example 70.9% similar

    Demonstrates a basic RAG (Retrieval-Augmented Generation) workflow by initializing a DocChatRAG engine, executing a sample query about document topics, and displaying the response with metadata.

    From: /tf/active/vicechatdev/docchat/example_usage.py