function process_chat_request_background
Maturity: 25
Process chat request in background thread
File:
/tf/active/vicechatdev/vice_ai/app.py
Lines:
274 - 628
Complexity:
moderate
Purpose
Process chat request in background thread
Source Code
def _configure_flow_control(config):
    """Apply the user's per-request toggles from *config* onto chat_engine.flow_control."""
    if not hasattr(chat_engine, 'flow_control'):
        return
    fc = chat_engine.flow_control
    enable_search = config.get('enable_search', True)
    # Database search (for internal documents); the frontend's single
    # enable_search toggle also drives external web search.
    fc['enable_search'] = enable_search
    fc['enable_web_search'] = enable_search
    logger.info(f"đ Search configuration - Database: {fc['enable_search']}, Web: {fc['enable_web_search']}")
    fc['enable_memory'] = config.get('enable_memory', True)
    fc['enable_extensive_search'] = config.get('enable_extensive_search', False)
    fc['extensive_search_chunks'] = config.get('extensive_chunks', 200)
    fc['target_summary_tokens'] = config.get('summary_tokens', 5000)
    fc['detail_level'] = config.get('detail_level', 'detailed')
    fc['enable_keyword_filtering'] = config.get('enable_keyword_filtering', False)
    fc['manual_keywords'] = config.get('manual_keywords', '')
    fc['enable_reference_filtering'] = config.get('enable_reference_filtering', True)
    fc['relevance_threshold'] = config.get('relevance_threshold', 0.7)


def _populate_chat_memory(config):
    """Seed chat_engine.chat_memory from the frontend-provided history.

    The history is split into user and assistant messages and re-paired
    positionally (assumes the conversation alternates — TODO confirm the
    frontend always sends it that way). No-op unless memory is enabled.
    """
    if config.get('enable_memory', False) and config.get('chat_history'):
        chat_history = config.get('chat_history', [])
        logger.info(f"đ Received chat history with {len(chat_history)} messages for context-aware processing")
        if not hasattr(chat_engine, 'chat_memory'):
            logger.warning("â ī¸ Chat engine does not have chat_memory attribute")
            return
        # Clear existing memory first, then replay the provided exchanges.
        logger.info(f"đ Chat memory before clearing: {len(chat_engine.chat_memory.messages)} messages")
        chat_engine.chat_memory.messages = []
        user_messages = [msg for msg in chat_history if msg.get('role') == 'user']
        assistant_messages = [msg for msg in chat_history if msg.get('role') == 'assistant']
        logger.info(f"đ Found {len(user_messages)} user messages and {len(assistant_messages)} assistant messages")
        added_exchanges = 0
        max_pairs = min(len(user_messages), len(assistant_messages))
        for i in range(max_pairs):
            user_msg = user_messages[i]
            assistant_msg = assistant_messages[i]
            if user_msg and assistant_msg:
                chat_engine.chat_memory.save_context(
                    {"role": "user", "content": user_msg.get('content', '')},
                    {"role": "assistant", "content": assistant_msg.get('content', '')}
                )
                added_exchanges += 1
                logger.info(f"  đŦ Added exchange {added_exchanges}: '{user_msg.get('content', '')[:50]}...' -> '{assistant_msg.get('content', '')[:50]}...'")
            else:
                logger.warning(f"  â ī¸ Missing message in pair {i+1}")
        if len(user_messages) != len(assistant_messages):
            logger.warning(f"  â ī¸ Uneven number of user ({len(user_messages)}) and assistant ({len(assistant_messages)}) messages")
        logger.info(f"â Populated chat memory with {added_exchanges} conversation exchanges (total messages: {len(chat_engine.chat_memory.messages)})")
        # Verify memory state after population.
        if hasattr(chat_engine.chat_memory, 'get_formatted_history'):
            formatted_history = chat_engine.chat_memory.get_formatted_history()
            logger.info(f"đ Formatted history length: {len(formatted_history)} characters")
    elif config.get('enable_memory', False):
        logger.info("đ Memory enabled but no chat history provided from frontend")


