🔍 Code Extractor

function process_chat_request_background

Maturity: 25

Process chat request in background thread

File:
/tf/active/vicechatdev/vice_ai/app.py
Lines:
274 - 628
Complexity:
moderate

Purpose

Process chat request in background thread

Source Code

def process_chat_request_background(task_id, message, config, user_info):
    """Process a chat request on a background thread.

    Configures the shared ``chat_engine`` from the user's ``config``,
    stages data sources (ChromaDB collections, uploaded documents and
    reference blocks), generates a response, post-processes it for the
    frontend, and delivers the result via ``complete_task``.  Any failure
    is logged and reported through ``fail_task``; nothing is returned.

    Args:
        task_id: Identifier of the background task to update.
        message: The user's chat message text.
        config: Dict of user-selected options (search/memory toggles,
            chat history, collections, uploaded documents, ...).
        user_info: Dict describing the requesting user; ``email`` is used
            to look up uploaded documents and ``name`` is echoed in the
            result payload.
    """
    try:
        logger.info(f"🚀 Starting background task {task_id}")
        logger.info(f"🔧 Received configuration: {config}")
        update_task_progress(task_id, "Configuring chat engine...")

        _configure_flow_control(config)
        _populate_chat_memory(config)
        _apply_instruction_template(config)

        update_task_progress(task_id, "Setting up data sources...")
        collections = config.get('collections', [])
        _setup_collection_sources(config, collections)
        _add_uploaded_documents(config, user_info)

        update_task_progress(task_id, "Generating response... This may take several minutes for extensive searches.")

        if not hasattr(chat_engine, 'response_callback'):
            raise Exception("Chat engine response_callback method not available")

        raw_response = chat_engine.response_callback(message)
        response = _coerce_response_text(raw_response)

        update_task_progress(task_id, "Post-processing response...")
        if isinstance(response, str):
            response = _postprocess_response(response)

        logger.info(f"✅ Task {task_id} completed: {len(response)} characters")

        available_references = _collect_references(response)

        result = {
            'response': response,
            'timestamp': datetime.now().isoformat(),
            'user': user_info['name'],
            'available_references': available_references,
            'debug_info': {
                'collections_requested': collections,
                'available_collections': getattr(chat_engine, 'available_collections', []),
                'has_data_handles': hasattr(chat_engine, 'data_handles'),
                'database_search_enabled': config.get('enable_search', True),
                'web_search_enabled': config.get('enable_search', True),
                'task_id': task_id,
                'references_count': len(available_references)
            }
        }

        complete_task(task_id, result)

    except Exception as e:
        logger.error(f"❌ Task {task_id} failed: {e}")
        fail_task(task_id, str(e))


def _configure_flow_control(config):
    """Copy user-selected toggles from *config* onto ``chat_engine.flow_control``."""
    if not hasattr(chat_engine, 'flow_control'):
        return
    fc = chat_engine.flow_control
    # The frontend exposes a single search toggle; it deliberately drives
    # both the internal (database) and external (web) search paths.
    enable_search = config.get('enable_search', True)
    fc['enable_search'] = enable_search
    fc['enable_web_search'] = enable_search
    logger.info(f"🔍 Search configuration - Database: {fc['enable_search']}, Web: {fc['enable_web_search']}")

    fc['enable_memory'] = config.get('enable_memory', True)
    fc['enable_extensive_search'] = config.get('enable_extensive_search', False)
    fc['extensive_search_chunks'] = config.get('extensive_chunks', 200)
    fc['target_summary_tokens'] = config.get('summary_tokens', 5000)
    fc['detail_level'] = config.get('detail_level', 'detailed')
    fc['enable_keyword_filtering'] = config.get('enable_keyword_filtering', False)
    fc['manual_keywords'] = config.get('manual_keywords', '')
    fc['enable_reference_filtering'] = config.get('enable_reference_filtering', True)
    fc['relevance_threshold'] = config.get('relevance_threshold', 0.7)


