class DocChatRAG
File: /tf/active/vicechatdev/docchat/rag_engine.py
Lines: 330 - 2989
Complexity: moderate
Purpose
Main RAG engine with three operating modes:
1. Basic RAG (similarity search)
2. Extensive (full document retrieval with preprocessing)
3. Full Reading (process all documents)
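A minimal usage sketch (the import path is inferred from the file location above; the collection name, API key, and queries are placeholder values, and when omitted the constructor falls back to the module's config):

from docchat.rag_engine import DocChatRAG

rag = DocChatRAG(collection_name="docchat_docs", api_key="sk-placeholder")

# Mode 1: similarity-search RAG over indexed chunks
result = rag.mode_1_basic_rag("What are the key findings?", top_k=5)
print(result['response'])
print(result['references'])

# Mode 2: retrieve full documents and preprocess them with the small LLM
result = rag.mode_2_extensive("Summarize the methodology across reports", top_k=10)

# Mode 3: read every indexed document (optionally restricted via source_filters)
result = rag.mode_3_full_reading("List all risks mentioned in the collection")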
Source Code
class DocChatRAG:
"""
Main RAG engine with three operating modes:
1. Basic RAG (similarity search)
2. Extensive (full document retrieval with preprocessing)
3. Full Reading (process all documents)
"""
# Default prompt templates (can be overridden via config or at runtime)
DEFAULT_PROMPT_TEMPLATES = {
"expert_system": {
"instructions": """[System Instructions]:
You are an {role}. You excel at:
{expertise}
{domain_context} including:
- Document context from retrieved sources
- Optional web search results for supplementary information
- Previous conversation history for contextual continuity
- Custom instructions for task-specific guidance
Your task is to provide a comprehensive, accurate response by following the step-by-step instructions below for each information section.""",
"output_constraints": """[Output Constraints]:
- Your response must be in **strict Markdown format** without code block markers
- Use clear hierarchical structure with appropriate headers (##, ###)
- **Use only the provided context** — do not generate information from external knowledge unless explicitly requested
- **Cite sources inline** by inserting block numbers in square brackets `[Block X]` immediately after referenced information
- You can cite multiple sources like `[Block 1, Block 2]` or ranges like `[Blocks 1-3]`
- Include relevant quotes or data points from sources when they strengthen your answer
- Use tables, lists, and formatting to enhance readability
- If information is incomplete or uncertain, explicitly acknowledge gaps
- Maintain a {output_style}, objective tone appropriate for research and analysis"""
},
"step_instructions": {
"document_context": "Process the [Document Context] section by analyzing the retrieved information from internal documents. Identify key facts, relationships, and insights relevant to the user query. Pay special attention to document attribution (source filenames) when citing information.",
"web_search": "Review the [Web Search Results] section for supplementary or fact-checking information. Cross-reference with document context and note any confirmations or contradictions. Web results are marked with URLs for verification.",
"chat_history": "Examine the [Previous Conversation] section to understand the ongoing dialogue context. Use this to interpret references, pronouns, or abbreviated queries. Build upon previous exchanges to provide coherent, contextually-aware responses.",
"custom_instructions": "Follow the [Custom Instructions] section carefully. These are user-provided guidelines specific to this query and take precedence over general instructions. Adapt your response style, depth, and focus according to these directives."
}
}
def __init__(self, collection_name: str = None, api_key: str = None,
system_role: str = None, system_expertise: str = None,
system_domain_context: str = None, custom_system_instructions: str = None):
"""
Initialize RAG engine
Args:
collection_name: ChromaDB collection name
api_key: OpenAI API key
system_role: Custom role description (overrides config)
system_expertise: Custom expertise description (overrides config)
system_domain_context: Custom domain context (overrides config)
custom_system_instructions: Complete custom system instructions (overrides everything)
"""
self.collection_name = collection_name or config.CHROMA_COLLECTION_NAME
self.api_key = api_key or config.OPENAI_API_KEY
# Store customizable prompt components
self.system_role = custom_system_instructions or system_role or config.SYSTEM_ROLE
self.system_expertise = system_expertise or config.SYSTEM_EXPERTISE
self.system_domain_context = system_domain_context or config.SYSTEM_DOMAIN_CONTEXT
self.custom_system_instructions = custom_system_instructions or config.CUSTOM_SYSTEM_INSTRUCTIONS
self.output_style = config.OUTPUT_STYLE
# Build prompt templates dynamically
self.PROMPT_TEMPLATES = self._build_prompt_templates()
# Small LLM usage tracking (like vice_ai)
self.small_llm_usage = {
"query_expansion": 0,
"context_aware_expansion": 0,
"document_extraction": 0,
"summarization": 0,
"total_calls": 0
}
# Large LLM usage tracking
self.large_llm_usage = 0
# Initialize ChromaDB - External connection to oneco_chroma (same pattern as vice_ai)
logger.info(f"Connecting to ChromaDB at {config.CHROMA_HOST}:{config.CHROMA_PORT}")
self.chroma_client = chromadb.HttpClient(
host=config.CHROMA_HOST,
port=config.CHROMA_PORT
)
# Initialize embedding function
self.embedding_function = DocChatEmbeddingFunction(
api_key=self.api_key,
embed_model_name=config.EMBEDDING_MODEL,
llm_model_name=config.SMALL_LLM_MODEL
)
# Get or create collection with HNSW configuration
# In ChromaDB 1.3+, HNSW with cosine similarity is the default
self.collection = self.chroma_client.get_or_create_collection(
name=self.collection_name,
embedding_function=self.embedding_function,
metadata={"hnsw:space": "cosine", "description": "DocChat RAG collection"}
)
logger.info(f"Connected to collection: {self.collection_name}")
# Cache for all source paths (invalidated when documents are indexed)
self._source_cache = None
self._source_cache_count = None
# Initialize LLMs - Support multi-provider
self.main_llm = get_llm_instance(
model_name=config.LLM_MODEL,
temperature=config.TEMPERATURE,
max_tokens=config.MAX_OUTPUT_TOKENS
)
self.small_llm = get_llm_instance(
model_name=config.SMALL_LLM_MODEL,
temperature=0,
max_tokens=config.MAX_OUTPUT_TOKENS
)
# Initialize extractor for extensive/full reading modes
self.extractor = QueryBasedExtractor(
api_key=self.api_key,
model_name=config.SMALL_LLM_MODEL,
parent_rag=self # Pass reference for usage tracking
)
# Initialize reranker
self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
# Initialize web search (optional)
self.web_search = None
if config.SERPER_API_KEY:
try:
self.web_search = GoogleSerperAPIWrapper(serper_api_key=config.SERPER_API_KEY)
logger.info("Web search initialized")
except Exception as e:
logger.warning(f"Web search not available: {e}")
# Initialize chat memory
from langchain.memory import ConversationBufferMemory
self.chat_memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# Token counter
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# Response callback for streaming
self.response_callback: Optional[Callable] = None
# Configuration flags
self.enable_web_search = False
self.enable_memory = True
def _build_prompt_templates(self):
"""
Build prompt templates dynamically from config or custom settings
Returns:
Dictionary of prompt templates
"""
# If complete custom system instructions provided, use that
if self.custom_system_instructions:
return {
"expert_system": {
"instructions": self.custom_system_instructions,
"output_constraints": self.DEFAULT_PROMPT_TEMPLATES["expert_system"]["output_constraints"].format(
output_style=self.output_style
)
},
"step_instructions": self.DEFAULT_PROMPT_TEMPLATES["step_instructions"]
}
# Otherwise, build from components
# Format expertise as bullet list
if isinstance(self.system_expertise, list):
expertise_text = "\n".join([f"- {item}" for item in self.system_expertise])
else:
expertise_text = self.system_expertise
instructions = self.DEFAULT_PROMPT_TEMPLATES["expert_system"]["instructions"].format(
role=self.system_role,
expertise=expertise_text,
domain_context=self.system_domain_context
)
output_constraints = self.DEFAULT_PROMPT_TEMPLATES["expert_system"]["output_constraints"].format(
output_style=self.output_style
)
return {
"expert_system": {
"instructions": instructions,
"output_constraints": output_constraints
},
"step_instructions": self.DEFAULT_PROMPT_TEMPLATES["step_instructions"]
}
def set_model(self, model_name: str):
"""
Dynamically switch the main LLM model during runtime.
Args:
model_name: Name of the model to switch to
"""
logger.info(f"Switching main LLM model to: {model_name}")
self.main_llm = get_llm_instance(
model_name=model_name,
temperature=config.TEMPERATURE,
max_tokens=config.MAX_OUTPUT_TOKENS
)
logger.info(f"Model switched successfully to: {model_name}")
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.tokenizer.encode(text))
def set_response_callback(self, callback: Callable):
"""Set callback for streaming responses"""
self.response_callback = callback
def _rerank_results(self, query: str, results: List[Dict]) -> List[Dict]:
"""Rerank results using cross-encoder"""
if not results:
return results
# Prepare pairs for reranking
pairs = [[query, r['text']] for r in results]
# Get reranking scores
scores = self.reranker.predict(pairs)
# Add scores and sort
for i, result in enumerate(results):
result['rerank_score'] = float(scores[i])
results.sort(key=lambda x: x['rerank_score'], reverse=True)
return results
def _deduplicate_chunks(self, chunks: List[Dict], similarity_threshold: float = 0.85) -> List[Dict]:
"""
Remove duplicate or highly similar chunks to improve diversity
Uses simple text similarity to detect near-duplicates
Args:
chunks: List of chunk dictionaries with 'text' field
similarity_threshold: Threshold for considering chunks as duplicates (0-1)
Returns:
Deduplicated list of chunks
"""
if len(chunks) <= 1:
return chunks
# Simple deduplication: remove chunks with very similar text
deduplicated = []
seen_texts = []
for chunk in chunks:
chunk_text = chunk['text'].lower().strip()
# Check if too similar to any seen chunk
is_duplicate = False
for seen_text in seen_texts:
# Simple Jaccard similarity for speed
words1 = set(chunk_text.split())
words2 = set(seen_text.split())
if len(words1) == 0 or len(words2) == 0:
continue
intersection = len(words1 & words2)
union = len(words1 | words2)
similarity = intersection / union if union > 0 else 0
if similarity >= similarity_threshold:
is_duplicate = True
logger.debug(f"Removing duplicate chunk (similarity: {similarity:.2f})")
break
if not is_duplicate:
deduplicated.append(chunk)
seen_texts.append(chunk_text)
logger.info(f"Deduplication: {len(chunks)} -> {len(deduplicated)} chunks")
return deduplicated
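# Worked example of the Jaccard check used above (illustrative values):
#   words1 = {"solar", "panel", "efficiency", "report"}
#   words2 = {"solar", "panel", "efficiency", "summary"}
#   intersection = 3, union = 5 -> similarity = 0.6, which is below the default
#   0.85 threshold, so the second chunk is kept rather than dropped.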
def _format_response_with_references(self, response_text: str, context_blocks: List[Dict], web_references: List[Dict] = None) -> Dict[str, Any]:
"""
Format response with inline references like vice_ai
Args:
response_text: The LLM response
context_blocks: List of context blocks with metadata
web_references: List of web search results with URLs
Returns:
Dictionary with formatted response and references
"""
# Create blocks dictionary for process_references
blocks_dict = {}
for i, block in enumerate(context_blocks):
block_num = i + 1
# Get source path - try both 'source' and 'file_path'
source_path = block.get('metadata', {}).get('source') or block.get('metadata', {}).get('file_path', 'Unknown')
blocks_dict[block_num] = {
'type': 'document',
'path': source_path,
'description': block.get('file_name', 'Unknown'),
'content': block.get('text', '')
}
# Add web references to blocks_dict
if web_references:
for web_ref in web_references:
block_num = web_ref['block_num']
blocks_dict[block_num] = {
'type': 'web',
'url': web_ref['url'],
'title': web_ref['title'],
'description': web_ref['title'],
'content': web_ref['snippet']
}
# Process references in the response text
updated_text, references = self._process_references(response_text, blocks_dict)
return {
'response': updated_text,
'references': references,
'has_references': len(references) > 0
}
def _process_references(self, text: str, blocks_dict: Dict[int, Dict]) -> tuple:
"""
Process references in text, converting [Block X] to numerical citations [1, 2, 3]
Based on vice_ai's process_references method
Args:
text: Text containing block references like [Block 1], [Blocks 2-4]
blocks_dict: Dictionary mapping block numbers to block data
Returns:
tuple: (updated_text with numerical citations, list of reference objects)
"""
import re
logger.info(f"🔍 _process_references called with {len(blocks_dict)} blocks")
logger.info(f"🔍 Text length: {len(text)} chars, First 300 chars: {text[:300]}")
# Pattern to find block references - matches multiple formats:
# [Block X], [Blocks X-Y], [X], [X,Y], [X, Y, Z], etc.
# First try full [Block X] format
block_format_pattern = r'\[((?:[Bb]locks?\s+\d+(?:\s*[,-]\s*(?:[Bb]locks?\s*)?\d+)*)|(?:[Bb]locks?\s+\d+))\]'
# Also try simple [X] or [X,Y,Z] format - numbers with optional commas/spaces
simple_format_pattern = r'\[(\d+(?:\s*,\s*\d+)*)\]'
# Try block format first
ref_matches = re.findall(block_format_pattern, text)
using_simple_format = False
# If no matches, try simple [X] or [X,Y,Z] format
if not ref_matches:
ref_matches = re.findall(simple_format_pattern, text)
using_simple_format = True
logger.info(f"🔍 Using simple [X] or [X,Y,Z] citation format")
all_refs_pattern = simple_format_pattern if using_simple_format else block_format_pattern
logger.info(f"🔍 Found {len(ref_matches)} citations: {ref_matches[:10]}")
if not ref_matches:
# No block references found, return empty references
logger.warning("⚠️ No block references found in response text - LLM did not cite sources")
return text, []
# Create mapping of original references to their positions
original_refs = {}
for match in re.finditer(all_refs_pattern, text):
original_refs[match.group(0)] = match.span()
# Extract block numbers from each reference
reference_catalog = {}
for ref_text in ref_matches:
if using_simple_format:
# Simple [X] or [X,Y,Z] format - parse comma-separated numbers
# ref_text could be "1" or "1,2,3" or "1, 2, 3"
block_numbers = [int(num.strip()) for num in ref_text.split(',')]
reference_catalog[f"[{ref_text}]"] = block_numbers
else:
# [Block X] format - handle ranges with hyphens (e.g., "Blocks 2-11")
expanded_numbers = []
# First check for ranges with hyphens
range_matches = re.findall(r'(\d+)\s*-\s*(\d+)', ref_text)
for start, end in range_matches:
start_num, end_num = int(start), int(end)
expanded_numbers.extend(range(start_num, end_num + 1))
# Then add individual numbers not in ranges
cleaned_text = re.sub(r'\d+\s*-\s*\d+', '', ref_text)
individual_numbers = [int(num) for num in re.findall(r'\d+', cleaned_text)]
expanded_numbers.extend(individual_numbers)
# Remove duplicates and store
block_numbers = sorted(list(set(expanded_numbers)))
reference_catalog[f"[{ref_text}]"] = block_numbers
# Collect all unique block numbers
all_block_numbers = []
for numbers in reference_catalog.values():
for num in numbers:
if num not in all_block_numbers:
all_block_numbers.append(num)
# Create references for all found block numbers
references = []
citation_map = {}
for i, block_num in enumerate(all_block_numbers, 1):
if block_num not in blocks_dict:
logger.warning(f"Block {block_num} referenced but not found in blocks_dict")
continue
block_data = blocks_dict[block_num]
ref_id = str(i)
# Create reference object based on type
if block_data.get('type') == 'web':
# Web search result
ref_obj = {
'id': ref_id,
'type': 'web',
'title': block_data.get('title', 'Web Page'),
'url': block_data.get('url', ''),
'text': block_data.get('content', ''),
'file_name': block_data.get('title', 'Web Page')
}
else:
# Document reference
ref_obj = {
'id': ref_id,
'type': 'file',
'file_name': block_data.get('description', 'Unknown'),
'source': block_data.get('path', ''),
'text': block_data.get('content', '')[:500], # Preview
'full_text': block_data.get('content', ''),
'metadata': {'source': block_data.get('path', '')}
}
references.append(ref_obj)
citation_map[block_num] = ref_id
# Replace block references with numerical citations
updated_text = text
sorted_refs = sorted(original_refs.keys(), key=len, reverse=True)
for original_ref in sorted_refs:
block_numbers = reference_catalog.get(original_ref, [])
if not block_numbers:
continue
# Create list of citation IDs
ref_ids = []
for block_num in block_numbers:
if block_num in citation_map:
ref_ids.append(citation_map[block_num])
if ref_ids:
# Sort numerically and create citation
sorted_ref_ids = sorted(ref_ids, key=int)
new_ref = f"[{', '.join(sorted_ref_ids)}]"
updated_text = updated_text.replace(original_ref, new_ref)
return updated_text, references
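# Example of the transformation above (hypothetical input):
#   input text:  "Revenue grew [Block 3] while costs fell [Blocks 1-2]."
#   output text: "Revenue grew [1] while costs fell [2, 3]."
#   references:  id "1" -> block 3, id "2" -> block 1, id "3" -> block 2
#   (ids are assigned in order of first appearance of each block number)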
def _optimize_query(self, query: str, chat_history: List[Dict] = None) -> str:
"""Optimize user query for better retrieval"""
# If no history, return original query
if not chat_history or len(chat_history) == 0:
return query
# Create optimization prompt
history_text = "\n".join([
f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}"
for msg in chat_history[-4:] # Last 4 messages for context
])
prompt = f"""Given the conversation history and the new user query, reformulate the query to be
more specific and standalone for document retrieval.
CONVERSATION HISTORY:
{history_text}
NEW QUERY:
{query}
OPTIMIZED STANDALONE QUERY (single line, no explanation):
"""
response = self.small_llm.invoke(prompt)
optimized = response.content.strip()
logger.info(f"Original query: {query}")
logger.info(f"Optimized query: {optimized}")
return optimized
def get_all_sources(self) -> set:
"""
Get all unique source paths from the collection.
Uses caching to avoid repeated queries.
Returns:
Set of all source paths
"""
# Check if cache is valid
current_count = self.collection.count()
if self._source_cache is not None and self._source_cache_count == current_count:
return self._source_cache
# Fetch all sources
all_data = self.collection.get()
if not all_data['metadatas']:
self._source_cache = set()
self._source_cache_count = current_count
return self._source_cache
# Extract unique source paths
all_sources = set()
for metadata in all_data['metadatas']:
# Try 'source' first, then 'file_path' as fallback
source = metadata.get('source') or metadata.get('file_path')
if source:
all_sources.add(source)
# Debug: Log what fields we found
if all_data['metadatas']:
sample_metadata = all_data['metadatas'][0]
logger.info(f"Sample metadata fields: {list(sample_metadata.keys())}")
# Update cache
self._source_cache = all_sources
self._source_cache_count = current_count
logger.info(f"Cached {len(all_sources)} unique source paths")
return all_sources
def get_matching_sources(self, source_filters: List[str]) -> List[str]:
"""
Get all document source paths that match the filters.
This is used to build a proper where clause for ChromaDB.
Args:
source_filters: List of file paths or folder paths
Returns:
List of matching source paths (for use in where clause)
"""
if not source_filters:
return None
# Get all sources (uses cache)
all_sources = self.get_all_sources()
if not all_sources:
return []
logger.debug(f"🔍 SOURCE FILTER DEBUG:")
logger.debug(f" Filters provided: {source_filters}")
logger.debug(f" Total sources in DB: {len(all_sources)}")
# Convert set to list for slicing
all_sources_list = list(all_sources)
logger.debug(f" Sample sources from DB: {all_sources_list[:5] if len(all_sources_list) > 5 else all_sources_list}")
# Filter sources that match our filters
matching_sources = [src for src in all_sources if matches_source_filter(src, source_filters)]
logger.debug(f" ✅ Matched sources: {matching_sources}")
logger.info(f"Source filter: {len(source_filters)} filters matched {len(matching_sources)} documents")
return matching_sources
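# Illustrative behaviour (hypothetical paths; the exact matching rules live in
# matches_source_filter): a folder filter such as "/data/reports" would
# presumably match "/data/reports/q1.pdf" and files nested beneath it, while a
# file filter matches only that exact path.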
def _detect_and_translate_query(self, query: str) -> Dict[str, Any]:
"""
Detect the language of the query and generate translations in supported languages.
Args:
query: Original user query
Returns:
Dictionary with detected_language, original_query, and translations dict
"""
languages = config.SUPPORTED_LANGUAGES
language_names = [config.LANGUAGE_NAMES.get(lang, lang) for lang in languages]
logger.info(f"Detecting language and generating translations for: {languages}")
# Build dynamic translations dict example based on configured languages
translations_example = ',\n '.join(f'"{lang}": "{config.LANGUAGE_NAMES.get(lang, lang)} translation"' for lang in languages)
prompt = f"""You are a language detection and translation expert.
### User Query:
{query}
### Task:
1. Detect the primary language of the user query from these options: {', '.join(language_names)}
2. Generate accurate translations of the query into each of the following languages:
{chr(10).join(f' - {config.LANGUAGE_NAMES.get(lang, lang)} ({lang})' for lang in languages)}
### Important Instructions:
- Preserve the meaning, intent, and technical terms of the original query
- For technical or domain-specific terms, use appropriate terminology in each language
- If a term doesn't translate well, keep it in the original language
- Ensure translations are natural and idiomatic in each target language
- Return ONLY a JSON object with this exact structure (no markdown formatting):
{{
"detected_language": "language_code",
"translations": {{
{translations_example}
}}
}}
### Response (JSON only):"""
try:
response = self.small_llm.invoke(prompt)
content = response.content.strip()
# Remove markdown code blocks if present
if content.startswith('```'):
content = content.split('```')[1]
if content.startswith('json'):
content = content[4:]
content = content.strip()
import json
result = json.loads(content)
# Validate structure
if 'detected_language' not in result or 'translations' not in result:
raise ValueError("Invalid response structure")
# Filter translations to only include configured languages
# This prevents the LLM from returning extra languages we didn't ask for
filtered_translations = {}
for lang in languages:
if lang in result['translations']:
filtered_translations[lang] = result['translations'][lang]
else:
# Fallback to original query if translation missing
filtered_translations[lang] = query
logger.warning(f"Translation missing for {lang}, using original query")
result['translations'] = filtered_translations
logger.info(f"Detected language: {result['detected_language']}")
logger.info(f"Generated {len(result['translations'])} translations")
return result
except Exception as e:
logger.error(f"Failed to detect language and translate: {e}")
# Fallback: return original query for all languages
return {
'detected_language': 'en',
'translations': {lang: query for lang in languages}
}
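# Example return value (illustrative; the languages actually present depend on
# config.SUPPORTED_LANGUAGES):
#   {
#       "detected_language": "fr",
#       "translations": {
#           "en": "<English translation of the query>",
#           "fr": "<query as phrased in French>",
#           "nl": "<Dutch translation of the query>"
#       }
#   }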
def _extend_query(self, query: str, chat_history: List[Dict] = None) -> List[str]:
"""
Context-aware query expansion with multi-language support.
Leverages conversation history when available for better continuity.
Returns list of expanded query variants in multiple languages.
"""
logger.info("🔄 Extending query with multi-language variations...")
# Check if we have chat history for context-aware expansion
has_history = chat_history and len(chat_history) > 0
if has_history:
logger.info(f" 📚 Context-aware mode: Using {len(chat_history)} messages from chat history")
self.small_llm_usage["context_aware_expansion"] += 1
else:
logger.info(" 📝 Standard mode: No chat history available")
self.small_llm_usage["query_expansion"] += 1
self.small_llm_usage["total_calls"] += 1
logger.info(f" 🤖 Using small LLM ({config.SMALL_LLM_MODEL}) for query expansion")
logger.info(f" 📊 Small LLM usage: {self.small_llm_usage['total_calls']} total calls")
# Step 1: Detect language and get translations
translation_result = self._detect_and_translate_query(query)
detected_lang = translation_result['detected_language']
translations = translation_result['translations']
# Step 2: Generate expanded queries for each language
all_extended_queries = []
for lang_code, translated_query in translations.items():
lang_name = config.LANGUAGE_NAMES.get(lang_code, lang_code)
logger.info(f"Generating expanded queries in {lang_name}...")
if has_history:
# Format chat history with better structure
history_pairs = []
for i in range(0, len(chat_history) - 1, 2):
if i + 1 < len(chat_history):
user_msg = chat_history[i]
asst_msg = chat_history[i + 1]
if user_msg.get('role') == 'user' and asst_msg.get('role') == 'assistant':
# Truncate long assistant responses to keep context manageable
asst_content = asst_msg['content'][:500] + "..." if len(asst_msg['content']) > 500 else asst_msg['content']
history_pairs.append(f"User: {user_msg['content']}\nAssistant: {asst_content}")
history_text = "\n\n".join(history_pairs[-3:]) # Last 3 exchanges
# Context-aware prompt (enhanced from vice_ai style)
prompt = f"""You are an AI that enhances search queries to improve information retrieval in {lang_name}, taking into account the conversation history.
### Previous Conversation History:
{history_text}
### Current User Query:
{translated_query}
### Instructions:
- Generate exactly 3 expanded queries IN {lang_name.upper()}
- **Analyze the conversation history** to understand the ongoing discussion context
- The current query may be a **follow-up** that refers to previous topics, entities, or concepts
- If the query contains **pronouns** (it, this, that, they) or **short references**, expand them using context from history
- If the query is related to previous questions, create variations that **make the connection explicit**
- Each query should explore a different aspect using {lang_name} terminology
- Use synonyms and related terms natural to {lang_name}
- **Ensure queries are self-contained** and coherent even without the conversation history
- ONLY return the text of the queries, one per line
### Expanded Queries:"""
else:
prompt = f"""You are an AI that enhances search queries to improve information retrieval in {lang_name}.
### User Query:
{translated_query}
### Instructions:
- Generate exactly 3 expanded queries IN {lang_name.upper()}
- Each query should explore a different aspect or perspective
- Use synonyms, related terms natural to {lang_name}
- Ensure queries are relevant and coherent
- Avoid queries that are too similar
- ONLY return the text of the queries, one per line
### Expanded Queries:"""
try:
response = self.small_llm.invoke(prompt)
extended = [x.strip() for x in response.content.strip().split("\n") if x.strip()]
# Keep at most 3 expanded queries per language
tagged_queries = extended[:3]
all_extended_queries.extend(tagged_queries)
logger.info(f"Generated {len(tagged_queries)} queries in {lang_name}")
except Exception as e:
logger.error(f"Failed to generate queries in {lang_name}: {e}")
# Fallback: add original translated query
all_extended_queries.append(translated_query)
# Also include the original query and translations
all_extended_queries.insert(0, query) # Original query first
for lang_code, trans_query in translations.items():
if trans_query != query and trans_query not in all_extended_queries:
all_extended_queries.append(trans_query)
logger.info(f"Query extension complete. Generated {len(all_extended_queries)} total variants across {len(translations)} languages")
return all_extended_queries
def _generate_web_search_queries(self, query: str, chat_history: List[Dict] = None) -> List[str]:
"""
Generate optimized web search queries using 2-step prompting.
Returns list of 3 search queries.
"""
logger.info("Generating web search queries...")
# Format chat history if available
history_text = ""
if chat_history and len(chat_history) > 0:
# Format chat history - convert role/content to user/assistant pairs
history_pairs = []
for i in range(0, len(chat_history) - 1, 2): # Step by 2 to get pairs
if i + 1 < len(chat_history):
user_msg = chat_history[i]
asst_msg = chat_history[i + 1]
if user_msg.get('role') == 'user' and asst_msg.get('role') == 'assistant':
history_pairs.append(f"User: {user_msg['content']}\nAssistant: {asst_msg['content']}")
history_text = "\n\n".join(history_pairs[-3:]) # Last 3 exchanges
prompt = f"""[System Instructions]:
You are an advanced reasoning assistant. Your task is to process the following query and identify information gaps where real-time, up-to-date data from the web would significantly improve the answer.
{f"### Previous Conversation:{chr(10)}{history_text}{chr(10)}" if history_text else ""}
### Current User Query:
{query}
Your task is to craft an optimized, concise Google search strategy to retrieve the most relevant information. Your strategy should consist of exactly three carefully designed and complementary search queries.
[Output Constraints]:
- Do NOT output the answer or any explanations of your reasoning.
- Output only the final Google search queries in strict JSON format:
```json
{{
"search_queries": [
"First search query",
"Second search query",
"Third search query"
]
}}
```
- Ensure the search queries are clear, concise, and directly relevant to the information gaps identified in the query.
- Don't include any additional information or context.
- Ensure the query includes specific keywords, filters, or advanced search operators to maximize precision.
Respond with ONLY the JSON, nothing else."""
response = self.small_llm.invoke(prompt)
# Parse JSON response
try:
# Extract JSON from response
content = response.content.strip()
if "```json" in content:
content = content.split("```json")[1].split("```")[0].strip()
elif "```" in content:
content = content.split("```")[1].split("```")[0].strip()
import json
data = json.loads(content)
search_queries = data.get("search_queries", [])
logger.info(f"Generated {len(search_queries)} web search queries")
return search_queries
except Exception as e:
logger.warning(f"Failed to parse search queries: {e}")
return [query] # Fallback to original query
def score_reference_relevance(self, final_answer: str, reference_documents: List[Dict],
relevance_threshold: float = 0.3) -> List[Dict]:
"""
Score the relevance of each reference document against the final answer.
Filters out documents below the relevance threshold.
Args:
final_answer: The generated answer text
reference_documents: List of document dictionaries with 'text' and 'file_name'
relevance_threshold: Minimum score to include a reference (0.0-1.0)
Returns:
List of document dictionaries with added 'relevance_score' field, filtered by threshold
"""
logger.info(f"🎯 Scoring reference relevance against final answer...")
logger.info(f" • Evaluating {len(reference_documents)} reference documents")
logger.info(f" • Relevance threshold: {relevance_threshold}")
relevant_references = []
# Create a short summary of the final answer for comparison
answer_summary = final_answer[:1000] + "..." if len(final_answer) > 1000 else final_answer
for i, doc in enumerate(reference_documents):
doc_name = doc.get('file_name', 'Unknown')
logger.info(f" Scoring document {i+1}/{len(reference_documents)}: {doc_name[:50]}...")
# Create excerpt from document for scoring
content = doc.get('text') or doc.get('content', '')
content_excerpt = content[:800] + "..." if len(content) > 800 else content
prompt = f"""You are a document relevance scorer. Determine how relevant a reference document is to a final answer.
FINAL ANSWER (excerpt):
{answer_summary}
REFERENCE DOCUMENT (excerpt):
{content_excerpt}
Rate the relevance on a scale of 0.0 to 1.0:
- 0.0: Completely irrelevant
- 0.3: Somewhat relevant, background information
- 0.5: Moderately relevant, supporting information
- 0.7: Highly relevant, key information used
- 1.0: Extremely relevant, essential for answer
Consider:
- Does the document contain information that directly supports the answer?
- Are there shared topics, concepts, or findings?
- Would removing this reference make the answer less accurate?
Respond with ONLY a number between 0.0 and 1.0."""
try:
response = self.small_llm.invoke(prompt)
score_text = response.content.strip()
# Extract number from response
import re
number_match = re.search(r'(\d+\.?\d*)', score_text)
if number_match:
score = float(number_match.group(1))
score = max(0.0, min(1.0, score)) # Clamp to 0.0-1.0
else:
logger.warning(f" ⚠️ Could not parse score from: '{score_text}', using 0.5")
score = 0.5
# Add score to document
doc_with_score = doc.copy()
doc_with_score['relevance_score'] = score
if score >= relevance_threshold:
relevant_references.append(doc_with_score)
logger.info(f" ✅ Score: {score:.2f} (included)")
else:
logger.info(f" ❌ Score: {score:.2f} (excluded - below threshold)")
except Exception as e:
logger.error(f" Error scoring document: {e}")
# On error, include with default score
doc_with_score = doc.copy()
doc_with_score['relevance_score'] = 0.5
if 0.5 >= relevance_threshold:
relevant_references.append(doc_with_score)
logger.info(f" 📊 References after relevance filtering: {len(relevant_references)}/{len(reference_documents)}")
return relevant_references
def mode_1_basic_rag(self, query: str, top_k: int = 5,
chat_history: List[Dict] = None,
use_reranking: bool = True,
source_filters: List[str] = None,
manual_keywords: List[str] = None,
custom_instructions: str = None,
output_language: str = 'en') -> Dict[str, Any]:
"""
Mode 1: Basic RAG with similarity search
Args:
query: User query
top_k: Number of chunks to retrieve (will be adjusted for multi-language search)
chat_history: Previous conversation for query optimization
use_reranking: Whether to rerank results
source_filters: List of file paths to restrict search to
manual_keywords: Additional keywords to enhance search
custom_instructions: Optional custom instructions to guide the LLM
output_language: Language code for the response (e.g., 'en', 'nl', 'fr')
Returns:
Dictionary with response and context
"""
logger.info(f"Mode 1 - Basic RAG: {query}")
# Adjust top_k for multi-language search
num_languages = len(config.SUPPORTED_LANGUAGES)
adjusted_top_k = config.get_adjusted_top_k(top_k, num_languages)
logger.info(f"Multi-language search: {num_languages} languages, adjusted top_k from {top_k} to {adjusted_top_k}")
# Expand query into multiple variants for better retrieval
extended_queries = self._extend_query(query, chat_history)
extended_queries.append(query) # Add original query
# Add manual keywords as additional query variants
if manual_keywords and len(manual_keywords) > 0:
logger.info(f"Adding {len(manual_keywords)} manual keywords: {manual_keywords}")
# Create query variants with keywords
for keyword in manual_keywords:
# Add standalone keyword
extended_queries.append(keyword)
# Add keyword combined with original query
extended_queries.append(f"{query} {keyword}")
logger.info(f"Using {len(extended_queries)} query variants for retrieval")
# Build where clause for source filtering
# If source_filters contains folders, we need to resolve them to actual file paths
where_clause = None
if source_filters and len(source_filters) > 0:
logger.info(f"Applying source filters: {source_filters}")
# Get all matching sources (handles both files and folders)
matching_sources = self.get_matching_sources(source_filters)
if matching_sources:
# Use 'file_path' field instead of 'source' (based on actual metadata schema)
where_clause = {"file_path": {"$in": matching_sources}}
logger.info(f"Source filter: {len(source_filters)} filters -> {len(matching_sources)} documents")
logger.info(f"Matching documents: {matching_sources}")
else:
logger.warning("Source filter matched no documents!")
else:
logger.info("No source filters applied - searching all documents")
# Retrieve relevant chunks with overselection strategy
# Retrieve more chunks initially to account for deduplication and filtering
overselection_factor = 3 if use_reranking else 1.5
initial_retrieval = int(adjusted_top_k * overselection_factor)
# Query with all variants and collect unique results
all_retrieved = []
seen_ids = set()
logger.info(f"Retrieving with {len(extended_queries)} query variants (k={initial_retrieval} each)")
for idx, query_variant in enumerate(extended_queries):
query_params = {
"query_texts": [query_variant],
"n_results": initial_retrieval
}
if where_clause:
query_params["where"] = where_clause
variant_results = self.collection.query(**query_params)
# Add unique results
if variant_results['documents'] and variant_results['documents'][0]:
for i, doc_text in enumerate(variant_results['documents'][0]):
doc_id = variant_results['ids'][0][i]
if doc_id not in seen_ids:
seen_ids.add(doc_id)
all_retrieved.append({
'id': doc_id,
'text': doc_text,
'metadata': variant_results['metadatas'][0][i],
'distance': variant_results['distances'][0][i] if 'distances' in variant_results else 0
})
logger.info(f"Retrieved {len(all_retrieved)} unique chunks from {len(extended_queries)} query variants")
# Apply manual keyword filtering if provided
if manual_keywords and len(manual_keywords) > 0:
logger.info(f"🔍 KEYWORD FILTERING: Applying with keywords: {manual_keywords}")
filtered_chunks = []
# Step 1: Try documents containing ALL keywords
all_keywords_match = []
for chunk in all_retrieved:
chunk_text_lower = chunk['text'].lower()
if all(keyword.lower() in chunk_text_lower for keyword in manual_keywords):
all_keywords_match.append(chunk)
logger.info(f" • Chunks with ALL keywords: {len(all_keywords_match)}")
filtered_chunks.extend(all_keywords_match)
# Step 2: If not enough results, try documents with ANY keyword
if len(all_keywords_match) < top_k:
logger.info(f" • Looking for chunks with ANY keyword (need {top_k - len(all_keywords_match)} more)")
any_keyword_match = []
seen_ids_in_filtered = {c['id'] for c in filtered_chunks}
for chunk in all_retrieved:
if chunk['id'] in seen_ids_in_filtered:
continue # Skip already included
chunk_text_lower = chunk['text'].lower()
if any(keyword.lower() in chunk_text_lower for keyword in manual_keywords):
any_keyword_match.append(chunk)
logger.info(f" • Additional chunks with ANY keyword: {len(any_keyword_match)}")
filtered_chunks.extend(any_keyword_match)
logger.info(f" ✅ Total chunks after keyword filtering: {len(filtered_chunks)}")
# Check if keyword filtering eliminated all documents
if len(filtered_chunks) == 0:
logger.warning(f" ❌ KEYWORD FILTERING: No chunks match keywords '{', '.join(manual_keywords)}'")
logger.warning(f" 🚫 Respecting filtering constraints - returning limited results")
# Don't completely fail - will fall back to web search if enabled
else:
# Replace all_retrieved with filtered results
all_retrieved = filtered_chunks
logger.info(f" 📊 Keyword filtering reduced {len(seen_ids)} -> {len(all_retrieved)} chunks")
# Web search augmentation (if enabled)
web_context = ""
web_references = [] # Track web search references
if self.enable_web_search and self.web_search:
try:
logger.info("Web search enabled, generating search queries...")
web_search_queries = self._generate_web_search_queries(query, chat_history)
logger.info(f"Executing {len(web_search_queries)} web searches...")
web_results_list = []
web_ref_start = len(all_retrieved) + 1 # Start numbering after document blocks
for i, search_query in enumerate(web_search_queries, 1):
logger.info(f"Web search {i}/{len(web_search_queries)}: {search_query}")
# Get structured results with URLs
search_output = self.web_search.results(search_query)
# Extract URLs and snippets
if isinstance(search_output, dict) and 'organic' in search_output:
for result in search_output['organic'][:3]: # Top 3 results per query
title = result.get('title', 'Web Page')
url = result.get('link', '')
snippet = result.get('snippet', '')
# Add to web references list
web_references.append({
'type': 'web',
'title': title,
'url': url,
'snippet': snippet,
'block_num': web_ref_start + len(web_references)
})
# Add to context
web_results_list.append(
f"[Block {web_ref_start + len(web_references) - 1}] {title}\n{snippet}\nURL: {url}"
)
if web_results_list:
web_context = f"\n\n## Web Search Results:\n" + "\n\n".join(web_results_list) + "\n\n"
logger.info(f"Web search completed, added {len(web_references)} web references")
except Exception as e:
logger.warning(f"Web search failed: {e}")
if not all_retrieved:
# No document results - use web search as fallback if available
if web_context:
logger.info("No document results, using web search only...")
web_prompt = f"""Answer the following question using the web search results below.
Question: {query}
{web_context}
Please provide a clear and concise answer:"""
response = self.main_llm.invoke(web_prompt)
return {
'response': response.content,
'references': [],
'context': [],
'mode': 'basic_rag',
'source': 'web_search'
}
return {
'response': "I couldn't find any relevant information in the documents.",
'context': [],
'mode': 'basic_rag'
}
# Prepare results for reranking
retrieved = []
for item in all_retrieved:
retrieved.append({
'text': item['text'],
'metadata': item['metadata'],
'similarity': 1 - item['distance'], # Convert distance to similarity
'rank': len(retrieved)
})
# Deduplicate chunks to improve diversity
retrieved = self._deduplicate_chunks(retrieved, similarity_threshold=0.85)
# Rerank if enabled
if use_reranking and len(retrieved) > 0:
retrieved = self._rerank_results(query, retrieved)
# Loopback mechanism: if we don't have enough chunks after deduplication, retrieve more
attempts = 0
max_attempts = 3
current_n_results = initial_retrieval
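# Note: this loopback reuses query_params from the last query variant in the
# retrieval loop above, so the additional results come from that single variant
# (with the same where clause, if any).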
while len(retrieved) < top_k and attempts < max_attempts:
attempts += 1
logger.info(f"Loopback attempt {attempts}: have {len(retrieved)} chunks, need {top_k}")
# Retrieve more chunks
current_n_results = int(current_n_results * 1.5)
query_params["n_results"] = current_n_results
additional_results = self.collection.query(**query_params)
if not additional_results['documents'][0]:
break
# Process additional results
additional_retrieved = []
for i, (doc, meta, distance) in enumerate(zip(
additional_results['documents'][0],
additional_results['metadatas'][0],
additional_results['distances'][0]
)):
additional_retrieved.append({
'text': doc,
'metadata': meta,
'similarity': 1 - distance,
'rank': i
})
# Deduplicate
additional_retrieved = self._deduplicate_chunks(additional_retrieved, similarity_threshold=0.85)
# Rerank if enabled
if use_reranking and len(additional_retrieved) > 0:
additional_retrieved = self._rerank_results(query, additional_retrieved)
retrieved = additional_retrieved
# Take top_k after all processing
retrieved = retrieved[:top_k]
logger.info(f"Final retrieval: {len(retrieved)} chunks")
# Build context
context_blocks = []
for i, item in enumerate(retrieved):
context_blocks.append({
'block_number': i + 1,
'text': item['text'],
'file_name': item['metadata'].get('file_name', 'Unknown'),
'metadata': item['metadata']
})
# Create prompt
context_text = "\n\n".join([
f"[Block {b['block_number']}] (Source: {b['file_name']})\n{b['text']}"
for b in context_blocks
])
# Add web search context if available
if web_context:
context_text = context_text + web_context
prompt = self._create_response_prompt(query, context_text, chat_history, custom_instructions, output_language)
# Generate response
response = self.main_llm.invoke(prompt)
# Format with references (pass web_references too)
formatted = self._format_response_with_references(response.content, context_blocks, web_references)
return {
'response': formatted['response'],
'references': formatted['references'],
'context': context_blocks,
'mode': 'basic_rag',
'num_chunks': len(context_blocks),
'web_results': len(web_references)
}
def mode_2_extensive(self, query: str, top_k: int = 10,
chat_history: List[Dict] = None,
source_filters: List[str] = None,
manual_keywords: List[str] = None,
enable_reference_filtering: bool = True,
reference_threshold: float = 0.3,
custom_instructions: str = None,
output_language: str = 'en') -> Dict[str, Any]:
"""
Mode 2: Extensive mode - retrieve full documents and preprocess
Args:
query: User query
top_k: Number of documents to retrieve
chat_history: Previous conversation
source_filters: List of file paths to restrict search to
manual_keywords: Additional keywords to enhance search
enable_reference_filtering: Apply relevance scoring to filter references
reference_threshold: Minimum relevance score (0.0-1.0) for references
custom_instructions: Optional custom instructions to guide the LLM
output_language: Language code for the response
Returns:
Dictionary with response and context
"""
logger.info(f"Mode 2 - Extensive: {query}")
# Optimize query
search_query = self._optimize_query(query, chat_history)
# Enhance query with manual keywords
if manual_keywords and len(manual_keywords) > 0:
logger.info(f"Enhancing extensive search with {len(manual_keywords)} keywords: {manual_keywords}")
keywords_str = " ".join(manual_keywords)
search_query = f"{search_query} {keywords_str}"
# Build where clause for source filtering
where_clause = None
if source_filters and len(source_filters) > 0:
matching_sources = self.get_matching_sources(source_filters)
if matching_sources:
where_clause = {"file_path": {"$in": matching_sources}}
logger.info(f"Source filter: {len(source_filters)} filters -> {len(matching_sources)} documents")
# Retrieve more chunks to identify documents
query_params = {
"query_texts": [search_query],
"n_results": top_k * 5 # Get more chunks to cover multiple documents
}
if where_clause:
query_params["where"] = where_clause
results = self.collection.query(**query_params)
if not results['documents'][0]:
return {
'response': "I couldn't find any relevant documents.",
'context': [],
'mode': 'extensive'
}
# Group by document
doc_chunks = {}
for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
doc_id = meta.get('doc_id')
if doc_id not in doc_chunks:
doc_chunks[doc_id] = {
'chunks': [],
'file_name': meta.get('file_name', 'Unknown')
}
doc_chunks[doc_id]['chunks'].append({
'text': doc,
'metadata': meta
})
# Get top documents
top_doc_ids = list(doc_chunks.keys())[:top_k]
# Retrieve ALL chunks for these documents
full_documents = []
for doc_id in top_doc_ids:
all_chunks = self.collection.get(
where={"doc_id": doc_id}
)
# Reconstruct document
doc_text = "\n\n".join(all_chunks['documents'])
full_documents.append({
'text': doc_text,
'file_name': doc_chunks[doc_id]['file_name'],
'doc_id': doc_id
})
# Apply manual keyword filtering on full documents (if provided)
if manual_keywords and len(manual_keywords) > 0:
logger.info(f"🔍 EXTENSIVE MODE - KEYWORD FILTERING: Applying to {len(full_documents)} full documents")
logger.info(f" Keywords: {manual_keywords}")
filtered_docs = []
# Step 1: Try documents containing ALL keywords
all_keywords_docs = []
for doc in full_documents:
doc_text_lower = doc['text'].lower()
if all(keyword.lower() in doc_text_lower for keyword in manual_keywords):
all_keywords_docs.append(doc)
logger.info(f" • Documents with ALL keywords: {len(all_keywords_docs)}")
filtered_docs.extend(all_keywords_docs)
# Step 2: If not enough results, try documents with ANY keyword
if len(all_keywords_docs) < top_k:
logger.info(f" • Looking for documents with ANY keyword (need {top_k - len(all_keywords_docs)} more)")
any_keyword_docs = []
seen_doc_ids = {d['doc_id'] for d in filtered_docs}
for doc in full_documents:
if doc['doc_id'] in seen_doc_ids:
continue # Skip already included
doc_text_lower = doc['text'].lower()
if any(keyword.lower() in doc_text_lower for keyword in manual_keywords):
any_keyword_docs.append(doc)
logger.info(f" • Additional documents with ANY keyword: {len(any_keyword_docs)}")
filtered_docs.extend(any_keyword_docs)
logger.info(f" ✅ Total documents after keyword filtering: {len(filtered_docs)}")
# Check if keyword filtering eliminated all documents
if len(filtered_docs) == 0:
logger.warning(f" ❌ KEYWORD FILTERING: No documents match keywords '{', '.join(manual_keywords)}'")
return {
'response': f"I couldn't find any documents matching the keywords: {', '.join(manual_keywords)}",
'context': [],
'mode': 'extensive',
'metadata': {
'keyword_filter_applied': True,
'keywords': manual_keywords,
'documents_before_filter': len(full_documents),
'documents_after_filter': 0
}
}
else:
# Replace full_documents with filtered results
full_documents = filtered_docs
logger.info(f" 📊 Keyword filtering: {len(top_doc_ids)} -> {len(full_documents)} documents")
# Extract relevant information from each document
logger.info(f"Extracting from {len(full_documents)} documents...")
extracted_context = self.extractor.extract_from_multiple_documents(
full_documents, query
)
# Create prompt
prompt = self._create_response_prompt(query, extracted_context, chat_history, custom_instructions, output_language)
# Generate response
response = self.main_llm.invoke(prompt)
# Web search augmentation (if enabled) - for fact-checking and enhancement
web_references = []
if self.enable_web_search and self.web_search:
try:
logger.info("🌐 Web search enabled in extensive mode, generating search queries for fact-checking...")
web_search_queries = self._generate_web_search_queries(query, chat_history)
logger.info(f"Executing {len(web_search_queries)} web searches...")
web_ref_start = len(full_documents) + 1 # Start numbering after document blocks
for i, search_query in enumerate(web_search_queries, 1):
logger.info(f"Web search {i}/{len(web_search_queries)}: {search_query}")
# Get structured results with URLs
search_output = self.web_search.results(search_query)
# Extract URLs and snippets
if isinstance(search_output, dict) and 'organic' in search_output:
for result in search_output['organic'][:3]: # Top 3 results per query
title = result.get('title', 'Web Page')
url = result.get('link', '')
snippet = result.get('snippet', '')
# Add to web references list
web_references.append({
'type': 'web',
'title': title,
'url': url,
'snippet': snippet,
'block_num': web_ref_start + len(web_references)
})
if web_references:
logger.info(f"✅ Web search completed, added {len(web_references)} web references")
except Exception as e:
logger.warning(f"⚠️ Web search failed: {e}")
# Apply reference relevance filtering (post-processing) if enabled
if enable_reference_filtering:
logger.info(f"🎯 Applying post-processing reference relevance filtering (threshold: {reference_threshold})...")
filtered_documents = self.score_reference_relevance(
response.content,
full_documents,
relevance_threshold=reference_threshold
)
if len(filtered_documents) == 0:
logger.warning("All references were filtered out by relevance scoring. Using original documents.")
filtered_documents = full_documents
else:
logger.info(f"Reference filtering: {len(full_documents)} -> {len(filtered_documents)} documents")
else:
logger.info("🔓 Reference filtering disabled - using all documents")
filtered_documents = full_documents
# Format with filtered references (including web references)
formatted = self._format_response_with_references(response.content, filtered_documents, web_references)
return {
'response': formatted['response'],
'references': formatted['references'],
'context': filtered_documents, # Return filtered documents
'extracted_context': extracted_context,
'mode': 'extensive',
'num_documents': len(full_documents),
'num_relevant_documents': len(filtered_documents),
'metadata': {
'documents_retrieved': len(full_documents),
'documents_after_relevance_filter': len(filtered_documents),
'relevance_filtering_applied': enable_reference_filtering,
'web_results': len(web_references)
}
}
def mode_3_full_reading(self, query: str,
chat_history: List[Dict] = None,
progress_callback: Callable = None,
source_filters: List[str] = None,
skip_extraction: bool = False,
enable_reference_filtering: bool = True,
reference_threshold: float = 0.3,
custom_instructions: str = None,
output_language: str = 'en') -> Dict[str, Any]:
"""
Mode 3: Full reading - process ALL documents in collection
Args:
query: User query
chat_history: Previous conversation
progress_callback: Callback for progress updates
source_filters: List of file paths to restrict search to
skip_extraction: If True, skip small LLM preprocessing and send full docs to main LLM
(falls back to extraction only if token limit exceeded)
enable_reference_filtering: Apply relevance scoring to filter references
reference_threshold: Minimum relevance score (0.0-1.0) for references
custom_instructions: Optional custom instructions to guide the LLM
output_language: Language code for the response
Returns:
Dictionary with response and context
"""
logger.info(f"Mode 3 - Full Reading: {query} (skip_extraction={skip_extraction})")
if progress_callback:
progress_callback("Retrieving all documents...")
# Build where clause for source filtering
where_clause = None
if source_filters and len(source_filters) > 0:
matching_sources = self.get_matching_sources(source_filters)
if matching_sources:
where_clause = {"file_path": {"$in": matching_sources}}
logger.info(f"Source filter: {len(source_filters)} filters -> {len(matching_sources)} documents")
# Get all documents (optionally filtered by source)
if where_clause:
all_data = self.collection.get(where=where_clause)
else:
all_data = self.collection.get()
if not all_data['ids']:
return {
'response': "The document collection is empty.",
'context': [],
'mode': 'full_reading'
}
# Group by document
doc_chunks = {}
for doc, meta in zip(all_data['documents'], all_data['metadatas']):
doc_id = meta.get('doc_id')
if doc_id not in doc_chunks:
doc_chunks[doc_id] = {
'chunks': [],
'file_name': meta.get('file_name', 'Unknown')
}
doc_chunks[doc_id]['chunks'].append(doc)
logger.info(f"Processing {len(doc_chunks)} documents...")
# Process each document
all_extractions = []
total_docs = len(doc_chunks)
for i, (doc_id, doc_info) in enumerate(doc_chunks.items()):
if progress_callback:
progress_callback(f"Reading document {i+1}/{total_docs}: {doc_info['file_name']}")
# Reconstruct full document
doc_text = "\n\n".join(doc_info['chunks'])
if skip_extraction:
# Skip extraction - use full document text
extracted = doc_text
logger.info(f"✅ SKIP_EXTRACTION=True: Using full document text for {doc_info['file_name']} ({self.count_tokens(doc_text)} tokens)")
else:
# Extract relevant information using small LLM
logger.info(f"⚙️ SKIP_EXTRACTION=False: Extracting from {doc_info['file_name']} using small LLM")
extracted = self.extractor.extract_from_document(doc_text, query)
if extracted and extracted.strip():
all_extractions.append({
'file_name': doc_info['file_name'],
'extracted': extracted,
'doc_id': doc_id
})
if not all_extractions:
return {
'response': "None of the documents contain relevant information for your query.",
'context': [],
'mode': 'full_reading',
'num_documents': total_docs
}
# Consolidate all extractions
if progress_callback:
progress_callback("Consolidating information from all documents...")
# Create numbered blocks for citation
numbered_blocks = []
for i, ext in enumerate(all_extractions, 1):
numbered_blocks.append(f"[Block {i}] ### From: {ext['file_name']}\n\n{ext['extracted']}")
combined_context = "\n\n---\n\n".join(numbered_blocks)
# Check if combined context fits in token limit
# Use model-specific limits
MODEL_LIMITS = {
'gpt-4o': 128000,
'gpt-4o-mini': 128000,
'azure-gpt-4o': 128000,
'gpt-5': 200000,
'gpt-5-mini': 128000,
'gpt-5-pro': 200000,
'gpt-4': 8192,
'gpt-3.5-turbo': 16385,
'claude-3-5-sonnet-20241022': 200000,
'claude-sonnet-4-5-20250929': 200000
}
model_name = getattr(self.main_llm, 'model_name', config.LLM_MODEL)
max_model_tokens = MODEL_LIMITS.get(model_name, 128000)
# Use half the model's capacity for context (reserve rest for prompt structure and output)
max_context_tokens = max_model_tokens // 2
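# e.g. if model_name is "gpt-4o", max_model_tokens = 128000 and
# max_context_tokens = 128000 // 2 = 64000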
context_tokens = self.count_tokens(combined_context)
logger.info(f"Combined context: {context_tokens} tokens (limit: {max_context_tokens} for {model_name})")
# If too large, handle based on skip_extraction setting
if context_tokens > max_context_tokens:
if skip_extraction:
# Was using full docs but hit token limit - fall back to extraction
logger.warning(f"Token limit exceeded ({context_tokens} > {max_context_tokens}). Falling back to extraction...")
if progress_callback:
progress_callback("Content too large, extracting relevant parts...")
# Re-process with extraction enabled
all_extractions = []
for i, (doc_id, doc_info) in enumerate(doc_chunks.items()):
doc_text = "\n\n".join(doc_info['chunks'])
extracted = self.extractor.extract_from_document(doc_text, query)
if extracted and extracted.strip():
all_extractions.append({
'file_name': doc_info['file_name'],
'extracted': extracted,
'doc_id': doc_id
})
# Create numbered blocks for citation
numbered_blocks = []
for i, ext in enumerate(all_extractions, 1):
numbered_blocks.append(f"[Block {i}] ### From: {ext['file_name']}\n\n{ext['extracted']}")
combined_context = "\n\n---\n\n".join(numbered_blocks)
# If still too large after extraction, consolidate
if self.count_tokens(combined_context) > max_context_tokens:
if progress_callback:
progress_callback("Synthesizing final answer...")
combined_context = self.extractor._consolidate_extractions(
combined_context, query
)
else:
# Already using extraction, do final consolidation
if progress_callback:
progress_callback("Synthesizing final answer...")
combined_context = self.extractor._consolidate_extractions(
combined_context, query
)
# Create prompt
prompt = self._create_response_prompt(query, combined_context, chat_history, custom_instructions, output_language)
# Generate response
if progress_callback:
progress_callback("Generating final response...")
response = self.main_llm.invoke(prompt)
# Web search augmentation (if enabled) - for fact-checking and enhancement
web_references = []
if self.enable_web_search and self.web_search:
try:
if progress_callback:
progress_callback("Performing web search for fact-checking...")
logger.info("🌐 Web search enabled in full reading mode, generating search queries for fact-checking...")
web_search_queries = self._generate_web_search_queries(query, chat_history)
logger.info(f"Executing {len(web_search_queries)} web searches...")
web_ref_start = len(all_extractions) + 1 # Start numbering after document blocks
for i, search_query in enumerate(web_search_queries, 1):
logger.info(f"Web search {i}/{len(web_search_queries)}: {search_query}")
# Get structured results with URLs
search_output = self.web_search.results(search_query)
# Extract URLs and snippets
if isinstance(search_output, dict) and 'organic' in search_output:
for result in search_output['organic'][:3]: # Top 3 results per query
title = result.get('title', 'Web Page')
url = result.get('link', '')
snippet = result.get('snippet', '')
# Add to web references list
web_references.append({
'type': 'web',
'title': title,
'url': url,
'snippet': snippet,
'block_num': web_ref_start + len(web_references)
})
if web_references:
logger.info(f"✅ Web search completed, added {len(web_references)} web references")
except Exception as e:
logger.warning(f"⚠️ Web search failed: {e}")
# Apply reference relevance filtering (post-processing) if enabled
if enable_reference_filtering:
if progress_callback:
progress_callback("Filtering references by relevance...")
logger.info(f"🎯 Applying post-processing reference relevance filtering for full reading mode (threshold: {reference_threshold})...")
context_blocks = [{'file_name': ext['file_name'], 'text': ext['extracted'], 'metadata': {'doc_id': ext['doc_id']}}
for ext in all_extractions]
filtered_blocks = self.score_reference_relevance(
response.content,
context_blocks,
relevance_threshold=reference_threshold
)
if len(filtered_blocks) == 0:
logger.warning("All references were filtered out by relevance scoring. Using original documents.")
filtered_blocks = context_blocks
else:
logger.info(f"Reference filtering: {len(context_blocks)} -> {len(filtered_blocks)} documents")
else:
logger.info("🔓 Reference filtering disabled - using all documents")
context_blocks = [{'file_name': ext['file_name'], 'text': ext['extracted'], 'metadata': {'doc_id': ext['doc_id']}}
for ext in all_extractions]
filtered_blocks = context_blocks
# Format with filtered references (including web references)
formatted = self._format_response_with_references(response.content, filtered_blocks, web_references)
return {
'response': formatted['response'],
'references': formatted['references'],
'context': all_extractions,
'consolidated_context': combined_context,
'mode': 'full_reading',
'num_documents': total_docs,
'num_relevant_documents': len(filtered_blocks),
'metadata': {
'documents_read': total_docs,
'documents_after_relevance_filter': len(filtered_blocks),
'relevance_filtering_applied': enable_reference_filtering,
'web_results': len(web_references)
}
}
def _create_response_prompt(self, query: str, context: str,
chat_history: List[Dict] = None,
custom_instructions: str = None,
output_language: str = 'en',
web_context: str = None) -> str:
"""
Create structured, expert-level prompt for final response generation.
Uses vice_ai-style structured sections with step-by-step processing instructions.
Args:
query: User query
context: Document context
chat_history: Previous conversation
custom_instructions: Optional custom instructions from user (markdown formatted)
output_language: Language code for the response (e.g., 'en', 'nl', 'fr')
web_context: Optional web search results
Returns:
Properly sized prompt that fits within model limits
"""
# Track large LLM usage
self.large_llm_usage += 1
logger.info(f"🎯 Creating structured response prompt (Large LLM call #{self.large_llm_usage})")
# Model-specific token limits (input + output)
MODEL_LIMITS = {
'gpt-4o': 128000,
'gpt-4o-mini': 128000,
'azure-gpt-4o': 128000,
'gpt-5': 200000,
'gpt-5-mini': 128000,
'gpt-5-pro': 200000,
'gpt-4': 8192,
'gpt-3.5-turbo': 16385,
'claude-3-5-sonnet-20241022': 200000,
'claude-sonnet-4-5-20250929': 200000
}
# Get current model name
model_name = getattr(self.main_llm, 'model_name', config.LLM_MODEL)
max_model_tokens = MODEL_LIMITS.get(model_name, 128000)
# Reserve tokens for output and safety margin
reserved_for_output = config.MAX_OUTPUT_TOKENS # 4096
safety_margin = 1000 # Extra buffer
max_input_tokens = max_model_tokens - reserved_for_output - safety_margin
logger.info(f" Model: {model_name}, Max input tokens: {max_input_tokens}")
# Build history section with enhanced formatting
history_section = ""
history_tokens = 0
if chat_history and len(chat_history) > 0:
logger.info(f" 📚 Integrating {len(chat_history)} chat history messages into main prompt")
# Truncate very long messages in history to prevent token explosion
max_history_msg_tokens = 500 # Limit each history message to 500 tokens
history_messages = []
for idx, msg in enumerate(chat_history[-6:], 1): # Last 6 messages
role_label = 'User' if msg['role'] == 'user' else 'Assistant'
content = msg['content']
# Truncate long messages
msg_tokens = self.count_tokens(content)
if msg_tokens > max_history_msg_tokens:
content = self._truncate_to_tokens(content, max_history_msg_tokens)
logger.info(f" ✂️ Truncated history message {idx} from {msg_tokens} to {max_history_msg_tokens} tokens")
history_messages.append(f"{role_label}: {content}")
history_text = "\n\n".join(history_messages) # Double newline for better separation
history_section = history_text
history_tokens = self.count_tokens(history_section)
# Additional safeguard: cap total history tokens
max_total_history_tokens = 3000 # Maximum tokens for entire history section
if history_tokens > max_total_history_tokens:
logger.warning(f" ⚠️ History too large ({history_tokens} tokens), truncating to {max_total_history_tokens}")
history_section = self._truncate_to_tokens(history_section, max_total_history_tokens)
history_tokens = self.count_tokens(history_section)
logger.info(f" Chat history tokens: {history_tokens}")
# Build custom instructions section with prominence
custom_instructions_section = ""
custom_instructions_tokens = 0
if custom_instructions and custom_instructions.strip():
logger.info(f" 📋 Including custom instructions with high prominence")
custom_instructions_section = custom_instructions.strip()
custom_instructions_tokens = self.count_tokens(custom_instructions_section)
logger.info(f" Custom instructions tokens: {custom_instructions_tokens}")
# Get language name for instructions
language_name = config.LANGUAGE_NAMES.get(output_language, 'English')
language_instruction = f"- **CRITICAL: You MUST provide your entire answer in {language_name}. Do not use English unless specifically asked.**\n" if output_language != 'en' else ""
# Build structured prompt with expert-level instructions (vice_ai style)
# Step 1: System instructions
prompt_parts = []
prompt_parts.append(self.PROMPT_TEMPLATES["expert_system"]["instructions"])
prompt_parts.append("\n\n")
# Step 2: Build step-by-step processing instructions
processing_steps = []
step_num = 1
# Always have document context
processing_steps.append(f"**Step {step_num}** - [Document Context]: {self.PROMPT_TEMPLATES['step_instructions']['document_context']}")
step_num += 1
# Add custom instructions step if provided
if custom_instructions_section:
processing_steps.append(f"**Step {step_num}** - [Custom Instructions]: {self.PROMPT_TEMPLATES['step_instructions']['custom_instructions']}")
step_num += 1
# Add web search step if provided
if web_context:
processing_steps.append(f"**Step {step_num}** - [Web Search Results]: {self.PROMPT_TEMPLATES['step_instructions']['web_search']}")
step_num += 1
# Add chat history step if available
if history_section:
processing_steps.append(f"**Step {step_num}** - [Previous Conversation]: {self.PROMPT_TEMPLATES['step_instructions']['chat_history']}")
step_num += 1
prompt_parts.append("[Processing Steps]:\n")
prompt_parts.append("\n".join(processing_steps))
prompt_parts.append("\n\n")
# Step 3: Output constraints
prompt_parts.append(self.PROMPT_TEMPLATES["expert_system"]["output_constraints"])
if language_instruction:
prompt_parts.append(f"\n{language_instruction}")
prompt_parts.append("\n\n")
# Step 4: Data sections (structured and labeled)
data_section_num = 1
# Always include document context
prompt_parts.append(f"[Data Section {data_section_num}] - Document Context:\n")
prompt_parts.append("---\n")
prompt_parts.append(context)
prompt_parts.append("\n---\n\n")
data_section_num += 1
# Include custom instructions if provided
if custom_instructions_section:
prompt_parts.append(f"[Data Section {data_section_num}] - Custom Instructions:\n")
prompt_parts.append("---\n")
prompt_parts.append(custom_instructions_section)
prompt_parts.append("\n---\n\n")
data_section_num += 1
# Include web search results if provided
if web_context:
prompt_parts.append(f"[Data Section {data_section_num}] - Web Search Results:\n")
prompt_parts.append("---\n")
prompt_parts.append(web_context)
prompt_parts.append("\n---\n\n")
data_section_num += 1
# Include chat history if available
if history_section:
prompt_parts.append(f"[Data Section {data_section_num}] - Previous Conversation:\n")
prompt_parts.append("---\n")
prompt_parts.append(history_section)
prompt_parts.append("\n---\n\n")
data_section_num += 1
# Step 5: User query
prompt_parts.append(f"[User Query]:\n{query}\n\n")
prompt_parts.append("[Your Expert Response]:\n")
# Combine all parts
base_prompt_template = "".join(prompt_parts)
# Count tokens for the constructed prompt template
prompt_tokens = self.count_tokens(base_prompt_template)
logger.info(f" 📊 Token budget analysis:")
logger.info(f" Total prompt: {prompt_tokens} tokens")
logger.info(f" Model limit: {max_input_tokens} tokens")
logger.info(f" History: {history_tokens} tokens")
logger.info(f" Custom instructions: {custom_instructions_tokens} tokens")
# Check if prompt fits within limits
if prompt_tokens > max_input_tokens:
logger.warning(f" ⚠️ Prompt exceeds limit: {prompt_tokens} > {max_input_tokens}")
logger.warning(f" Applying intelligent truncation strategy...")
# Calculate how much we need to reduce
overflow = prompt_tokens - max_input_tokens
target_reduction = int(overflow * 1.2) # Reduce 20% more for safety
# Try intelligent summarization with small LLM
logger.info(f" 🤖 Using small LLM for context summarization (target reduction: {target_reduction} tokens)")
self.small_llm_usage["summarization"] += 1
self.small_llm_usage["total_calls"] += 1
try:
# Calculate available space for context after summarization
context_tokens = self.count_tokens(context)
available_for_context = context_tokens - target_reduction
if available_for_context < 1000:
# Emergency: very little space, just truncate
logger.warning(f" Emergency truncation: available space < 1000 tokens")
context = self._truncate_to_tokens(context, available_for_context)
else:
# Smart truncation: use small LLM to summarize
summarization_prompt = f"""Summarize the following context to approximately {available_for_context} tokens while preserving key information relevant to the query: "{query}"
Context:
{context}
Summarized context (max {available_for_context} tokens):"""
summary = self.small_llm.invoke(summarization_prompt)
summarized_context = summary.content
# Verify summary fits
summary_tokens = self.count_tokens(summarized_context)
if summary_tokens <= available_for_context:
context = summarized_context
logger.info(f" ✅ Context summarized: {context_tokens} -> {summary_tokens} tokens")
else:
# Fallback: hard truncate
context = self._truncate_to_tokens(context, available_for_context)
logger.warning(f" ⚠️ Summary still too large ({summary_tokens} tokens), using hard truncation")
except Exception as e:
logger.error(f" ❌ Summarization failed: {e}, using hard truncation")
context = self._truncate_to_tokens(context, available_for_context)
# Rebuild prompt with truncated context
prompt_parts = []
prompt_parts.append(self.PROMPT_TEMPLATES["expert_system"]["instructions"])
prompt_parts.append("\n\n[Processing Steps]:\n")
prompt_parts.append("\n".join(processing_steps))
prompt_parts.append("\n\n")
prompt_parts.append(self.PROMPT_TEMPLATES["expert_system"]["output_constraints"])
if language_instruction:
prompt_parts.append(f"\n{language_instruction}")
prompt_parts.append("\n\n")
# Data sections with truncated context
data_section_num = 1
prompt_parts.append(f"[Data Section {data_section_num}] - Document Context:\n---\n")
prompt_parts.append(context)
prompt_parts.append("\n---\n\n")
data_section_num += 1
if custom_instructions_section:
prompt_parts.append(f"[Data Section {data_section_num}] - Custom Instructions:\n---\n")
prompt_parts.append(custom_instructions_section)
prompt_parts.append("\n---\n\n")
data_section_num += 1
if web_context:
prompt_parts.append(f"[Data Section {data_section_num}] - Web Search Results:\n---\n")
prompt_parts.append(web_context)
prompt_parts.append("\n---\n\n")
data_section_num += 1
if history_section:
prompt_parts.append(f"[Data Section {data_section_num}] - Previous Conversation:\n---\n")
prompt_parts.append(history_section)
prompt_parts.append("\n---\n\n")
data_section_num += 1
prompt_parts.append(f"[User Query]:\n{query}\n\n")
prompt_parts.append("[Your Expert Response]:\n")
prompt = "".join(prompt_parts)
else:
# Prompt fits, use as-is
prompt = base_prompt_template
# Final safety check
final_tokens = self.count_tokens(prompt)
if final_tokens > max_input_tokens:
logger.error(
f" 🚨 CRITICAL: Prompt still exceeds limit ({final_tokens} > {max_input_tokens}). "
f"Emergency truncation required."
)
# Last resort: truncate entire prompt
prompt = self._truncate_to_tokens(prompt, max_input_tokens)
final_tokens = self.count_tokens(prompt)
logger.info(f" ✅ Final prompt ready: {final_tokens} tokens (limit: {max_input_tokens})")
logger.info(f" 📊 Usage summary - Small LLM: {self.small_llm_usage['total_calls']} calls, Large LLM: {self.large_llm_usage} calls")
return prompt
def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
"""
Truncate text to fit within token limit
Args:
text: Text to truncate
max_tokens: Maximum tokens
Returns:
Truncated text
"""
tokens = self.tokenizer.encode(text)
if len(tokens) <= max_tokens:
return text
# Truncate tokens and decode back
truncated_tokens = tokens[:max_tokens]
truncated_text = self.tokenizer.decode(truncated_tokens)
return truncated_text + "\n\n[... context truncated due to length ...]"
def query_deep_reflection(self, query: str,
base_mode: str = "extensive",
reflection_iterations: int = 2,
use_reranking: bool = True,
top_k: int = 5,
enable_memory: bool = True,
source_filters: List[str] = None,
manual_keywords: List[str] = None,
enable_web_search: bool = False,
enable_reference_filtering: bool = True,
reference_threshold: float = 0.3,
custom_instructions: str = None,
output_language: str = 'en',
chat_history: List[Dict] = None,
progress_callback: Callable = None,
skip_extraction: bool = False,
override_sources: bool = False) -> Dict[str, Any]:
"""
Mode 4 - Deep Reflection: Iterative refinement with multi-pass analysis
Process:
1. Run initial query cycle using selected base mode (basic/extensive/full_reading)
2. Analyze the response and generate refinement queries
3. Execute refinement queries to gather additional context
4. Synthesize all information into improved answer
5. Repeat for specified iterations
Args:
query: User's question
base_mode: Underlying mode to use ('basic', 'extensive', 'full_reading')
reflection_iterations: Number of refinement cycles (1-5)
... (other standard parameters)
Returns:
Dictionary with refined response and aggregated references
"""
logger.info(f"Mode 4 - Deep Reflection: {query} (base_mode={base_mode}, iterations={reflection_iterations})")
# Log override_sources setting
if override_sources:
logger.info(f"🌐 Override sources enabled: refinement queries will use basic RAG on full dataset")
else:
logger.info(f"📚 Override sources disabled: refinement queries will use base_mode={base_mode} with source filters")
# Validate iterations and base mode
reflection_iterations = max(1, min(5, reflection_iterations))
if base_mode not in ['basic', 'extensive', 'full_reading']:
logger.warning(f"Invalid base_mode '{base_mode}', defaulting to 'extensive'")
base_mode = 'extensive'
# Track all context and references across iterations
all_context = []
all_references = []
iteration_insights = []
# Initial query - use selected base mode
logger.info(f"🔄 Reflection Iteration 1/{reflection_iterations + 1}: Initial query (base_mode={base_mode})")
if base_mode == 'basic':
current_result = self.mode_1_basic_rag(
query=query,
use_reranking=use_reranking,
top_k=top_k,
chat_history=chat_history if enable_memory else None,
source_filters=source_filters,
manual_keywords=manual_keywords,
custom_instructions=custom_instructions,
output_language=output_language
)
elif base_mode == 'extensive':
current_result = self.mode_2_extensive(
query=query,
top_k=top_k,
chat_history=chat_history if enable_memory else None,
source_filters=source_filters,
manual_keywords=manual_keywords,
enable_reference_filtering=False, # Disable filtering during iterations
custom_instructions=custom_instructions,
output_language=output_language
)
else: # full_reading
current_result = self.mode_3_full_reading(
query=query,
chat_history=chat_history if enable_memory else None,
progress_callback=progress_callback,
source_filters=source_filters,
skip_extraction=skip_extraction,
enable_reference_filtering=False,
custom_instructions=custom_instructions,
output_language=output_language
)
# Store initial results
current_answer = current_result['response']
all_context.extend(current_result.get('context', []))
all_references.extend(current_result.get('references', []))
iteration_insights.append({
'iteration': 1,
'answer': current_answer,
'num_documents': current_result.get('num_documents', 0)
})
logger.info(f" ✅ Initial answer generated ({len(current_answer)} chars, {len(all_context)} docs)")
if progress_callback:
progress_callback(f"✅ Iteration 1/{reflection_iterations + 1} complete - Generated initial answer ({len(all_context)} docs)")
# Iterative refinement
for iteration in range(reflection_iterations):
iteration_num = iteration + 2
logger.info(f"🔄 Reflection Iteration {iteration_num}/{reflection_iterations + 1}: Refinement cycle")
if progress_callback:
progress_callback(f"🔄 Starting iteration {iteration_num}/{reflection_iterations + 1} - Self-critiquing previous answer...")
# Generate refinement queries based on self-critique of current answer
refinement_queries = self._generate_refinement_queries_with_critique(
query, current_answer, chat_history, iteration_insights, custom_instructions
)
logger.info(f" Generated {len(refinement_queries)} refinement queries")
if progress_callback:
progress_callback(f"📝 Iteration {iteration_num}/{reflection_iterations + 1} - Generated {len(refinement_queries)} refinement queries")
# Execute refinement queries to gather additional context
refinement_context = []
for i, ref_query in enumerate(refinement_queries, 1):
logger.info(f" Executing refinement query {i}/{len(refinement_queries)}: {ref_query}")
if progress_callback:
progress_callback(f"🔍 Iteration {iteration_num}/{reflection_iterations + 1} - Executing query {i}/{len(refinement_queries)}: {ref_query[:60]}...")
# Determine mode and filters for refinement queries
if override_sources:
# Override: Use basic RAG on full dataset (no source filters, no keywords)
logger.info(f" 🌐 Override sources enabled - using basic RAG on full dataset")
ref_result = self.mode_1_basic_rag(
query=ref_query,
use_reranking=use_reranking,
top_k=top_k,
chat_history=None,
source_filters=None,
manual_keywords=[],
custom_instructions=None,
output_language=output_language
)
# Retrieve additional context using the selected base mode
elif base_mode == 'basic':
ref_result = self.mode_1_basic_rag(
query=ref_query,
use_reranking=use_reranking,
top_k=top_k,
chat_history=None, # Don't use memory for refinement queries
source_filters=source_filters,
manual_keywords=manual_keywords,
custom_instructions=None,
output_language=output_language
)
elif base_mode == 'extensive':
ref_result = self.mode_2_extensive(
query=ref_query,
top_k=top_k,
chat_history=None,
source_filters=source_filters,
manual_keywords=manual_keywords,
enable_reference_filtering=False,
custom_instructions=None,
output_language=output_language
)
else: # full_reading
ref_result = self.mode_3_full_reading(
query=ref_query,
chat_history=None,
progress_callback=None,
source_filters=source_filters,
skip_extraction=skip_extraction,
enable_reference_filtering=False,
custom_instructions=None,
output_language=output_language
)
refinement_context.extend(ref_result.get('context', []))
logger.info(f" Retrieved {len(ref_result.get('context', []))} additional documents")
# Deduplicate context across iterations
unique_context = self._deduplicate_context_blocks(all_context + refinement_context)
logger.info(f" Combined context: {len(all_context)} + {len(refinement_context)} -> {len(unique_context)} unique docs")
all_context = unique_context
# Synthesize refined answer with all accumulated context
refined_answer = self._synthesize_refined_answer(
original_query=query,
previous_answer=current_answer,
iteration_insights=iteration_insights,
all_context=all_context,
custom_instructions=custom_instructions,
output_language=output_language,
chat_history=chat_history
)
logger.info(f" ✅ Refined answer generated ({len(refined_answer)} chars)")
if progress_callback:
progress_callback(f"✅ Iteration {iteration_num}/{reflection_iterations + 1} complete - Refined answer with {len(unique_context)} total docs")
# Update for next iteration
current_answer = refined_answer
iteration_insights.append({
'iteration': iteration_num,
'answer': refined_answer,
'num_documents': len(all_context),
'refinement_queries': refinement_queries
})
logger.info(f"🎯 Deep Reflection complete: {reflection_iterations + 1} total iterations")
if progress_callback:
progress_callback(f"🎯 Deep Reflection complete! Processed {reflection_iterations + 1} iterations with {len(all_context)} documents")
# Apply final reference relevance filtering if enabled
# Skip filtering for basic mode to keep it fast
if enable_reference_filtering and base_mode != 'basic':
logger.info(f"🎯 Applying final reference relevance filtering (threshold: {reference_threshold})...")
filtered_context = self.score_reference_relevance(
current_answer,
all_context,
relevance_threshold=reference_threshold
)
if len(filtered_context) == 0:
logger.warning("All references filtered out. Using all context.")
filtered_context = all_context
else:
logger.info(f"Reference filtering: {len(all_context)} -> {len(filtered_context)} documents")
else:
filtered_context = all_context
# Format final response with references
formatted = self._format_response_with_references(current_answer, filtered_context, [])
return {
'response': formatted['response'],
'references': formatted['references'],
'context': filtered_context,
'mode': 'deep_reflection',
'num_documents': len(all_context),
'num_relevant_documents': len(filtered_context),
'iterations': reflection_iterations + 1,
'iteration_insights': iteration_insights,
'metadata': {
'total_iterations': reflection_iterations + 1,
'documents_retrieved': len(all_context),
'documents_after_relevance_filter': len(filtered_context),
'relevance_filtering_applied': enable_reference_filtering
}
}
def _generate_refinement_queries_with_critique(self, original_query: str, current_answer: str,
chat_history: List[Dict] = None,
iteration_insights: List[Dict] = None,
custom_instructions: str = None) -> List[str]:
"""
Generate refinement queries by first self-critiquing the current answer to identify gaps.
Args:
original_query: The user's original question
current_answer: The current answer that needs refinement
chat_history: Optional conversation history
iteration_insights: History of previous iterations and queries
custom_instructions: Optional user instructions to guide query generation
Returns:
List of 2-4 refinement queries
"""
# Debug: Log what we're working with
logger.info(f"🔍 _generate_refinement_queries called")
logger.info(f"🔍 iteration_insights length: {len(iteration_insights) if iteration_insights else 0}")
if iteration_insights:
for i, insight in enumerate(iteration_insights):
logger.info(f"🔍 Insight {i}: iteration={insight.get('iteration', 'N/A')}, has_queries={('refinement_queries' in insight)}")
# Build context about previous queries
previous_queries_text = ""
if iteration_insights and len(iteration_insights) > 1:
prev_queries = []
for insight in iteration_insights[1:]: # Skip initial iteration
if 'refinement_queries' in insight:
prev_queries.extend(insight['refinement_queries'])
if prev_queries:
previous_queries_text = f"\n\nPrevious queries already executed (DO NOT repeat these):\n" + "\n".join([f"- {q}" for q in prev_queries])
logger.info(f"🔍 Found {len(prev_queries)} previous queries to exclude")
else:
logger.info(f"🔍 No previous queries found in insights")
# Build custom instructions context
custom_context = ""
if custom_instructions and custom_instructions.strip():
custom_context = f"\n\n[User Instructions]\n{custom_instructions.strip()}\n\nIMPORTANT: Generate queries that align with the user's instructions above."
# Build conditional text for custom instructions
user_instr_analysis = "\n6. **User Instructions**: How can we better satisfy the specific requirements in the user's instructions?" if custom_instructions else ""
user_instr_queries = " and align with user's instructions" if custom_instructions else ""
prompt = f"""You are in an ITERATIVE REFINEMENT process to build a comprehensive answer. This is iteration {len(iteration_insights) + 1}.
GOAL OF ITERATION: The current answer is incomplete. Your job is to identify specific gaps and generate queries that will retrieve additional information to fill those gaps. Each iteration should make the answer more complete, detailed, and well-supported.
Original Question: {original_query}{custom_context}
Current Answer (Iteration {len(iteration_insights)}):
{current_answer}{previous_queries_text}
CRITICAL ANALYSIS TASK:
Analyze the current answer thoroughly and identify:
1. **Missing Facts/Data**: What specific numbers, dates, names, or measurements are absent?
2. **Unsupported Claims**: What statements lack citations or evidence?
3. **Incomplete Coverage**: What aspects of the original question are not addressed?
4. **Lack of Detail**: Where could the answer be more specific or comprehensive?
5. **Missing Context**: What background or related information would enhance understanding?{user_instr_analysis}
QUERY GENERATION TASK:
Based on your analysis above, generate 2-4 HIGHLY SPECIFIC search queries that will retrieve the missing information.
QUERY QUALITY REQUIREMENTS:
- Each query must target a SPECIFIC gap you identified
- Be precise - ask for exact data types, concepts, or evidence needed
- Focus on information that will ADD NEW VALUE to the current answer
- Ensure queries are DIFFERENT from previous ones{user_instr_queries}
- Prioritize queries that will most significantly improve the answer
Return ONLY the search queries, one per line, without numbering or explanations."""
try:
logger.info(f"🔍 Sending prompt to LLM (first 500 chars): {prompt[:500]}...")
response = self.small_llm.invoke(prompt)
queries = [q.strip() for q in response.content.strip().split('\n') if q.strip()]
logger.info(f"🔍 LLM returned {len(queries)} queries:")
for i, q in enumerate(queries[:4], 1):
logger.info(f"🔍 Query {i}: {q}")
# Ensure we have 2-4 queries
if len(queries) < 2:
queries.append(f"Additional details about: {original_query}")
if len(queries) < 2:
queries.append(f"Examples and evidence for: {original_query}")
return queries[:4] # Max 4 queries
except Exception as e:
logger.warning(f"Failed to generate refinement queries: {e}")
# Fallback queries
return [
f"Additional information about: {original_query}",
f"Supporting evidence for: {original_query}"
]
def _synthesize_refined_answer(self, original_query: str, previous_answer: str,
iteration_insights: List[Dict], all_context: List[Dict],
custom_instructions: str = None, output_language: str = 'en',
chat_history: List[Dict] = None) -> str:
"""
Synthesize a refined answer incorporating all gathered context and previous insights.
Args:
original_query: The user's original question
previous_answer: The answer from the previous iteration
iteration_insights: History of all iteration answers
all_context: All accumulated context from all iterations
custom_instructions: Optional custom instructions
output_language: Target language for response
chat_history: Optional conversation history
Returns:
Refined answer text
"""
# Prepare context text with block numbers
context_text = "\n\n---\n\n".join([
f"[Block {i+1}] - {block.get('file_name', 'Unknown')}\n{block.get('text') or block.get('content', '')}"
for i, block in enumerate(all_context[:50]) # Limit to top 50 docs
])
# Build chat history context
chat_context = ""
if chat_history and len(chat_history) > 0:
recent_exchanges = []
for i in range(max(0, len(chat_history) - 4), len(chat_history), 2):
if i + 1 < len(chat_history):
user_msg = chat_history[i].get('content', '')[:200]
asst_msg = chat_history[i + 1].get('content', '')[:200]
recent_exchanges.append(f"User: {user_msg}\nAssistant: {asst_msg}")
if recent_exchanges:
chat_context = f"\n\n[Previous Conversation Context]\n" + "\n\n".join(recent_exchanges) + "\n"
# Build custom instructions context
custom_context = ""
if custom_instructions and custom_instructions.strip():
custom_context = f"\n\n[User Instructions - FOLLOW THESE CAREFULLY]\n{custom_instructions.strip()}\n"
# Build conditional text
user_instr_requirement = "\n\n6. **FOLLOW USER INSTRUCTIONS**: Ensure your answer aligns with the specific requirements above" if custom_instructions else ""
# Build prompt
iteration_note = "You have completed the initial retrieval." if len(iteration_insights) == 1 else f"You are in refinement iteration {len(iteration_insights)}. Previous iterations identified gaps which you've now retrieved additional information for."
prompt = f"""ITERATIVE REFINEMENT - SYNTHESIS TASK (Iteration {len(iteration_insights)})
{iteration_note}
Your goal: Create a comprehensive, well-supported answer by SYNTHESIZING information from ALL {len(all_context)} available documents.
Original Question: {original_query}{chat_context}{custom_context}
Previous Answer (Iteration {len(iteration_insights)}):
{previous_answer}
ALL AVAILABLE CONTEXT ({len(all_context)} documents - YOU MUST USE INFORMATION FROM ACROSS THIS FULL RANGE):
{context_text}
SYNTHESIS REQUIREMENTS:
1. **COMPREHENSIVE INTEGRATION**:
- Review ALL {len(all_context)} blocks of context above
- Extract relevant information from EVERY block that adds value
- Don't focus only on Block 1 or recent blocks - USE THE FULL RANGE
2. **SUBSTANTIAL ENHANCEMENT**:
- Add significant new details, facts, and evidence from the documents
- Aim for at least 50% MORE content than the previous answer
- Don't just rephrase - ADD NEW INFORMATION
3. **EVIDENCE-BASED SYNTHESIS**:
- Support every claim with specific citations
- Combine related information from multiple blocks
- Show how different sources corroborate or complement each other
4. **ADDRESS GAPS**:
- Fill in the specific gaps that led to this refinement iteration
- Provide the missing details, evidence, or context identified
5. **MAINTAIN COHERENCE**:
- Organize information logically
- Create smooth transitions between topics
- Build a unified narrative from multiple sources{user_instr_requirement}
CITATION REQUIREMENTS (CRITICAL):
- MANDATORY: Cite documents from across the FULL RANGE (Blocks 1-{len(all_context)})
- Use format: [Block X] where X is the block number
- Examples: "Buffer composition affects stability [Block 5]" | "Studies indicate [Block 7, Block 15, Block 23]"
- When presenting a comprehensive point, cite MULTIPLE relevant blocks
- NEVER use [1], [2] format - ALWAYS [Block X]
- Distribute citations across many different blocks, not just a few
{f"OUTPUT LANGUAGE: {output_language}" if output_language != 'en' else ''}
Generate your comprehensive, well-cited answer now. Remember: USE ALL {len(all_context)} BLOCKS:"""
try:
response = self.main_llm.invoke(prompt)
return response.content.strip()
except Exception as e:
logger.error(f"Failed to synthesize refined answer: {e}")
return previous_answer # Fallback to previous answer
def _deduplicate_context_blocks(self, context_blocks: List[Dict]) -> List[Dict]:
"""
Remove duplicate context blocks based on file name and text similarity.
Args:
context_blocks: List of context dictionaries
Returns:
Deduplicated list of context blocks
"""
seen = {}
unique_blocks = []
for block in context_blocks:
# Create key from file name and text hash
file_name = block.get('file_name', 'unknown')
text = block.get('text') or block.get('content', '')
key = f"{file_name}_{hash(text)}"
if key not in seen:
seen[key] = True
unique_blocks.append(block)
return unique_blocks
def chat(self, query: str, mode: str = "basic",
chat_history: List[Dict] = None,
progress_callback: Callable = None,
source_filters: List[str] = None,
manual_keywords: List[str] = None,
enable_web_search: bool = None,
enable_memory: bool = None,
enable_reference_filtering: bool = True,
reference_threshold: float = 0.3,
custom_instructions: str = None,
output_language: str = 'en',
**kwargs) -> Dict[str, Any]:
"""
Main chat interface supporting all four modes
Args:
query: User query
mode: Operating mode - "basic", "extensive", "full_reading", or "deep_reflection"
chat_history: Previous conversation
progress_callback: Callback for progress updates
source_filters: List of file paths to restrict search to
manual_keywords: Additional keywords to enhance search
enable_web_search: Override for web search (fallback if no results)
enable_memory: Override for using chat history
enable_reference_filtering: Apply relevance scoring to filter references (extensive/full modes)
reference_threshold: Minimum relevance score (0.0-1.0) for references
custom_instructions: Optional custom instructions (markdown formatted) to guide the LLM
output_language: Language code for the response (e.g., 'en', 'nl', 'fr')
**kwargs: Additional mode-specific parameters
Returns:
Dictionary with response and metadata
"""
# Update config flags if overridden
if enable_web_search is not None:
self.enable_web_search = enable_web_search
if enable_memory is not None:
self.enable_memory = enable_memory
# Prepare chat history for memory
effective_history = chat_history if (self.enable_memory and chat_history) else None
if mode == "basic":
return self.mode_1_basic_rag(
query,
top_k=kwargs.get('top_k', config.DEFAULT_TOP_K),
chat_history=effective_history,
use_reranking=kwargs.get('use_reranking', True),
source_filters=source_filters,
manual_keywords=manual_keywords,
custom_instructions=custom_instructions,
output_language=output_language
)
elif mode == "extensive":
return self.mode_2_extensive(
query,
top_k=kwargs.get('top_k', config.EXTENSIVE_MODE_TOP_K),
chat_history=effective_history,
source_filters=source_filters,
manual_keywords=manual_keywords,
enable_reference_filtering=enable_reference_filtering,
reference_threshold=reference_threshold,
custom_instructions=custom_instructions,
output_language=output_language
)
elif mode == "full_reading":
return self.mode_3_full_reading(
query,
chat_history=effective_history,
progress_callback=progress_callback,
source_filters=source_filters,
skip_extraction=kwargs.get('skip_extraction', False),
enable_reference_filtering=enable_reference_filtering,
reference_threshold=reference_threshold,
custom_instructions=custom_instructions,
output_language=output_language
)
elif mode == "deep_reflection":
return self.query_deep_reflection(
query=query,
base_mode=kwargs.get('base_mode', 'extensive'),
reflection_iterations=kwargs.get('reflection_iterations', 2),
use_reranking=kwargs.get('use_reranking', True),
top_k=kwargs.get('top_k', config.EXTENSIVE_MODE_TOP_K),
enable_memory=self.enable_memory,
source_filters=source_filters,
manual_keywords=manual_keywords,
enable_web_search=self.enable_web_search,
enable_reference_filtering=enable_reference_filtering,
reference_threshold=reference_threshold,
custom_instructions=custom_instructions,
output_language=output_language,
chat_history=effective_history,
progress_callback=progress_callback,
skip_extraction=kwargs.get('skip_extraction', False),
override_sources=kwargs.get('override_sources', False)
)
else:
raise ValueError(f"Unknown mode: {mode}. Use 'basic', 'extensive', 'full_reading', or 'deep_reflection'")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Auto-generated placeholder for the class bases; DocChatRAG itself is configured through the keyword arguments of __init__ (see Class Interface below).
Return Value
Instantiating the class returns a configured DocChatRAG engine instance.
Class Interface
Methods
__init__(self, collection_name, api_key, system_role, system_expertise, system_domain_context, custom_system_instructions)
Purpose: Initialize RAG engine Args: collection_name: ChromaDB collection name api_key: OpenAI API key system_role: Custom role description (overrides config) system_expertise: Custom expertise description (overrides config) system_domain_context: Custom domain context (overrides config) custom_system_instructions: Complete custom system instructions (overrides everything)
Parameters:
collection_name: Type: str, api_key: Type: str, system_role: Type: str, system_expertise: Type: str, system_domain_context: Type: str, custom_system_instructions: Type: str
Returns: None
_build_prompt_templates(self)
Purpose: Build prompt templates dynamically from config or custom settings Returns: Dictionary of prompt templates
Returns: Dictionary of prompt templates
set_model(self, model_name)
Purpose: Dynamically switch the main LLM model during runtime. Args: model_name: Name of the model to switch to
Parameters:
model_name: Type: str
Returns: None
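A minimal usage sketch for set_model, assuming an already-constructed DocChatRAG instance named engine; the model names are illustrative entries from the MODEL_LIMITS tables in the source, not recommendations.
# Illustrative: switch the main LLM between passes (model names are examples only)
engine.set_model("gpt-4o-mini")    # smaller model for an exploratory question
draft = engine.chat("List the document types covered in this collection", mode="basic")
engine.set_model("gpt-4o")         # larger-context model for the final synthesis
final = engine.chat("Write a consolidated summary of the key findings", mode="extensive")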
count_tokens(self, text) -> int
Purpose: Count tokens in text
Parameters:
text: Type: str
Returns: Returns int
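count_tokens backs every token budget in this class. A small caller-side sketch, assuming an engine instance and a doc_info dict shaped as in mode_3_full_reading; the 4,000-token threshold is an arbitrary illustration, not a value from the source.
doc_text = "\n\n".join(doc_info["chunks"])
if engine.count_tokens(doc_text) > 4000:     # illustrative threshold only
    doc_text = engine._truncate_to_tokens(doc_text, 4000)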
set_response_callback(self, callback)
Purpose: Set callback for streaming responses
Parameters:
callback: Type: Callable
Returns: None
_rerank_results(self, query, results) -> List[Dict]
Purpose: Rerank results using cross-encoder
Parameters:
query: Type: str, results: Type: List[Dict]
Returns: Returns List[Dict]
_deduplicate_chunks(self, chunks, similarity_threshold) -> List[Dict]
Purpose: Remove duplicate or highly similar chunks to improve diversity Uses simple text similarity to detect near-duplicates Args: chunks: List of chunk dictionaries with 'text' field similarity_threshold: Threshold for considering chunks as duplicates (0-1) Returns: Deduplicated list of chunks
Parameters:
chunks: Type: List[Dict], similarity_threshold: Type: float
Returns: Returns List[Dict]
_format_response_with_references(self, response_text, context_blocks, web_references) -> Dict[str, Any]
Purpose: Format response with inline references like vice_ai Args: response_text: The LLM response context_blocks: List of context blocks with metadata web_references: List of web search results with URLs Returns: Dictionary with formatted response and references
Parameters:
response_text: Type: str, context_blocks: Type: List[Dict], web_references: Type: List[Dict]
Returns: Returns Dict[str, Any]
_process_references(self, text, blocks_dict) -> tuple
Purpose: Process references in text, converting [Block X] to numerical citations [1, 2, 3] Based on vice_ai's process_references method Args: text: Text containing block references like [Block 1], [Blocks 2-4] blocks_dict: Dictionary mapping block numbers to block data Returns: tuple: (updated_text with numerical citations, list of reference objects)
Parameters:
text: Type: str, blocks_dict: Type: Dict[int, Dict]
Returns: Returns tuple
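To make the citation contract concrete, here is a standalone sketch of converting [Block X] markers to sequential numeric citations. It is not the class's implementation (which also handles combined citations such as [Block 1, Block 2] and ranges); the function name and block mapping are hypothetical.
import re

def blocks_to_numeric(text, blocks):
    # blocks: {1: {"file_name": "report_a.pdf"}, 2: {...}} (hypothetical mapping)
    order, references = {}, []
    def repl(match):
        block_num = int(match.group(1))
        if block_num not in order:
            order[block_num] = len(order) + 1
            references.append({"number": order[block_num], **blocks.get(block_num, {})})
        return f"[{order[block_num]}]"
    rewritten = re.sub(r"\[Block (\d+)\]", repl, text)
    return rewritten, references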
_optimize_query(self, query, chat_history) -> str
Purpose: Optimize user query for better retrieval
Parameters:
query: Type: str, chat_history: Type: List[Dict]
Returns: Returns str
get_all_sources(self) -> set
Purpose: Get all unique source paths from the collection. Uses caching to avoid repeated queries. Returns: Set of all source paths
Returns: Returns set
get_matching_sources(self, source_filters) -> List[str]
Purpose: Get all document source paths that match the filters. This is used to build a proper where clause for ChromaDB. Args: source_filters: List of file paths or folder paths Returns: List of matching source paths (for use in where clause)
Parameters:
source_filters: Type: List[str]
Returns: Returns List[str]
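A hypothetical sketch of resolving folder-style filters to concrete document paths before a query; the folder and file names are placeholders and engine is an already-constructed instance.
matching = engine.get_matching_sources(["reports/2024/", "specs/device_overview.pdf"])
print(f"{len(matching)} documents match the selected folders/files")
result = engine.chat("List the acceptance criteria", mode="extensive", source_filters=matching)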
_detect_and_translate_query(self, query) -> Dict[str, Any]
Purpose: Detect the language of the query and generate translations in supported languages. Args: query: Original user query Returns: Dictionary with detected_language, original_query, and translations dict
Parameters:
query: Type: str
Returns: Returns Dict[str, Any]
_extend_query(self, query, chat_history) -> List[str]
Purpose: Context-aware query expansion with multi-language support. Leverages conversation history when available for better continuity. Returns list of expanded query variants in multiple languages.
Parameters:
query: Type: str, chat_history: Type: List[Dict]
Returns: Returns List[str]
_generate_web_search_queries(self, query, chat_history) -> List[str]
Purpose: Generate optimized web search queries using 2-step prompting. Returns list of 3 search queries.
Parameters:
query: Type: str, chat_history: Type: List[Dict]
Returns: Returns List[str]
score_reference_relevance(self, final_answer, reference_documents, relevance_threshold) -> List[Dict]
Purpose: Score the relevance of each reference document against the final answer. Filters out documents below the relevance threshold. Args: final_answer: The generated answer text reference_documents: List of document dictionaries with 'text' and 'file_name' relevance_threshold: Minimum score to include a reference (0.0-1.0) Returns: List of document dictionaries with added 'relevance_score' field, filtered by threshold
Parameters:
final_answer: Type: str, reference_documents: Type: List[Dict], relevance_threshold: Type: float
Returns: Returns List[Dict]
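A caller-side sketch of the post-processing filter pattern the extensive and full-reading modes use, mirroring the fallback logic in the source above; answer_text and context_blocks are assumed to exist.
filtered = engine.score_reference_relevance(answer_text, context_blocks, relevance_threshold=0.3)
if not filtered:
    filtered = context_blocks   # fall back to the unfiltered references rather than returning none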
mode_1_basic_rag(self, query, top_k, chat_history, use_reranking, source_filters, manual_keywords, custom_instructions, output_language) -> Dict[str, Any]
Purpose: Mode 1: Basic RAG with similarity search Args: query: User query top_k: Number of chunks to retrieve (will be adjusted for multi-language search) chat_history: Previous conversation for query optimization use_reranking: Whether to rerank results source_filters: List of file paths to restrict search to manual_keywords: Additional keywords to enhance search custom_instructions: Optional custom instructions to guide the LLM output_language: Language code for the response (e.g., 'en', 'nl', 'fr') Returns: Dictionary with response and context
Parameters:
query: Type: str, top_k: Type: int, chat_history: Type: List[Dict], use_reranking: Type: bool, source_filters: Type: List[str], manual_keywords: Type: List[str], custom_instructions: Type: str, output_language: Type: str
Returns: Returns Dict[str, Any]
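A minimal call sketch for the basic mode; the query, folder filter, and keywords are illustrative and engine is an already-constructed instance.
result = engine.mode_1_basic_rag(
    "Which excipients are listed in the formulation section?",
    top_k=5,
    use_reranking=True,
    source_filters=["formulations/"],          # illustrative folder filter
    manual_keywords=["excipient", "buffer"],
    output_language="en",
)
print(result["response"])
for ref in result.get("references", []):
    print(ref)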
mode_2_extensive(self, query, top_k, chat_history, source_filters, manual_keywords, enable_reference_filtering, reference_threshold, custom_instructions, output_language) -> Dict[str, Any]
Purpose: Mode 2: Extensive mode - retrieve full documents and preprocess Args: query: User query top_k: Number of documents to retrieve chat_history: Previous conversation source_filters: List of file paths to restrict search to manual_keywords: Additional keywords to enhance search enable_reference_filtering: Apply relevance scoring to filter references reference_threshold: Minimum relevance score (0.0-1.0) for references custom_instructions: Optional custom instructions to guide the LLM output_language: Language code for the response Returns: Dictionary with response and context
Parameters:
query: Type: str, top_k: Type: int, chat_history: Type: List[Dict], source_filters: Type: List[str], manual_keywords: Type: List[str], enable_reference_filtering: Type: bool, reference_threshold: Type: float, custom_instructions: Type: str, output_language: Type: str
Returns: Returns Dict[str, Any]
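The matching sketch for the extensive mode, which adds the post-processing reference filter parameters; all values are illustrative.
result = engine.mode_2_extensive(
    "What process deviations were reported in 2023?",
    top_k=10,
    enable_reference_filtering=True,
    reference_threshold=0.3,
    output_language="en",
)
print(result["response"])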
mode_3_full_reading(self, query, chat_history, progress_callback, source_filters, skip_extraction, enable_reference_filtering, reference_threshold, custom_instructions, output_language) -> Dict[str, Any]
Purpose: Mode 3: Full reading - process ALL documents in collection Args: query: User query chat_history: Previous conversation progress_callback: Callback for progress updates source_filters: List of file paths to restrict search to skip_extraction: If True, skip small LLM preprocessing and send full docs to main LLM (falls back to extraction only if token limit exceeded) enable_reference_filtering: Apply relevance scoring to filter references reference_threshold: Minimum relevance score (0.0-1.0) for references custom_instructions: Optional custom instructions to guide the LLM output_language: Language code for the response Returns: Dictionary with response and context
Parameters:
query: Type: str, chat_history: Type: List[Dict], progress_callback: Type: Callable, source_filters: Type: List[str], skip_extraction: Type: bool, enable_reference_filtering: Type: bool, reference_threshold: Type: float, custom_instructions: Type: str, output_language: Type: str
Returns: Returns Dict[str, Any]
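A call sketch for the full-reading mode showing the skip_extraction switch and a console progress callback; argument values are illustrative.
result = engine.mode_3_full_reading(
    "Summarise the stability conclusions across all batch reports",
    source_filters=["reports/stability/"],     # illustrative folder filter
    skip_extraction=False,                      # let the small LLM pre-extract each document
    progress_callback=lambda msg: print(msg),
    output_language="en",
)
print(result["response"])
print(result["metadata"]["documents_read"], "documents read")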
_create_response_prompt(self, query, context, chat_history, custom_instructions, output_language, web_context) -> str
Purpose: Create structured, expert-level prompt for final response generation. Uses vice_ai-style structured sections with step-by-step processing instructions. Args: query: User query context: Document context chat_history: Previous conversation custom_instructions: Optional custom instructions from user (markdown formatted) output_language: Language code for the response (e.g., 'en', 'nl', 'fr') web_context: Optional web search results Returns: Properly sized prompt that fits within model limits
Parameters:
query: Type: str, context: Type: str, chat_history: Type: List[Dict], custom_instructions: Type: str, output_language: Type: str, web_context: Type: str
Returns: Returns str
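The input budget that _create_response_prompt enforces can be shown as a short worked calculation, using the gpt-4o limit and the output reservation quoted in the source above.
# Worked example of the prompt's input budget for gpt-4o (constants from the source above)
max_model_tokens = 128000            # MODEL_LIMITS["gpt-4o"]
reserved_for_output = 4096           # config.MAX_OUTPUT_TOKENS
safety_margin = 1000
max_input_tokens = max_model_tokens - reserved_for_output - safety_margin
print(max_input_tokens)              # 122904 tokens left for instructions, context, history and query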
_truncate_to_tokens(self, text, max_tokens) -> str
Purpose: Truncate text to fit within token limit Args: text: Text to truncate max_tokens: Maximum tokens Returns: Truncated text
Parameters:
text: Type: str, max_tokens: Type: int
Returns: Returns str
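A standalone sketch of the same token-window truncation using tiktoken directly; the encoding name is an assumption, and the class itself uses its own self.tokenizer rather than this helper.
import tiktoken

def truncate_to_tokens(text, max_tokens, encoding_name="cl100k_base"):
    # Standalone sketch; the class uses self.tokenizer instead of a local encoder
    enc = tiktoken.get_encoding(encoding_name)
    tokens = enc.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return enc.decode(tokens[:max_tokens]) + "\n\n[... context truncated due to length ...]"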
query_deep_reflection(self, query, base_mode, reflection_iterations, use_reranking, top_k, enable_memory, source_filters, manual_keywords, enable_web_search, enable_reference_filtering, reference_threshold, custom_instructions, output_language, chat_history, progress_callback, skip_extraction, override_sources) -> Dict[str, Any]
Purpose: Mode 4 - Deep Reflection: Iterative refinement with multi-pass analysis Process: 1. Run initial query cycle using selected base mode (basic/extensive/full_reading) 2. Analyze the response and generate refinement queries 3. Execute refinement queries to gather additional context 4. Synthesize all information into improved answer 5. Repeat for specified iterations Args: query: User's question base_mode: Underlying mode to use ('basic', 'extensive', 'full_reading') reflection_iterations: Number of refinement cycles (1-5) ... (other standard parameters) Returns: Dictionary with refined response and aggregated references
Parameters:
query: Type: str, base_mode: Type: str, reflection_iterations: Type: int, use_reranking: Type: bool, top_k: Type: int, enable_memory: Type: bool, source_filters: Type: List[str], manual_keywords: Type: List[str], enable_web_search: Type: bool, enable_reference_filtering: Type: bool, reference_threshold: Type: float, custom_instructions: Type: str, output_language: Type: str, chat_history: Type: List[Dict], progress_callback: Type: Callable, skip_extraction: Type: bool, override_sources: Type: bool
Returns: Returns Dict[str, Any]
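A call sketch for the deep-reflection mode, showing the iteration count and the override_sources switch described above; all argument values are illustrative.
result = engine.query_deep_reflection(
    "How do the buffer conditions affect long-term stability?",
    base_mode="extensive",
    reflection_iterations=2,            # clamped to 1-5 by the method
    override_sources=False,             # keep refinement queries inside the filtered sources
    enable_reference_filtering=True,
    reference_threshold=0.3,
    progress_callback=lambda msg: print(msg),
)
print(result["iterations"], "iterations,", result["num_relevant_documents"], "references kept")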
_generate_refinement_queries_with_critique(self, original_query, current_answer, chat_history, iteration_insights, custom_instructions) -> List[str]
Purpose: Generate refinement queries by first self-critiquing the current answer to identify gaps. Args: original_query: The user's original question current_answer: The current answer that needs refinement chat_history: Optional conversation history iteration_insights: History of previous iterations and queries custom_instructions: Optional user instructions to guide query generation Returns: List of 2-4 refinement queries
Parameters:
original_query: Type: str, current_answer: Type: str, chat_history: Type: List[Dict], iteration_insights: Type: List[Dict], custom_instructions: Type: str
Returns: Returns List[str]
_synthesize_refined_answer(self, original_query, previous_answer, iteration_insights, all_context, custom_instructions, output_language, chat_history) -> str
Purpose: Synthesize a refined answer incorporating all gathered context and previous insights. Args: original_query: The user's original question previous_answer: The answer from the previous iteration iteration_insights: History of all iteration answers all_context: All accumulated context from all iterations custom_instructions: Optional custom instructions output_language: Target language for response chat_history: Optional conversation history Returns: Refined answer text
Parameters:
original_query: Type: str, previous_answer: Type: str, iteration_insights: Type: List[Dict], all_context: Type: List[Dict], custom_instructions: Type: str, output_language: Type: str, chat_history: Type: List[Dict]
Returns: Returns str
_deduplicate_context_blocks(self, context_blocks) -> List[Dict]
Purpose: Remove duplicate context blocks based on file name and text similarity. Args: context_blocks: List of context dictionaries Returns: Deduplicated list of context blocks
Parameters:
context_blocks: Type: List[Dict]
Returns: Returns List[Dict]
chat(self, query, mode, chat_history, progress_callback, source_filters, manual_keywords, enable_web_search, enable_memory, enable_reference_filtering, reference_threshold, custom_instructions, output_language, **kwargs) -> Dict[str, Any]
Purpose: Main chat interface supporting all four modes Args: query: User query mode: Operating mode - "basic", "extensive", "full_reading", or "deep_reflection" chat_history: Previous conversation progress_callback: Callback for progress updates source_filters: List of file paths to restrict search to manual_keywords: Additional keywords to enhance search enable_web_search: Override for web search (fallback if no results) enable_memory: Override for using chat history enable_reference_filtering: Apply relevance scoring to filter references (extensive/full modes) reference_threshold: Minimum relevance score (0.0-1.0) for references custom_instructions: Optional custom instructions (markdown formatted) to guide the LLM output_language: Language code for the response (e.g., 'en', 'nl', 'fr') **kwargs: Additional mode-specific parameters Returns: Dictionary with response and metadata
Parameters:
query: Type: str, mode: Type: str, chat_history: Type: List[Dict], progress_callback: Type: Callable, source_filters: Type: List[str], manual_keywords: Type: List[str], enable_web_search: Type: bool, enable_memory: Type: bool, enable_reference_filtering: Type: bool, reference_threshold: Type: float, custom_instructions: Type: str, output_language: Type: str
Returns: Returns Dict[str, Any]
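chat() forwards mode-specific options through **kwargs; a sketch of dispatching to deep reflection this way, with illustrative argument values.
result = engine.chat(
    "Compare the impurity profiles across the three campaigns",
    mode="deep_reflection",
    base_mode="full_reading",            # forwarded to query_deep_reflection via **kwargs
    reflection_iterations=3,
    enable_reference_filtering=True,
    reference_threshold=0.3,
    output_language="nl",                # language codes follow config.LANGUAGE_NAMES
)
print(result["response"])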
Required Imports
import logging
from typing import List
from typing import Dict
from typing import Any
from typing import Optional
from typing import Callable
Usage Example
# Example usage (collection name, API key, and query are illustrative placeholders):
# engine = DocChatRAG(collection_name="docchat_documents", api_key="sk-...")
# result = engine.chat("What does the stability report conclude?", mode="basic")
# print(result["response"])
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function basic_rag_example (81.0% similar)
- function full_reading_example (75.4% similar)
- function extensive_mode_example (74.8% similar)
- function process_chat_background (68.7% similar)
- function init_engines (67.1% similar)