def _setup_data_sources(config):
    """Clear existing engine data sources and register the selected ChromaDB collections.

    Returns the list of requested collection names (possibly empty), for the
    caller's debug metadata.
    """
    collections = config.get('collections', [])
    if not hasattr(chat_engine, 'data_handles'):
        return collections
    # Always clear existing data sources first, regardless of whether new
    # collections are selected, so stale sources never leak between requests.
    try:
        chat_engine.data_handles.clear_data()
        logger.info("Background task: Cleared existing data sources")
    except Exception as e:
        logger.warning(f"Background task: Could not clear existing data sources: {e}")
    if collections:
        for collection in collections:
            available_collections = getattr(chat_engine, 'available_collections', [])
            if collection in available_collections:
                try:
                    processing_steps = ["similarity", "extend_query"]
                    if config.get('enable_keyword_filtering', False):
                        processing_steps.append("keyword_filter")
                    chat_engine.data_handles.add_data(
                        name=f"Internal data store: {collection}",
                        type="chromaDB",
                        data=collection,
                        filters="",
                        processing_steps=processing_steps,
                        inclusions=10,
                        instructions=""
                    )
                    logger.info(f"Background task: â Added ChromaDB collection: {collection}")
                except Exception as e:
                    logger.error(f"Background task: â Failed to add collection {collection}: {e}")
    else:
        logger.info("Background task: âšī¸ No collections selected - data sources cleared")
    return collections


def _text_processing_steps(config):
    """Processing-step list shared by all text (non-ChromaDB) data sources."""
    processing_steps = ["similarity"]
    if config.get('enable_keyword_filtering', False):
        processing_steps.append("keyword_filter")
    return processing_steps


def _add_uploaded_documents(config, user_info):
    """Register uploaded documents and reference blocks as text data sources.

    Dispatches on each entry's 'type'/'id': ChromaDB references are skipped
    (already in context), reference blocks and plain uploads are read from
    user storage, and Neo4j references are fetched by UUID via the engine's
    extensive_search_manager. Failures are logged per-document, never raised.
    """
    uploaded_docs = config.get('uploaded_documents', [])
    if not (uploaded_docs and hasattr(chat_engine, 'data_handles')):
        return
    import traceback
    logger.info(f"đ Processing {len(uploaded_docs)} uploaded documents")
    # One storage lookup serves the whole loop (invariant for this user).
    user_documents = get_user_documents(user_info.get('email', 'unknown'))
    for doc_info in uploaded_docs:
        try:
            if 'id' not in doc_info:
                logger.error(f"Background task: â Document missing 'id' field: {doc_info}")
                continue
            doc_id = doc_info['id']
            doc_type = doc_info.get('type', 'uploaded')
            if doc_type == 'reference' and doc_id.startswith('ref_') and not doc_id.startswith('ref_block_'):
                # ChromaDB reference document cited by an earlier AI response —
                # its collection is already registered, so skip it here.
                logger.info(f"Background task: âšī¸ Skipping ChromaDB reference document: {doc_id} (already processed in context)")
                continue
            elif doc_type == 'reference_block' or doc_id.startswith('ref_block_'):
                # Reference block: treated like an uploaded text document.
                if doc_id in user_documents:
                    doc_data = user_documents[doc_id]
                    text_content = doc_data['text_content']
                    filename = doc_data['metadata'].get('original_filename', 'Unknown Block')
                    chat_engine.data_handles.add_data(
                        name=f"Reference block: {filename}",
                        type="text",
                        data=text_content,
                        filters="",
                        processing_steps=_text_processing_steps(config),
                        inclusions=5,
                        instructions=f"This content is from the reference block '{filename}'"
                    )
                    logger.info(f"Background task: â Added reference block: {filename}")
                else:
                    logger.warning(f"Background task: â ī¸ Reference block {doc_id} not found in user storage")
            elif doc_type == 'reference' and 'uuid' in doc_info:
                # Neo4j reference document — retrieve its content by UUID.
                document_uuid = doc_info.get('uuid')
                if document_uuid and hasattr(chat_engine, 'extensive_search_manager'):
                    try:
                        document_content = chat_engine.extensive_search_manager.get_document_by_uuid(document_uuid)
                        if document_content:
                            filename = document_content.get('doc_name', f"Document {document_uuid[:8]}")
                            text_content = document_content.get('content', '')
                            chat_engine.data_handles.add_data(
                                name=f"Reference document: {filename}",
                                type="text",
                                data=text_content,
                                filters="",
                                processing_steps=_text_processing_steps(config),
                                inclusions=5,
                                instructions=f"This content is from the Neo4j reference document '{filename}' with UUID {document_uuid}"
                            )
                            logger.info(f"Background task: â Added Neo4j reference document: {filename} (UUID: {document_uuid})")
                        else:
                            logger.warning(f"Background task: â ī¸ Neo4j document {document_uuid} not found or empty")
                    except Exception as e:
                        logger.error(f"Background task: â Failed to retrieve Neo4j document {document_uuid}: {e}")
                else:
                    logger.warning(f"Background task: â ī¸ Neo4j reference document missing UUID or extensive_search_manager not available")
            elif doc_id in user_documents:
                # Regular uploaded document from user storage.
                doc_data = user_documents[doc_id]
                text_content = doc_data['text_content']
                filename = doc_data['metadata'].get('original_filename', 'Unknown')
                chat_engine.data_handles.add_data(
                    name=f"Uploaded document: {filename}",
                    type="text",
                    data=text_content,
                    filters="",
                    processing_steps=_text_processing_steps(config),
                    inclusions=5,  # Fewer inclusions for uploaded docs
                    instructions=f"This content is from the uploaded document '{filename}'"
                )
                logger.info(f"Background task: â Added uploaded document: {filename}")
            else:
                logger.warning(f"Background task: â ī¸ Document {doc_id} not found in user storage")
        except Exception as e:
            logger.error(f"Background task: â Failed to add uploaded document {doc_info.get('name', 'unknown')}: {e}")
            logger.error(f"Background task: Document info: {doc_info}")
            logger.error(f"Background task: Traceback: {traceback.format_exc()}")