def _populate_chat_memory(config):
    """Seed ``chat_engine.chat_memory`` from the frontend-provided history.

    Only runs when memory is enabled.  Clears any existing memory, then
    pairs user and assistant messages positionally (assumes alternating
    turns in the conversation).
    """
    if not config.get('enable_memory', False):
        return
    if not config.get('chat_history'):
        logger.info("📝 Memory enabled but no chat history provided from frontend")
        return

    chat_history = config.get('chat_history', [])
    logger.info(f"📚 Received chat history with {len(chat_history)} messages for context-aware processing")

    if not hasattr(chat_engine, 'chat_memory'):
        logger.warning("⚠️  Chat engine does not have chat_memory attribute")
        return

    memory = chat_engine.chat_memory
    logger.info(f"🔍 Chat memory before clearing: {len(memory.messages)} messages")
    memory.messages = []

    # Split the flat history back into per-role streams, then pair them up.
    user_messages = [msg for msg in chat_history if msg.get('role') == 'user']
    assistant_messages = [msg for msg in chat_history if msg.get('role') == 'assistant']
    logger.info(f"🔍 Found {len(user_messages)} user messages and {len(assistant_messages)} assistant messages")

    added_exchanges = 0
    max_pairs = min(len(user_messages), len(assistant_messages))
    for i in range(max_pairs):
        user_msg = user_messages[i]
        assistant_msg = assistant_messages[i]
        if user_msg and assistant_msg:
            memory.save_context(
                {"role": "user", "content": user_msg.get('content', '')},
                {"role": "assistant", "content": assistant_msg.get('content', '')}
            )
            added_exchanges += 1
            logger.info(f"   💬 Added exchange {added_exchanges}: '{user_msg.get('content', '')[:50]}...' -> '{assistant_msg.get('content', '')[:50]}...'")
        else:
            logger.warning(f"   ⚠️  Missing message in pair {i+1}")

    if len(user_messages) != len(assistant_messages):
        logger.warning(f"   ⚠️  Uneven number of user ({len(user_messages)}) and assistant ({len(assistant_messages)}) messages")

    logger.info(f"✅ Populated chat memory with {added_exchanges} conversation exchanges (total messages: {len(memory.messages)})")

    # Verify memory state after population (optional engine capability).
    if hasattr(memory, 'get_formatted_history'):
        formatted_history = memory.get_formatted_history()
        logger.info(f"🔍 Formatted history length: {len(formatted_history)} characters")


def _apply_instruction_template(config):
    """Install the user's instruction template on the chat engine, if provided."""
    template = config.get('instruction_template')
    if not template:
        return
    if hasattr(chat_engine, 'set_instruction_template'):
        chat_engine.set_instruction_template(template)
    elif hasattr(chat_engine, 'current_instruction_template'):
        chat_engine.current_instruction_template = template


def _setup_collection_sources(config, collections):
    """Reset the engine's data sources, then register the selected ChromaDB collections."""
    if not hasattr(chat_engine, 'data_handles'):
        return
    # Always clear first so previously selected collections do not linger.
    try:
        chat_engine.data_handles.clear_data()
        logger.info("Background task: Cleared existing data sources")
    except Exception as e:
        logger.warning(f"Background task: Could not clear existing data sources: {e}")

    if not collections:
        logger.info("Background task: ℹ️  No collections selected - data sources cleared")
        return

    # Invariant across the loop; fetch once instead of per collection.
    available_collections = getattr(chat_engine, 'available_collections', [])
    for collection in collections:
        if collection not in available_collections:
            continue
        try:
            processing_steps = ["similarity", "extend_query"]
            if config.get('enable_keyword_filtering', False):
                processing_steps.append("keyword_filter")

            chat_engine.data_handles.add_data(
                name=f"Internal data store: {collection}",
                type="chromaDB",
                data=collection,
                filters="",
                processing_steps=processing_steps,
                inclusions=10,
                instructions=""
            )
            logger.info(f"Background task: ✅ Added ChromaDB collection: {collection}")
        except Exception as e:
            logger.error(f"Background task: ❌ Failed to add collection {collection}: {e}")


