🔍 Code Extractor

function process_full_reading_background

Maturity: 48

Asynchronous background task processor that executes a full reading mode RAG (Retrieval-Augmented Generation) query, tracks progress, and stores results in session history.

File:
/tf/active/vicechatdev/docchat/app.py
Lines:
1038 - 1072
Complexity:
moderate

Purpose

This function is designed to run as a background thread/task to process computationally intensive full reading mode queries without blocking the main application. It interfaces with a RAG engine to retrieve and process documents, provides real-time progress updates via callbacks, handles error scenarios, and persists conversation history with metadata and references to a session store.

Source Code

def process_full_reading_background(task_id, query, chat_history, session_id):
    """Process full reading mode in background"""
    try:
        def progress_callback(message):
            update_task_progress(task_id, message)
        
        result = rag_engine.chat(
            query=query,
            mode='full_reading',
            chat_history=chat_history,
            progress_callback=progress_callback
        )
        
        # Prepare metadata
        metadata = {
            'mode': 'full_reading',
            'num_documents': result.get('num_documents'),
            'num_relevant_documents': result.get('num_relevant_documents')
        }
        references = result.get('references', [])
        
        # Add to session history with metadata and references
        add_message_to_session(session_id, 'assistant', result['response'], metadata=metadata, references=references)
        
        complete_task(task_id, {
            'response': result['response'],
            'mode': result['mode'],
            'context': result.get('context', []),
            'references': references,
            'metadata': metadata
        })
        
    except Exception as e:
        logger.error(f"Background task error: {e}")
        fail_task(task_id, str(e))

Parameters

Name Type Default Kind
task_id - - positional_or_keyword
query - - positional_or_keyword
chat_history - - positional_or_keyword
session_id - - positional_or_keyword

Parameter Details

task_id: Unique identifier for the background task, used to track and update task progress and completion status. Expected to be a string or UUID that can be used with update_task_progress, complete_task, and fail_task functions.

query: The user's question or prompt string that will be processed by the RAG engine in full reading mode. Should be a non-empty string containing the natural language query.

chat_history: List or array of previous conversation messages to provide context for the current query. Expected format depends on rag_engine.chat requirements, typically a list of dictionaries with 'role' and 'content' keys.

session_id: Unique identifier for the user session, used to associate the assistant's response with the correct conversation thread in persistent storage. Should be a string or UUID.

Return Value

This function does not return a value (implicit None). Instead, it produces side effects: (1) Updates task progress via update_task_progress callback, (2) Adds assistant message to session via add_message_to_session, (3) Marks task as complete via complete_task with result dictionary containing 'response', 'mode', 'context', 'references', and 'metadata' keys, or (4) Marks task as failed via fail_task with error message string if an exception occurs.

Dependencies

  • flask
  • logging
  • threading
  • uuid
  • pathlib
  • datetime
  • werkzeug
  • functools
  • json
  • os
  • time
  • python-docx
  • reportlab
  • traceback

Required Imports

import logging
from rag_engine import DocChatRAG
import config

Usage Example

import threading
import logging
from rag_engine import DocChatRAG

# Setup required globals
logger = logging.getLogger(__name__)
rag_engine = DocChatRAG()
task_store = {}
session_store = {}

def update_task_progress(task_id, message):
    task_store[task_id]['progress'] = message

def complete_task(task_id, result):
    task_store[task_id]['status'] = 'completed'
    task_store[task_id]['result'] = result

def fail_task(task_id, error):
    task_store[task_id]['status'] = 'failed'
    task_store[task_id]['error'] = error

def add_message_to_session(session_id, role, content, metadata=None, references=None):
    if session_id not in session_store:
        session_store[session_id] = []
    session_store[session_id].append({
        'role': role,
        'content': content,
        'metadata': metadata,
        'references': references
    })

# Execute background task
task_id = 'task_123'
task_store[task_id] = {'status': 'running'}

thread = threading.Thread(
    target=process_full_reading_background,
    args=(task_id, 'What are the key findings?', [], 'session_456')
)
thread.start()

# Check task status
thread.join()
print(task_store[task_id]['status'])
print(task_store[task_id].get('result', {}).get('response'))

Best Practices

  • Always run this function in a separate thread or background worker to avoid blocking the main application thread
  • Ensure all required global functions (update_task_progress, complete_task, fail_task, add_message_to_session) are properly defined before calling
  • The rag_engine instance must be thread-safe or properly synchronized if shared across multiple background tasks
  • Implement proper task_id generation and tracking mechanism to avoid collisions in concurrent scenarios
  • Consider adding timeout mechanisms to prevent indefinitely running tasks
  • Log exceptions with full stack traces for debugging (currently uses logger.error)
  • Validate that session_id exists before processing to avoid orphaned results
  • The progress_callback mechanism allows for real-time UI updates in web applications
  • Ensure the task_store or equivalent mechanism is thread-safe when accessed from multiple threads

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function process_chat_background 82.1% similar

    Processes chat requests asynchronously in a background thread, managing RAG engine interactions, progress updates, and session state for various query modes including basic, extensive, full_reading, and deep_reflection.

    From: /tf/active/vicechatdev/docchat/app.py
  • function full_reading_example 78.8% similar

    Demonstrates the full reading mode of a RAG (Retrieval-Augmented Generation) system by processing all documents to answer a comprehensive query about key findings.

    From: /tf/active/vicechatdev/docchat/example_usage.py
  • class DocChatRAG 66.5% similar

    Main RAG engine with three operating modes: 1. Basic RAG (similarity search) 2. Extensive (full document retrieval with preprocessing) 3. Full Reading (process all documents)

    From: /tf/active/vicechatdev/docchat/rag_engine.py
  • function basic_rag_example 62.6% similar

    Demonstrates a basic RAG (Retrieval-Augmented Generation) workflow by initializing a DocChatRAG engine, executing a sample query about document topics, and displaying the response with metadata.

    From: /tf/active/vicechatdev/docchat/example_usage.py
  • function init_engines 60.2% similar

    Initializes the RAG (Retrieval-Augmented Generation) engine and document indexer components, loads persisted sessions, and optionally starts background auto-indexing of documents.

    From: /tf/active/vicechatdev/docchat/app.py
← Back to Browse