def _extract_response_text(raw_response):
    """Coerce the engine's return value (panel pane, Markdown object, or plain value) into a str."""
    if hasattr(raw_response, '_repr_markdown_'):
        try:
            return raw_response._repr_markdown_()
        except Exception:
            return str(raw_response)
    if hasattr(raw_response, 'object'):
        inner = raw_response.object
        if hasattr(inner, '_repr_markdown_'):
            return inner._repr_markdown_()
        return str(inner)
    if hasattr(raw_response, 'value'):
        return str(raw_response.value)
    if str(type(raw_response)).startswith("<class 'panel."):
        try:
            if hasattr(raw_response, '_repr_markdown_'):
                return raw_response._repr_markdown_()
            if hasattr(raw_response, 'object'):
                return str(raw_response.object)
            return str(raw_response)
        except Exception:
            return str(raw_response)
    return str(raw_response)


def _postprocess_response(response):
    """Rewrite file:// links and reference markup into display-friendly Markdown.

    Non-string responses are returned untouched.
    """
    import re
    # Unwrap stringified Markdown(...) object representations.
    if isinstance(response, str) and 'Markdown(' in response:
        match = re.search(r'Markdown\((.*)\)', response, re.DOTALL)
        if match:
            response = match.group(1).strip('\'"')
    if not isinstance(response, str):
        return response
    # HTML anchors pointing at local files -> document badge.
    file_link_pattern = r'<a href=[\'"]file:///[^\'">]*[/\\]([^\'">]+)[\'"][^>]*>([^<]+)</a>'

    def create_document_link(match):
        filename = match.group(1)
        link_text = match.group(2)
        return f'đ **[{link_text}]** (Document: {filename})'

    response = re.sub(file_link_pattern, create_document_link, response)
    # Markdown links pointing at local files -> document badge.
    md_file_link_pattern = r'\[([^\]]+)\]\(file:///[^)]*[/\\]([^)]+)\)'

    def create_md_document_link(match):
        link_text = match.group(1)
        filename = match.group(2)
        return f'đ **[{link_text}]** (Document: {filename})'

    response = re.sub(md_file_link_pattern, create_md_document_link, response)
    # "Document from ChromaDB collection ..." -> collection badge.
    collection_ref_pattern = r'Document from ChromaDB collection [\'"]([^\'"]+)[\'"]:\s*([^<\n]+)'

    def enhance_collection_reference(match):
        collection = match.group(1)
        document_info = match.group(2).strip()
        return f'đ **Collection: {collection}** â đ [{document_info}]'

    response = re.sub(collection_ref_pattern, enhance_collection_reference, response)
    # Strip any remaining non-http local anchors down to bold text.
    local_link_pattern = r'<a href=[\'"](?!https?://)([^\'">]*)[\'"][^>]*>([^<]+)</a>'
    response = re.sub(local_link_pattern, r'đ **\2**', response)
    # Bold numbered document filenames (e.g. "1.2 REPORT.pdf").
    doc_pattern = r'(\d+(?:\.\d+)*\s+)([A-Z0-9_-]+\.(?:pdf|docx?|xlsx?|pptx?))'
    response = re.sub(doc_pattern, r'đ **\1\2**', response, flags=re.IGNORECASE)
    # Promote a bare "References" line to a section heading.
    response = re.sub(r'^(References?)\s*$', r'## đ \1', response, flags=re.MULTILINE)
    # Bold numbered reference markers like "[1]:".
    numbered_ref_pattern = r'^\[(\d+)\]:\s*'
    response = re.sub(numbered_ref_pattern, r'**[\1]:** ', response, flags=re.MULTILINE)
    return response