def _add_uploaded_documents(config, user_info):
    """Register uploaded documents and reference blocks as text data sources."""
    uploaded_docs = config.get('uploaded_documents', [])
    if not uploaded_docs or not hasattr(chat_engine, 'data_handles'):
        return
    logger.info(f"📄 Processing {len(uploaded_docs)} uploaded documents")

    # The user's document store does not change during this loop; fetch once
    # instead of once per document (previously re-fetched on every iteration).
    try:
        user_documents = get_user_documents(user_info.get('email', 'unknown'))
    except Exception as e:
        logger.error(f"Background task: ❌ Could not load user documents: {e}")
        user_documents = {}

    for doc_info in uploaded_docs:
        try:
            if 'id' not in doc_info:
                logger.error(f"Background task: ❌ Document missing 'id' field: {doc_info}")
                continue

            doc_id = doc_info['id']
            doc_type = doc_info.get('type', 'uploaded')

            if doc_type == 'reference' and doc_id.startswith('ref_') and not doc_id.startswith('ref_block_'):
                # ChromaDB reference documents come from AI responses that cite
                # collection content already available in context; skip them.
                logger.info(f"Background task: ℹ️  Skipping ChromaDB reference document: {doc_id} (already processed in context)")
            elif doc_type == 'reference_block' or doc_id.startswith('ref_block_'):
                _add_reference_block(config, user_documents, doc_id)
            elif doc_type == 'reference' and 'uuid' in doc_info:
                _add_neo4j_reference(config, doc_info)
            elif doc_id in user_documents:
                _add_user_document(config, user_documents, doc_id)
            else:
                logger.warning(f"Background task: ⚠️  Document {doc_id} not found in user storage")
        except Exception as e:
            logger.error(f"Background task: ❌ Failed to add uploaded document {doc_info.get('name', 'unknown')}: {e}")
            logger.error(f"Background task:    Document info: {doc_info}")
            import traceback
            logger.error(f"Background task:    Traceback: {traceback.format_exc()}")


def _add_reference_block(config, user_documents, doc_id):
    """Add a stored reference block as a plain-text data source."""
    if doc_id not in user_documents:
        logger.warning(f"Background task: ⚠️  Reference block {doc_id} not found in user storage")
        return
    doc_data = user_documents[doc_id]
    text_content = doc_data['text_content']
    filename = doc_data['metadata'].get('original_filename', 'Unknown Block')

    processing_steps = ["similarity"]
    if config.get('enable_keyword_filtering', False):
        processing_steps.append("keyword_filter")

    chat_engine.data_handles.add_data(
        name=f"Reference block: {filename}",
        type="text",
        data=text_content,
        filters="",
        processing_steps=processing_steps,
        inclusions=5,
        instructions=f"This content is from the reference block '{filename}'"
    )
    logger.info(f"Background task: ✅ Added reference block: {filename}")


def _add_neo4j_reference(config, doc_info):
    """Fetch a Neo4j-referenced document by UUID and add it as a text source."""
    document_uuid = doc_info.get('uuid')
    if not document_uuid or not hasattr(chat_engine, 'extensive_search_manager'):
        logger.warning(f"Background task: ⚠️  Neo4j reference document missing UUID or extensive_search_manager not available")
        return
    try:
        document_content = chat_engine.extensive_search_manager.get_document_by_uuid(document_uuid)
        if not document_content:
            logger.warning(f"Background task: ⚠️  Neo4j document {document_uuid} not found or empty")
            return

        processing_steps = ["similarity"]
        if config.get('enable_keyword_filtering', False):
            processing_steps.append("keyword_filter")

        # Prefer the stored document name; fall back to a UUID-derived label.
        filename = document_content.get('doc_name', f"Document {document_uuid[:8]}")
        text_content = document_content.get('content', '')

        chat_engine.data_handles.add_data(
            name=f"Reference document: {filename}",
            type="text",
            data=text_content,
            filters="",
            processing_steps=processing_steps,
            inclusions=5,
            instructions=f"This content is from the Neo4j reference document '{filename}' with UUID {document_uuid}"
        )
        logger.info(f"Background task: ✅ Added Neo4j reference document: {filename} (UUID: {document_uuid})")
    except Exception as e:
        logger.error(f"Background task: ❌ Failed to retrieve Neo4j document {document_uuid}: {e}")


def _add_user_document(config, user_documents, doc_id):
    """Add a regular uploaded document from the user's store as a text source."""
    doc_data = user_documents[doc_id]
    text_content = doc_data['text_content']
    filename = doc_data['metadata'].get('original_filename', 'Unknown')

    processing_steps = ["similarity"]
    if config.get('enable_keyword_filtering', False):
        processing_steps.append("keyword_filter")

    chat_engine.data_handles.add_data(
        name=f"Uploaded document: {filename}",
        type="text",
        data=text_content,
        filters="",
        processing_steps=processing_steps,
        inclusions=5,  # Fewer inclusions for uploaded docs
        instructions=f"This content is from the uploaded document '{filename}'"
    )
    logger.info(f"Background task: ✅ Added uploaded document: {filename}")


def _coerce_response_text(raw_response):
    """Flatten the engine's response (possibly a Panel/Markdown object) to a string."""
    if hasattr(raw_response, '_repr_markdown_'):
        try:
            response = raw_response._repr_markdown_()
        except Exception:
            response = str(raw_response)
    elif hasattr(raw_response, 'object'):
        if hasattr(raw_response.object, '_repr_markdown_'):
            response = raw_response.object._repr_markdown_()
        else:
            response = str(raw_response.object)
    elif hasattr(raw_response, 'value'):
        response = str(raw_response.value)
    elif str(type(raw_response)).startswith('<class \'panel.'):
        # Defensive fallback for Panel objects not caught by the checks above.
        try:
            if hasattr(raw_response, '_repr_markdown_'):
                response = raw_response._repr_markdown_()
            elif hasattr(raw_response, 'object'):
                response = str(raw_response.object)
            else:
                response = str(raw_response)
        except Exception:
            response = str(raw_response)
    else:
        response = str(raw_response)

    # Strip "Markdown(...)" wrappers that leak from repr() of pane objects.
    if isinstance(response, str) and 'Markdown(' in response:
        import re
        match = re.search(r'Markdown\((.*)\)', response, re.DOTALL)
        if match:
            response = match.group(1).strip('\'"')
    return response