def _collect_references(response):
    """Ask the engine for references relevant to *response*; fall back to all references."""
    if not hasattr(chat_engine, 'get_available_references'):
        return []
    try:
        available_references = chat_engine.get_available_references(response_text=response)
        logger.info(f"đ Background task: Found {len(available_references)} available references for frontend")
        return available_references
    except Exception as e:
        logger.warning(f"â ī¸ Background task: Could not retrieve available references: {e}")
        try:
            available_references = chat_engine.get_available_references()
            logger.info(f"đ Background task fallback: Found {len(available_references)} total references")
            return available_references
        except Exception:
            return []


def process_chat_request_background(task_id, message, config, user_info):
    """Process a chat request in a background thread.

    Configures the shared chat engine from *config* (search toggles, memory,
    data sources, uploaded documents), generates a response for *message*,
    post-processes it for display, and reports the outcome through the task
    registry (``complete_task`` / ``fail_task``). Never raises: any failure is
    recorded against *task_id*.

    Args:
        task_id: Identifier for progress/result reporting via the task registry.
        message: The user's chat message, passed to the engine's response callback.
        config: Dict of per-request options (see the individual helpers).
        user_info: Dict with at least 'name'; 'email' keys document-storage lookup.

    Returns:
        None. Results are delivered via complete_task/fail_task.
    """
    try:
        logger.info(f"đ Starting background task {task_id}")
        logger.info(f"đ§ Received configuration: {config}")
        update_task_progress(task_id, "Configuring chat engine...")
        _configure_flow_control(config)
        _populate_chat_memory(config)
        # Set instruction template if provided.
        template = config.get('instruction_template')
        if template and hasattr(chat_engine, 'set_instruction_template'):
            chat_engine.set_instruction_template(template)
        elif template and hasattr(chat_engine, 'current_instruction_template'):
            chat_engine.current_instruction_template = template
        update_task_progress(task_id, "Setting up data sources...")
        collections = _setup_data_sources(config)
        _add_uploaded_documents(config, user_info)
        update_task_progress(task_id, "Generating response... This may take several minutes for extensive searches.")
        if not hasattr(chat_engine, 'response_callback'):
            # Caught below and reported through fail_task.
            raise Exception("Chat engine response_callback method not available")
        raw_response = chat_engine.response_callback(message)
        response = _extract_response_text(raw_response)
        update_task_progress(task_id, "Post-processing response...")
        response = _postprocess_response(response)
        logger.info(f"â Task {task_id} completed: {len(response)} characters")
        available_references = _collect_references(response)
        result = {
            'response': response,
            'timestamp': datetime.now().isoformat(),
            'user': user_info['name'],
            'available_references': available_references,
            'debug_info': {
                'collections_requested': collections,
                'available_collections': getattr(chat_engine, 'available_collections', []),
                'has_data_handles': hasattr(chat_engine, 'data_handles'),
                'database_search_enabled': config.get('enable_search', True),
                'web_search_enabled': config.get('enable_search', True),
                'task_id': task_id,
                'references_count': len(available_references)
            }
        }
        complete_task(task_id, result)
    except Exception as e:
        logger.error(f"â Task {task_id} failed: {e}")
        fail_task(task_id, str(e))
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
task_id |
- | - | positional_or_keyword |
message |
- | - | positional_or_keyword |
config |
- | - | positional_or_keyword |
user_info |
- | - | positional_or_keyword |
Parameter Details
task_id: Identifier used to report progress and the final result/failure for this background task
message: The user's chat message, passed to the chat engine's response callback
config: Dict of per-request options (search/memory toggles, collections, uploaded documents, chat history, instruction template)
user_info: Dict describing the requesting user; 'name' is recorded in the result and 'email' keys the document-storage lookup
Return Value
Returns unspecified type
Required Imports
from flask import Flask
from flask import render_template
from flask import request
from flask import jsonify
from flask import session
Usage Example
# Example usage:
# result = process_chat_request_background(task_id, message, config, user_info)
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function process_chat_background 73.9% similar
-
function api_chat_v1 61.7% similar
-
function process_full_reading_background 56.7% similar
-
function api_chat 54.2% similar
-
function get_or_create_session 49.1% similar