def _postprocess_response(response):
    """Rewrite file:// links and reference markup into frontend-friendly Markdown."""
    import re

    # <a href="file:///...">text</a> -> decorated document marker.
    file_link_pattern = r'<a href=[\'"]file:///[^\'">]*[/\\]([^\'">]+)[\'"][^>]*>([^<]+)</a>'

    def create_document_link(match):
        filename = match.group(1)
        link_text = match.group(2)
        return f'📄 **[{link_text}]** (Document: {filename})'

    response = re.sub(file_link_pattern, create_document_link, response)

    # Markdown-style [text](file:///...) links get the same treatment.
    md_file_link_pattern = r'\[([^\]]+)\]\(file:///[^)]*[/\\]([^)]+)\)'

    def create_md_document_link(match):
        link_text = match.group(1)
        filename = match.group(2)
        return f'📄 **[{link_text}]** (Document: {filename})'

    response = re.sub(md_file_link_pattern, create_md_document_link, response)

    # Highlight "Document from ChromaDB collection '...'" citations.
    collection_ref_pattern = r'Document from ChromaDB collection [\'"]([^\'"]+)[\'"]:\s*([^<\n]+)'

    def enhance_collection_reference(match):
        collection = match.group(1)
        document_info = match.group(2).strip()
        return f'📚 **Collection: {collection}** → 📄 [{document_info}]'

    response = re.sub(collection_ref_pattern, enhance_collection_reference, response)

    # Non-http(s) anchors cannot be resolved by the browser; keep the label only.
    local_link_pattern = r'<a href=[\'"](?!https?://)([^\'">]*)[\'"][^>]*>([^<]+)</a>'
    response = re.sub(local_link_pattern, r'📎 **\2**', response)

    # Bold numbered document filenames such as "1.2 REPORT.pdf".
    doc_pattern = r'(\d+(?:\.\d+)*\s+)([A-Z0-9_-]+\.(?:pdf|docx?|xlsx?|pptx?))'
    response = re.sub(doc_pattern, r'📄 **\1\2**', response, flags=re.IGNORECASE)

    # Promote a bare "References" line into a section heading.
    response = re.sub(r'^(References?)\s*$', r'## 📚 \1', response, flags=re.MULTILINE)

    # Bold "[n]:" numbered-reference markers.
    numbered_ref_pattern = r'^\[(\d+)\]:\s*'
    response = re.sub(numbered_ref_pattern, r'**[\1]:** ', response, flags=re.MULTILINE)

    return response


def _collect_references(response):
    """Ask the chat engine for references backing *response*; best-effort."""
    if not hasattr(chat_engine, 'get_available_references'):
        return []
    try:
        refs = chat_engine.get_available_references(response_text=response)
        logger.info(f"📚 Background task: Found {len(refs)} available references for frontend")
        return refs
    except Exception as e:
        logger.warning(f"⚠️  Background task: Could not retrieve available references: {e}")
        # Fallback: return all references without response-based filtering.
        try:
            refs = chat_engine.get_available_references()
            logger.info(f"📚 Background task fallback: Found {len(refs)} total references")
            return refs
        except Exception:
            return []

Parameters

Name Type Default Kind
task_id - - positional_or_keyword
message - - positional_or_keyword
config - - positional_or_keyword
user_info - - positional_or_keyword

Parameter Details

task_id: untyped parameter — identifier of the background task to update

message: untyped parameter — the user's chat message text

config: untyped parameter — dict of user-selected chat options

user_info: untyped parameter — dict describing the requesting user (email, name)

Return Value

Returns None — results are delivered via complete_task (success) or fail_task (error)

Required Imports

from flask import Flask
from flask import render_template
from flask import request
from flask import jsonify
from flask import session

Usage Example

# Example usage:
# result = process_chat_request_background(task_id, message, config, user_info)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function process_chat_background 73.9% similar

    Processes chat requests asynchronously in a background thread, managing RAG engine interactions, progress updates, and session state for various query modes including basic, extensive, full_reading, and deep_reflection.

    From: /tf/active/vicechatdev/docchat/app.py
  • function api_chat_v1 61.7% similar

    Handle chat API requests with support for long-running tasks

    From: /tf/active/vicechatdev/vice_ai/app.py
  • function process_full_reading_background 56.7% similar

    Asynchronous background task processor that executes a full reading mode RAG (Retrieval-Augmented Generation) query, tracks progress, and stores results in session history.

    From: /tf/active/vicechatdev/docchat/app.py
  • function api_chat 54.2% similar

    Flask API endpoint that handles chat requests asynchronously, processing user queries through a RAG (Retrieval-Augmented Generation) engine with support for multiple modes, memory, web search, and custom configurations.

    From: /tf/active/vicechatdev/docchat/app.py
  • function get_or_create_session 49.1% similar

    Retrieves an existing chat session by ID or creates a new one if it doesn't exist, with thread-safe access and persistent storage support.

    From: /tf/active/vicechatdev/docchat/app.py
← Back to Browse