class OneCo_hybrid_RAG_v3
A class named OneCo_hybrid_RAG
/tf/active/vicechatdev/vice_ai/hybrid_rag_engine.py
1205 - 4885
moderate
Purpose
Hybrid retrieval-augmented generation (RAG) engine that combines Neo4j graph queries, ChromaDB vector search, and optional Serper-based web search. It exposes a Panel chat interface with configurable flow control, chat memory, instruction templates, and reference management for cited answers.
Source Code
class OneCo_hybrid_RAG:
def __init__(self):
## Set API keys
self.set_api_keys()
## Define the flow control variables to be exposed and set default values
self.flow_control = {
"pre_model" : ["OpenAi","gpt-4o-mini",0],
"model" : ["OpenAi","gpt-4o",0],
"search_engine" : ["Serper","google"],
"enable_search" : True, # Database search (original search functionality)
"enable_web_search" : False, # Web search via GoogleSerperAPI
"enable_memory" : False,
"memory_max_size" : 3,
"enable_referencing" : True,
"enable_extensive_search" : False, # New extensive search option
"extensive_search_chunks" : 100, # Number of chunks for extensive search
"enable_keyword_filtering" : False, # Keyword filtering option (off by default)
"target_summary_tokens" : 8000, # Target tokens for comprehensive summary
"detail_level" : "Balanced", # Detail level: Summary, Balanced, Detailed, Comprehensive
"manual_keywords" : "", # Manual keywords for filtering
"enable_reference_filtering" : True, # Reference relevance filtering
"relevance_threshold" : 0.3, # Minimum relevance score for references
"detailed_instructions" : "", # Custom detailed instructions for the LLM
}
## Different type of data can be provided here and will be included in the flow
self.data_handles = SimpleDataHandle()
## Small LLM usage tracking
self.small_llm_usage = {
"keyword_extraction": 0,
"query_expansion": 0,
"document_summarization": 0,
"total_calls": 0
}
## Large LLM usage tracking
self.large_llm_usage = 0
## Define the UI elements to be exposed
self.chat_interface=pn.chat.ChatInterface(callback=self.response_callback,width=1200,callback_exception='verbose')
## Plan for chat memory
self.chat_memory = SimpleChatMemory(max_history=self.flow_control["memory_max_size"])
self.extended_query=None
# Set up the blocks_dict for references
self.blocks_dict = {}
self.block_counter = 1
# Explicitly set OpenAI API type for this class
os.environ["OPENAI_API_TYPE"] = "openai"
# Initialize extensive search manager
self.extensive_search_manager = None
# Initialize instruction templates
self.instruction_templates = {
"Default": """Please provide a comprehensive and well-structured response using the available data sources. Format your response in clear Markdown with appropriate headings and sections.""",
"Vaccine Development": """You are an expert in vaccine development and formulation design. Please provide a detailed scientific response that includes:
**Structure Requirements:**
- Use clear scientific headings and subheadings
- Include methodology sections where relevant
- Provide specific technical details and parameters
- Reference regulatory guidelines when applicable
**Content Requirements:**
- Focus on mechanism of action, formulation stability, and manufacturing considerations
- Include safety and efficacy data when available
- Discuss regulatory pathways and requirements
- Consider scalability and commercial viability
**Formatting:**
- Use bullet points for key findings
- Include tables for comparative data when appropriate
- Cite all sources with inline references
- End with a clear summary of key takeaways""",
"Scientific Report": """Generate a comprehensive scientific report with the following structure:
## Executive Summary
Brief overview of key findings and conclusions
## Introduction
Background and context for the analysis
## Methodology
Approach and data sources used
## Results and Analysis
Detailed findings with supporting data
## Discussion
Interpretation of results and implications
## Conclusions
Key takeaways and recommendations
## References
All cited sources
**Formatting Guidelines:**
- Use professional scientific language
- Include quantitative data where available
- Provide statistical analysis when relevant
- Maintain objective tone throughout""",
"Technical Analysis": """Provide a detailed technical analysis focusing on:
**Technical Specifications:**
- Detailed parameters and measurements
- Performance metrics and benchmarks
- Technical constraints and limitations
**Analysis Structure:**
- Problem definition and scope
- Technical approach and methodology
- Detailed results with supporting data
- Risk assessment and mitigation strategies
- Implementation recommendations
**Format Requirements:**
- Use technical terminology appropriately
- Include diagrams or flowcharts concepts when relevant
- Provide step-by-step procedures where applicable
- Include troubleshooting guidance
- Reference industry standards and best practices""",
"Legal Document Analysis": """You are a legal document analyst specializing in precise clause extraction and legal interpretation. Provide a comprehensive legal analysis with the following structure:
## Document Overview
- Document type and classification
- Parties involved and their roles
- Primary legal purpose and scope
## Key Provisions Analysis
**For each significant clause/provision:**
- **Literal Text**: Provide exact verbatim quotations from source documents
- **Section/Article Reference**: Include specific section numbers or clause identifiers
- **Legal Interpretation**: Explain the legal meaning and implications
- **Practical Impact**: Describe real-world consequences and obligations
## Temporal Provisions
**Extract all time-sensitive elements:**
- **Effective Dates**: When provisions take effect
- **Expiration Dates**: When provisions terminate
- **Renewal Terms**: Automatic or manual renewal conditions
- **Notice Periods**: Required advance notice for actions
- **Statute of Limitations**: Time limits for legal actions
- **Performance Deadlines**: Time-bound obligations
## Rights and Obligations Matrix
- **Party A Rights/Obligations**: Clearly delineated responsibilities
- **Party B Rights/Obligations**: Corresponding duties and entitlements
- **Conditional Provisions**: Rights/duties triggered by specific conditions
- **Default Provisions**: Consequences of non-compliance
## Risk Assessment
- **Legal Risks**: Potential liabilities and exposures
- **Compliance Requirements**: Mandatory actions to avoid violations
- **Dispute Resolution**: Mechanisms for handling conflicts
- **Termination Conditions**: Grounds for contract/agreement termination
## Critical Dates Calendar
Present chronological timeline of all important dates identified in the documents.
**Formatting Guidelines:**
- Use precise legal terminology
- Maintain strict accuracy in quotations
- Cross-reference related provisions
- Highlight potential ambiguities or conflicts
- Provide clear section headers for easy navigation
- Include exact citations and page references where available"""
}
# Load custom templates from file if available
self.load_custom_templates()
self.init_connections()
return
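# Illustrative usage sketch (assumes the Neo4j and ChromaDB services referenced in
# init_connections() are reachable; otherwise construction will raise):
#
#     rag = OneCo_hybrid_RAG()
#     rag.flow_control["enable_web_search"] = True   # turn on Serper-backed web search
#     rag.flow_control["detail_level"] = "Detailed"  # Summary, Balanced, Detailed, Comprehensive
#
# flow_control is read again on each call to response_callback(), so these toggles take
# effect on the next query; memory_max_size, by contrast, is only consumed when
# SimpleChatMemory is constructed in __init__.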
def get_instruction_template(self, template_name):
"""Get instruction template by name"""
return self.instruction_templates.get(template_name, "")
def save_instruction_template(self, template_name, instructions):
"""Save a custom instruction template"""
try:
# Add to memory
self.instruction_templates[template_name] = instructions
# Save to file
custom_templates_file = "custom_instruction_templates.json"
custom_templates = {}
# Load existing custom templates
try:
with open(custom_templates_file, 'r') as f:
custom_templates = json.load(f)
except FileNotFoundError:
pass
# Add new template
custom_templates[template_name] = instructions
# Save back to file
with open(custom_templates_file, 'w') as f:
json.dump(custom_templates, f, indent=2)
print(f"✅ Instruction template '{template_name}' saved successfully")
return True
except Exception as e:
print(f"❌ Error saving template '{template_name}': {e}")
return False
def load_custom_templates(self):
"""Load custom instruction templates from file"""
try:
custom_templates_file = "custom_instruction_templates.json"
with open(custom_templates_file, 'r') as f:
custom_templates = json.load(f)
# Add custom templates to the main dictionary
self.instruction_templates.update(custom_templates)
print(f"Loaded {len(custom_templates)} custom instruction templates")
except FileNotFoundError:
print("No custom instruction templates file found - starting fresh")
except Exception as e:
print(f"❌ Error loading custom templates: {e}")
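# Sketch of the on-disk format read and written by save_instruction_template() and
# load_custom_templates() via custom_instruction_templates.json: a flat JSON object
# mapping template name to instruction text. Standalone illustration; the template body
# below is invented.
import json

example_templates = {
    "Stability Study": "Summarize stability data and flag any out-of-specification results."
}
print(json.dumps(example_templates, indent=2))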
def init_connections(self):
uri = config.DB_ADDR
user, password = config.DB_AUTH
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.session = self.driver.session(database=config.DB_NAME)
api_key = os.environ["OPENAI_API_KEY"]  # set in set_api_keys()
self.chroma_embedder=MyEmbeddingFunction("gpt-4o-mini","text-embedding-3-small",api_key)
self.chroma_client=chromadb.HttpClient(host='vice_chroma', port=8000)
# Get collection names (handle both ChromaDB v0.5.x and v0.6.0+ API changes)
try:
print("Attempting to connect to ChromaDB at vice_chroma:8000...")
collections = self.chroma_client.list_collections()
print("✅ ChromaDB connection successful!")
print(f"Raw collections response: {collections} (type: {type(collections)})")
# Handle ChromaDB API version differences
if collections and isinstance(collections[0], str):
# v0.6.0+: collections are already strings (collection names)
self.available_collections = list(collections)
print(f"✅ ChromaDB v0.6.0+ detected: Found {len(self.available_collections)} collections")
print(f" Collections: {self.available_collections}")
elif collections and hasattr(collections[0], 'name'):
# Older version (v0.5.x): collections are objects with .name attribute
self.available_collections = [col.name for col in collections]
print(f"✅ ChromaDB v0.5.x detected: Found {len(self.available_collections)} collections")
print(f" Collections: {self.available_collections}")
elif collections:
# Handle any other format by trying to convert
try:
self.available_collections = [str(col) for col in collections]
print(f"✅ ChromaDB: Found {len(self.available_collections)} collections (converted to strings)")
print(f" Collections: {self.available_collections}")
except Exception:
self.available_collections = []
print("❌ ChromaDB: Could not convert collection list")
else:
self.available_collections = []
print("⚠️ ChromaDB: No collections found - database may be empty")
except Exception as e:
print(f"❌ ChromaDB Connection Error: {e}")
print(f" Error type: {type(e).__name__}")
# Suppress the specific v0.6.0 warning we know about
if "v0.6.0" not in str(e) and "list_collections only returns collection names" not in str(e):
print(f"Warning: Could not retrieve ChromaDB collections: {e}")
self.available_collections = []
# Initialize extensive search manager
self.extensive_search_manager = ExtensiveSearchManager(
session=self.session,
chroma_client=self.chroma_client,
api_key=api_key,
rag_instance=self # Pass the RAG instance for usage tracking
)
return
def run_query(self, query, params=None):
"""
Execute a Cypher query and return the result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
result
The query result
"""
if params is None:
params = {}
return self.session.run(query, params)
def evaluate_query(self, query, params=None):
"""
Execute a Cypher query and return a single result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
object
The single result value
"""
if params is None:
params = {}
result = self.session.run(query, params)
record = result.single()
if record:
return record[0]
return None
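# Illustrative calls (require the live Neo4j session opened in init_connections();
# the `Document` label and property names below are placeholders, not taken from the schema):
#
#     total = rag.evaluate_query(
#         "MATCH (d:Document) WHERE d.name CONTAINS $term RETURN count(d)",
#         {"term": "vaccine"},
#     )
#     for record in rag.run_query("MATCH (d:Document) RETURN d.UID LIMIT $n", {"n": 5}):
#         print(record)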
def push_changes(self, node):
"""
Push changes to a node to the database
Parameters
----------
node : dict or node-like object
Node with properties to update
"""
# Extract node properties, handling both dict-like and node-like objects
if hasattr(node, 'items'):
# Dict-like object
properties = {k: v for k, v in node.items() if k != 'labels'}
labels = node.get('labels', [])
uid = node.get('UID')
else:
# Node-like object from previous driver
properties = {k: node[k] for k in node.keys() if k != 'UID'}
labels = list(node.labels)
uid = node['UID']
# Construct labels string for Cypher
if labels:
labels_str = ':'.join(labels)
match_clause = f"MATCH (n:{labels_str} {{UID: $uid}})"
else:
match_clause = "MATCH (n {UID: $uid})"
# Update node properties
if properties:
set_clauses = [f"n.`{key}` = ${key}" for key in properties]
query = f"{match_clause} SET {', '.join(set_clauses)}"
params = {"uid": uid, **properties}
self.run_query(query, params)
return
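# Illustrative input for the dict-like branch above: a mapping carrying 'UID', optional
# 'labels', and the properties to write back (values below are placeholders):
#
#     rag.push_changes({"UID": "doc-1234", "labels": ["Document"], "status": "reviewed"})
#     # builds: MATCH (n:Document {UID: $uid}) SET n.`UID` = $UID, n.`status` = $status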
def count_tokens(self,text):
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
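# Standalone sketch of the tokenizer call used above (requires the tiktoken package):
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
print(len(encoding.encode("How stable is the mRNA-LNP formulation at 2-8 C?")))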
def set_api_keys(self):
## Public openAI key
os.environ["OPENAI_API_KEY"]='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'
## Serper API key
os.environ["SERPER_API_KEY"] = "9a1f42c99feee69526e216af14e07b64fb4b3bfb"
## AzureOpenAI endpoint
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://vice-llm-2.openai.azure.com/openai/deployments/OneCo-gpt/chat/completions?api-version=2024-08-01-preview"
## AzureOpenAI key
os.environ["AZURE_OPENAI_API_KEY"] = "8DaDtzYz3HePiypmFb6JQmJd3zUCtyCQkiYE8bePRnpyk2YNkJZRJQQJ99BAACfhMk5XJ3w3AAABACOGyJVB"
## Anthropic API key
os.environ["ANTHROPIC_API_KEY"] = "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"
return
def extract_core_query(self,query_text):
"""
Extracts the core information-seeking question from a user query that may contain
both a question and processing instructions for the RAG system.
Args:
query_text: The original user query text
Returns:
dict: Contains the extracted information with keys:
- core_question: The actual information need/question
- instructions: Any processing instructions found
- is_complex: Boolean indicating if query contained instructions
"""
# Use the pre-model (smaller model) for this extraction task
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=self.flow_control['pre_model'][2],
)
# Check if memory is enabled and we have chat history
has_history = self.flow_control.get("enable_memory", False) and len(self.chat_memory.messages) > 0
if has_history:
# Get formatted chat history for context-aware query analysis
chat_history = self.chat_memory.get_formatted_history()
prompt = f"""
You are an AI query analyzer with conversation context awareness. Your task is to analyze the following user query in the context of the conversation history and separate it into:
1. The core information-seeking question (what the user actually wants to know)
2. Any processing instructions (how the user wants information presented or processed)
### Previous Conversation History:
{chat_history}
### Current User Query:
{query_text}
Important: The current query may reference previous topics, use pronouns, or be a follow-up question.
Analyze it in context and expand any ambiguous references using the conversation history.
Output your analysis in strict JSON format:
```json
{{
"core_question": "The main question or information need (expanded with context if needed)",
"instructions": "Any processing instructions (or empty string if none)",
"is_complex": true/false (true if query contains instructions, false if it's just a question)
}}
```
Examples with context:
Previous: "User: Tell me about mRNA vaccines\nAssistant: mRNA vaccines work by..."
Current: "What about their side effects?"
Output:
```json
{{
"core_question": "What are the side effects of mRNA vaccines?",
"instructions": "",
"is_complex": false
}}
```
Previous: "User: Explain vaccine adjuvants\nAssistant: Vaccine adjuvants are..."
Current: "List the main types in a table format"
Output:
```json
{{
"core_question": "What are the main types of vaccine adjuvants?",
"instructions": "present in a table format",
"is_complex": true
}}
```
Only respond with the JSON output, nothing else.
"""
else:
prompt = f"""
You are an AI query analyzer. Your task is to analyze the following user query and separate it into:
1. The core information-seeking question (what the user actually wants to know)
2. Any processing instructions (how the user wants information presented or processed)
User query: {query_text}
Output your analysis in strict JSON format:
```json
{{
"core_question": "The main question or information need",
"instructions": "Any processing instructions (or empty string if none)",
"is_complex": true/false (true if query contains instructions, false if it's just a question)
}}
```
Examples:
Input: "Tell me about mRNA vaccines and format the answer with bullet points"
Output:
```json
{{
"core_question": "Tell me about mRNA vaccines",
"instructions": "format the answer with bullet points",
"is_complex": true
}}
```
Input: "What are the main types of vaccine adjuvants?"
Output:
```json
{{
"core_question": "What are the main types of vaccine adjuvants?",
"instructions": "",
"is_complex": false
}}
```
Only respond with the JSON output, nothing else.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response if needed
content = response.content
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
result = json.loads(content)
# Log whether context was used
if has_history:
print(f" š Context-aware query analysis completed")
print(f" š Original query: '{query_text}'")
print(f" šÆ Enhanced core question: '{result.get('core_question', query_text)}'")
return result
except Exception as e:
# Fallback if parsing fails
print(f"Error parsing LLM response: {e}")
return {
"core_question": query_text,
"instructions": "",
"is_complex": False
}
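# Standalone sketch of the fence-stripping logic used above to recover JSON from an LLM
# reply (the reply text is a made-up example used only to exercise the parsing):
import json

reply = '```json\n{"core_question": "What are the side effects of mRNA vaccines?", "instructions": "", "is_complex": false}\n```'
content = reply
if '```json' in content:
    content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
    content = content.split('```')[1].split('```')[0].strip()
print(json.loads(content)["core_question"])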
def extract_serper_results(self, serper_response):
"""
Extract formatted search results and URLs from GoogleSerperAPI response.
Args:
serper_response: Raw response from GoogleSerperAPI (JSON object or string)
Returns:
tuple: (formatted_results, extracted_urls)
"""
search_results = ""
extracted_urls = []
try:
# Convert to dict if it's a string
if isinstance(serper_response, str):
try:
data = json.loads(serper_response)
except json.JSONDecodeError as e:
print(f"Error parsing Serper JSON: {e}")
return "Error processing search results.", []
else:
# It's already a dict/object
data = serper_response
# Add search query to the results
if 'searchParameters' in data and 'q' in data['searchParameters']:
search_query = data['searchParameters']['q']
search_results += f"### Search Results for: '{search_query}'\n\n"
else:
search_results += "### Search Results\n\n"
# Process organic search results
if 'organic' in data and isinstance(data['organic'], list):
for i, result in enumerate(data['organic']):
title = result.get('title', 'No title')
link = result.get('link', '')
snippet = result.get('snippet', '')
date = result.get('date', '')
# Format the result with block reference
block_num = self.block_counter + len(extracted_urls)
search_results += f"[block {block_num}] **{title}**\n"
if date:
search_results += f"*{date}*\n"
search_results += f"{snippet}\n"
search_results += f"URL: {link}\n\n"
# Add to extracted URLs
extracted_urls.append({
'title': title,
'url': link,
'snippet': snippet,
'date': date
})
# Process "People Also Ask" section
if 'peopleAlsoAsk' in data and isinstance(data['peopleAlsoAsk'], list):
search_results += "#### People Also Ask\n\n"
for i, qa in enumerate(data['peopleAlsoAsk']):
question = qa.get('question', '')
snippet = qa.get('snippet', '')
title = qa.get('title', '')
link = qa.get('link', '')
# Format the result with block reference
block_num = self.block_counter + len(extracted_urls)
search_results += f"[block {block_num}] **{question}**\n"
search_results += f"*{title}*\n"
search_results += f"{snippet}\n"
search_results += f"URL: {link}\n\n"
# Add to extracted URLs
extracted_urls.append({
'title': f"{question} - {title}",
'url': link,
'snippet': snippet
})
# If no results were found
if not extracted_urls:
search_results += "No search results were found.\n"
except Exception as e:
print(f"Error extracting Serper results: {e}")
search_results = "Error processing search results.\n"
return search_results, extracted_urls
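# Illustrative input/output (the method needs an instance for block numbering; the payload
# below is a minimal stand-in for a Serper response, not a real API result):
#
#     payload = {
#         "searchParameters": {"q": "mRNA vaccine adjuvants"},
#         "organic": [
#             {"title": "Example result", "link": "https://example.org", "snippet": "...", "date": "2024"}
#         ],
#     }
#     text, urls = rag.extract_serper_results(payload)
#     # text -> "### Search Results for: 'mRNA vaccine adjuvants'" plus one "[block N]" entry
#     # urls -> [{"title": "Example result", "url": "https://example.org", ...}]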
def response_callback(self, query, progress_callback=None):
print("=" * 80)
print(f"š Starting response callback for query: '{query[:100]}{'...' if len(query) > 100 else ''}'")
print("=" * 80)
# Get detail level from flow control
detail_level = self.flow_control.get("detail_level", "Balanced")
print(f"š Detail level: {detail_level}")
# Reset small LLM usage tracking for this query
self.small_llm_usage = {
"keyword_extraction": 0,
"query_expansion": 0,
"document_summarization": 0,
"total_calls": 0
}
## We make a difference between the search enabled or disabled mode - the first will have 2 separate LLM calls.
## Common part - prepare the data
print("Step 1: Extracting core query...")
query_analysis = self.extract_core_query(query)
print(f"✅ Core query extracted: {query_analysis}")
search_query = query_analysis["core_question"]
print(f"Search query: {search_query}")
# Store the analysis for later use in processing
self.current_query_analysis = query_analysis
# Parse handler using the core question for retrieval
print("Step 2: Gathering data from all sources...")
data_sections = self.parse_handler(search_query, detail_level)
print(f"✅ Data gathering complete. Found {len(data_sections)} data sections")
## prepare LLM following flow control
print("š¤ Step 3: Preparing main LLM...")
if self.flow_control['model'][0]=="OpenAi":
llm = ChatOpenAI(
model=self.flow_control['model'][1],
temperature=self.flow_control['model'][2],
timeout=None,
max_retries=2)
print(f"š¤ Main LLM: OpenAI {self.flow_control['model'][1]} (temp: {self.flow_control['model'][2]})")
elif self.flow_control['model'][0]=="Azure":
llm = AzureChatOpenAI(
azure_deployment=self.flow_control['model'][1],
api_version=self.flow_control['model'][3],
temperature=self.flow_control['model'][2],
max_tokens=2500,
timeout=None,
max_retries=2)
print(f"š¤ Main LLM: Azure {self.flow_control['model'][1]} (temp: {self.flow_control['model'][2]})")
elif self.flow_control['model'][0]=="Anthropic":
# Use native Anthropic SDK (not langchain) - same as SmartStat
try:
import anthropic
anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
if not anthropic_key:
raise Exception("Anthropic API key not configured")
# Create a wrapper class that matches langchain's interface
class AnthropicLLM:
def __init__(self, model, temperature, api_key):
self.client = anthropic.Anthropic(api_key=api_key)
self.model = model
self.temperature = temperature
def invoke(self, prompt):
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
else:
# Handle langchain message format
messages = [{"role": "user", "content": str(prompt)}]
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
temperature=self.temperature,
messages=messages
)
# Create response object that matches langchain format
class Response:
def __init__(self, text):
self.content = text
return Response(response.content[0].text)
llm = AnthropicLLM(
model=self.flow_control['model'][1],
temperature=self.flow_control['model'][2],
api_key=anthropic_key
)
print(f"š¤ Main LLM: Anthropic {self.flow_control['model'][1]} (temp: {self.flow_control['model'][2]})")
except ImportError:
print("⚠️ Anthropic library not installed, falling back to OpenAI")
llm = ChatOpenAI(model='gpt-4o', temperature=0, timeout=None, max_retries=2)
else:
llm = ChatOpenAI(
model='gpt-4o',
temperature=0,
timeout=None,
max_retries=2)
print("š¤ Main LLM: OpenAI gpt-4o (default fallback)")
## Search enabled mode
self.search_results = ""
intermediate_answer = None # Store answer between cycles
if self.flow_control.get("enable_web_search", False):
num_queries = self.flow_control.get("web_search_queries", 3)
num_cycles = self.flow_control.get("web_search_cycles", 1)
print(f"š Step 4: Web search enabled - {num_queries} queries per cycle, {num_cycles} cycle(s)")
search_tool = GoogleSerperAPIWrapper()
web_ref_count = self.block_counter
all_cycle_results = []
# Iterative search cycles
for cycle in range(num_cycles):
print(f"\nš === Search Cycle {cycle + 1}/{num_cycles} ===")
# Report progress if callback provided
if progress_callback:
progress_callback({
'cycle': cycle + 1,
'total_cycles': num_cycles,
'progress': int(((cycle) / num_cycles) * 85) + 10, # 10-95% range
'message': f'Cycle {cycle + 1}/{num_cycles}: Generating answer...'
})
# Step 1: If not the first cycle, generate improved answer with accumulated web results
if cycle > 0:
print(f"š§ Step 4.{cycle+1}.1: Generating improved answer with cycle {cycle} results...")
# Re-generate answer with accumulated web search results
enriched_prompt = self.generate_prompt("Vaccine_base", data_sections, query)
intermediate_answer = llm.invoke(enriched_prompt)
print(f"✅ Improved answer generated ({len(intermediate_answer.content)} chars)")
print(f"Answer preview: {intermediate_answer.content[:300]}...")
else:
# First cycle: Generate initial answer with available data
print(f"š§ Step 4.1: Generating initial answer with available data...")
initial_prompt = self.generate_prompt("Vaccine_base", data_sections, query)
intermediate_answer = llm.invoke(initial_prompt)
print(f"✅ Initial answer generated ({len(intermediate_answer.content)} chars)")
print(f"Answer preview: {intermediate_answer.content[:300]}...")
# Step 2: Analyze the answer to identify what web searches could improve it
print(f"š§ Step 4.{cycle+1}.2: Analyzing answer to identify improvement opportunities...")
gap_analysis_prompt = f"""You are an expert research assistant analyzing an answer to identify areas where web search could add value.
Original User Query: {query}
Current Answer:
{intermediate_answer.content[:2000]}{"..." if len(intermediate_answer.content) > 2000 else ""}
Your task:
1. Analyze what information is provided in the current answer
2. Identify what additional information from the web would significantly improve this answer
3. Determine what specific, current, or specialized knowledge would add value
Provide a brief analysis (2-3 sentences) of:
- What aspects of the answer are well-covered
- What specific information gaps exist that web search could fill
- What areas would benefit from more current or specialized sources
Gap Analysis:"""
gap_analysis = llm.invoke(gap_analysis_prompt)
gap_text = gap_analysis.content.strip()
print(f"š Gap Analysis: {gap_text[:250]}...")
# Step 3: Generate search queries based on the gap analysis
print(f"š§ Step 4.{cycle+1}.3: Generating search queries to fill identified gaps ({num_queries} queries)...")
# Build context-aware prompt for cycle 2+
if cycle > 0:
# For subsequent cycles, provide answer and gap analysis as context
context_query = f"""Based on the current answer and gap analysis below, generate {num_queries} web search queries that will fill the identified information gaps.
[Current Answer]:
{intermediate_answer.content[:1500]}{"..." if len(intermediate_answer.content) > 1500 else ""}
[Gap Analysis]:
{gap_text}
[Task]: Generate {num_queries} specific, targeted web search queries that will address the gaps identified above."""
else:
# For first cycle, use original query
context_query = query
search_query_prompt = self.generate_prompt("Vaccine_google", data_sections, context_query)
# Dynamically update the number of queries requested
search_query_prompt = search_query_prompt.replace("exactly three", f"exactly {num_queries}")
search_query_prompt = search_query_prompt.replace('"First search query",\n "Second search query",\n "Third search query"',
',\n '.join([f'"Search query {i+1}"' for i in range(num_queries)]))
answer = llm.invoke(search_query_prompt)
print(f"✅ Search queries generated (response length: {len(answer.content)} chars)")
# Parse the JSON response - try multiple methods
search_queries = []
response_content = answer.content.strip()
try:
# Try direct JSON parse first
dict_content = json.loads(response_content)
search_queries = dict_content.get('search_queries', [])
print(f"✅ Parsed JSON directly: {len(search_queries)} queries")
except Exception:
try:
# Try extracting JSON from markdown code block
if '```json' in response_content:
json_start = response_content.find('```json') + 7
json_end = response_content.find('```', json_start)
json_str = response_content[json_start:json_end].strip()
dict_content = json.loads(json_str)
search_queries = dict_content.get('search_queries', [])
print(f"✅ Parsed JSON from markdown: {len(search_queries)} queries")
elif '```' in response_content:
# Try plain code block
json_start = response_content.find('```') + 3
json_end = response_content.find('```', json_start)
json_str = response_content[json_start:json_end].strip()
dict_content = json.loads(json_str)
search_queries = dict_content.get('search_queries', [])
print(f"✅ Parsed JSON from code block: {len(search_queries)} queries")
else:
# Try to find JSON object in response
json_start = response_content.find('{')
json_end = response_content.rfind('}') + 1
if json_start >= 0 and json_end > json_start:
json_str = response_content[json_start:json_end]
dict_content = json.loads(json_str)
search_queries = dict_content.get('search_queries', [])
print(f"✅ Extracted and parsed JSON: {len(search_queries)} queries")
else:
raise Exception("No JSON object found")
except Exception as e:
# Fallback - use original query
print(f"⚠️ JSON parsing failed: {str(e)}")
print(f"⚠️ Response content: {response_content[:200]}...")
search_queries = [query]
# Limit to requested number of queries
search_queries = search_queries[:num_queries]
print(f"š Using {len(search_queries)} search queries for cycle {cycle+1}")
# Step 4: Execute the searches
print(f"š Step 4.{cycle+1}.4: Executing {len(search_queries)} web searches...")
# Report search progress
if progress_callback:
progress_callback({
'cycle': cycle + 1,
'total_cycles': num_cycles,
'progress': int(((cycle + 0.4) / num_cycles) * 85) + 10,
'message': f'Cycle {cycle + 1}/{num_cycles}: Executing {len(search_queries)} web searches...'
})
for i, s in enumerate(search_queries, 1):
print(f"š Search {i}/{len(search_queries)}: {s}")
# Parse Serper results to extract content and URLs
search_output = search_tool.results(s)
search_results, extracted_urls = self.extract_serper_results(search_output)
self.search_results = self.search_results + "\n" + search_results
print(f"✅ Search {i} complete: Found {len(extracted_urls)} URLs")
# Add extracted URLs to blocks_dict for reference
for url_info in extracted_urls:
title = url_info.get('title', 'Web Page')
url = url_info.get('url', '')
snippet = url_info.get('snippet', '')
# Add reference in blocks_dict
self.blocks_dict[self.block_counter] = {
"type": "web",
"id": f"web_{self.block_counter}",
"url": url,
"title": title,
"snippet": snippet,
"content": f"Web search result: {title}. {url}",
"cycle": cycle + 1
}
self.block_counter += 1
all_cycle_results.append({
'cycle': cycle + 1,
'queries': search_queries,
'results_count': len(search_queries)
})
print(f"✅ Cycle {cycle + 1} complete - {len(search_queries)} searches executed")
# After all cycles, generate final answer with all accumulated results
total_refs = self.block_counter - web_ref_count
print(f"\nš Web search complete: Total {total_refs} web references added across {num_cycles} cycle(s)")
print(f"š§ Generating final answer with all {num_cycles} cycle(s) of results...")
final_prompt = self.generate_prompt("Vaccine_base", data_sections, query)
answer = llm.invoke(final_prompt)
print(f"✅ Final answer generated ({len(answer.content)} chars)")
else:
print("š Web search disabled - skipping")
print(f"š Total blocks in reference dictionary: {len(self.blocks_dict)}")
## Generate response without web search
print("š§ Step 5: Generating final response with main LLM...")
prompt = self.generate_prompt("Vaccine_base", data_sections, query)
# Count tokens in prompt for monitoring
prompt_tokens = self.count_tokens(prompt)
print(f"š Final prompt size: {prompt_tokens:,} tokens")
answer = llm.invoke(prompt)
# Continue with answer processing (common for both web search enabled/disabled)
response_tokens = self.count_tokens(answer.content)
print(f"✅ Response generated ({response_tokens:,} tokens)")
# If reference formatting is enabled, apply it
if self.flow_control["enable_referencing"]:
print("Step 6: Processing references and formatting...")
print(f" • Blocks available in blocks_dict: {list(self.blocks_dict.keys())}")
print(f" • Total blocks: {len(self.blocks_dict)}")
# Debug: Show the first 300 chars of the answer to see what citations might be there
print(f" • Answer preview: {answer.content[:300]}...")
# Check if this is extensive search mode (comprehensive blocks already contain references)
has_extensive_blocks = any(
block_data.get("type", "").startswith("comprehensive_extensive")
for block_data in self.blocks_dict.values()
)
if has_extensive_blocks:
print(" š Extensive search mode detected: comprehensive blocks already contain reference lists")
# Collect all references from comprehensive blocks and create a single consolidated reference section
all_references = []
total_ref_count = 0
for block_id, block_data in self.blocks_dict.items():
if block_data.get("type", "").startswith("comprehensive_extensive"):
# Get the stored reference section and extract individual references
ref_section = block_data.get("reference_section", "")
ref_count = block_data.get("reference_count", 0)
total_ref_count += ref_count
if ref_section:
# Extract the reference lines (skip the header and separator lines)
ref_lines = ref_section.split('\n')
for line in ref_lines:
line = line.strip()
# Skip empty lines, separator lines, and header lines
if line and not line.startswith('---') and not line.startswith('**References:**'):
all_references.append(line)
# Create a single consolidated reference section if we have references
if all_references:
# Remove duplicates while preserving order, considering only the content after the number
seen_content = set()
unique_references = []
ref_counter = 1
for ref in all_references:
# Extract the content after the reference number (e.g., "[1]: Document A" -> "Document A")
if ']:' in ref:
content_part = ref.split(']:')[1].strip()
else:
content_part = ref.strip()
# Check if we've seen this content before
if content_part not in seen_content:
seen_content.add(content_part)
# Renumber the reference to maintain sequential numbering
unique_references.append(f"[{ref_counter}]: {content_part}")
ref_counter += 1
# Clean up the main answer to remove any [Block X] style references
import re
cleaned_answer = re.sub(r'\[Block \d+\]', '', answer.content)
# Only clean up excessive spaces (not newlines) to preserve markdown formatting
cleaned_answer = re.sub(r'[ \t]+', ' ', cleaned_answer) # Replace multiple spaces/tabs with single space
cleaned_answer = re.sub(r'[ \t]*\n[ \t]*', '\n', cleaned_answer) # Clean up spaces around newlines
cleaned_answer = cleaned_answer.strip()
# Create the consolidated reference section
consolidated_references = "\n\n---\n\n**References:**\n" + "\n".join(unique_references)
formatted_answer = cleaned_answer + consolidated_references
print(f"✅ References consolidated from {len([bd for bd in self.blocks_dict.values() if bd.get('type', '').startswith('comprehensive_extensive')])} comprehensive blocks")
print(f" • Total individual references collected: {len(all_references)}")
print(f" • Unique references after content-based deduplication: {len(unique_references)}")
print(f" • Expected total references: {total_ref_count}")
print(f" • Duplicates removed: {len(all_references) - len(unique_references)}")
print(f" • [Block X] references cleaned from main content")
else:
formatted_answer = answer.content
print(f"⚠️ No reference sections found in comprehensive blocks")
print(f" • Reference consolidation complete - single reference section created")
else:
# Standard mode: process citations and create reference section
print(" š Standard mode: processing citations and creating reference section")
ref_manager = ReferenceManager(default_style="apa")
processed_text, references_section = ref_manager.process_references(
answer.content,
self.blocks_dict,
style="apa"
)
formatted_answer = processed_text + "\n\n" + references_section
print("✅ References processed and formatted")
print(f" • References section length: {len(references_section)} characters")
else:
formatted_answer = answer.content
print("š Reference formatting disabled")
self.chat_memory.save_context(
{"role": "user", "content": query},
{"role": "assistant", "content": answer.content},
)
print("=" * 80)
print(f"Response complete! Final response: {len(formatted_answer):,} characters")
print(f"Total blocks in context: {len(self.blocks_dict)}")
print("\nSMALL LLM USAGE SUMMARY:")
print(f" • Keyword extraction calls: {self.small_llm_usage['keyword_extraction']}")
print(f" • Query expansion calls: {self.small_llm_usage['query_expansion']}")
print(f" • Document summarization calls: {self.small_llm_usage['document_summarization']}")
print(f" • Total small LLM calls: {self.small_llm_usage['total_calls']}")
print("=" * 80)
# Store the response text for reference filtering
self.last_response_text = formatted_answer
return pn.pane.Markdown(formatted_answer)
def get_block_by_number(self, block_number):
"""
Retrieve a specific block by its number from the most recent response.
For extensive search mode, this may extract specific documents from comprehensive blocks.
Args:
block_number (int): The block number to retrieve
Returns:
dict: Block data including content, title, source info, etc., or None if not found
"""
try:
block_num = int(block_number)
if hasattr(self, 'blocks_dict') and block_num in self.blocks_dict:
block_data = self.blocks_dict[block_num]
# For comprehensive extensive blocks, return the full comprehensive summary
if block_data.get('type') == 'comprehensive_extensive':
return {
'content': block_data.get('comprehensive_summary', block_data.get('content', '')),
'title': f'Comprehensive Summary (Block {block_num})',
'source': f'Extensive search from {block_data.get("documents_processed", 0)} documents',
'collection': block_data.get('collection', ''),
'type': 'comprehensive_extensive',
'reference_count': block_data.get('reference_count', 0)
}
else:
# Standard block - return as is
return block_data
else:
print(f"š Block {block_num} not found in blocks_dict. Available blocks: {list(getattr(self, 'blocks_dict', {}).keys())}")
return None
except (ValueError, TypeError):
print(f"⚠️ Invalid block number: {block_number}")
return None
def extract_referenced_blocks_from_response(self, response_text):
"""
Extract which block numbers are actually referenced in the response text.
Args:
response_text (str): The final formatted response text
Returns:
set: Set of block numbers that are referenced in the response
"""
referenced_blocks = set()
if not response_text:
return referenced_blocks
# Pattern to match block references like [block 1], [Block 2], [1], etc.
patterns = [
r'\[block\s+(\d+)\]', # [block 1]
r'\[Block\s+(\d+)\]', # [Block 1]
r'\[(\d+)\]:', # [1]: (in reference sections)
r'\[(\d+)\](?!\s*:)', # [1] (but not [1]:)
]
for pattern in patterns:
matches = re.findall(pattern, response_text, re.IGNORECASE)
for match in matches:
try:
block_num = int(match)
referenced_blocks.add(block_num)
except ValueError:
continue
return referenced_blocks
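# Standalone sketch of the citation patterns matched above:
import re

sample = "LNP stability improved [block 2]. See also [Block 5] and [3]: prior work."
patterns = [r'\[block\s+(\d+)\]', r'\[Block\s+(\d+)\]', r'\[(\d+)\]:', r'\[(\d+)\](?!\s*:)']
found = set()
for pattern in patterns:
    found.update(int(m) for m in re.findall(pattern, sample, re.IGNORECASE))
print(sorted(found))  # [2, 3, 5]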
def get_available_references(self, response_text=None):
"""
Get list of available references that can be used in the UI selection box.
Now filters to only show blocks that are actually referenced in the response.
Args:
response_text (str, optional): The response text to filter by. If None, uses stored response or returns all blocks.
Returns:
list: List of reference dictionaries with id, type, title, and source information
"""
references = []
seen_references = set() # Track unique references to prevent duplicates
if not hasattr(self, 'blocks_dict') or not self.blocks_dict:
return references
# Determine which blocks are actually referenced in the response
if response_text:
referenced_blocks = self.extract_referenced_blocks_from_response(response_text)
elif hasattr(self, 'last_response_text') and self.last_response_text:
# Use the stored response text if available
referenced_blocks = self.extract_referenced_blocks_from_response(self.last_response_text)
else:
# If no response text provided, include all blocks (fallback behavior)
referenced_blocks = set(self.blocks_dict.keys())
print(f"Reference filtering: {len(referenced_blocks)} blocks referenced out of {len(self.blocks_dict)} total blocks")
print(f" • Referenced blocks: {sorted(referenced_blocks)}")
for block_num, block_data in self.blocks_dict.items():
# Skip blocks that aren't referenced in the response
if block_num not in referenced_blocks:
print(f" • Skipping unreferenced block {block_num}")
continue
block_type = block_data.get('type', '')
if block_type == 'comprehensive_extensive':
# Extensive search mode: Extract individual documents from the reference section
print(f"š Processing extensive search block {block_num} with {block_data.get('reference_count', 0)} references")
# Get the reference section which contains the actual filtered documents
reference_section = block_data.get('reference_section', '')
# Parse individual references from the reference section
if reference_section:
# Enhanced parsing with multiple patterns and better title extraction
lines = reference_section.split('\n')
for line in lines:
line = line.strip()
if not line or line.startswith('---') or line.startswith('**References'):
continue
# Try multiple patterns for extensive search references
ref_match = None
# Pattern 1: [1]: Full Document Title [UUID: uuid-here]
pattern1 = re.match(r'\[(\d+)\]:\s*(.+?)\s*\[UUID:\s*([^\]]+)\]', line)
if pattern1:
ref_num, title, uuid = pattern1.groups()
ref_match = {
'ref_num': ref_num,
'title': title.strip(),
'uuid': uuid.strip(),
'source': 'Neo4j Database'
}
# Pattern 2: [1]: <a href='file:///...'>filename</a>
elif '<a href=' in line:
pattern2 = re.match(r'\[(\d+)\]:\s*<a\s+href=[\'"]file:///([^\'"]+)[\'"][^>]*>([^<]+)</a>', line)
if pattern2:
ref_num, file_path, display_name = pattern2.groups()
ref_match = {
'ref_num': ref_num,
'title': display_name.strip(),
'file_path': file_path.strip(),
'source': file_path.strip()
}
# Pattern 3: [1]: Simple title without special formatting
elif line.startswith('[') and ']:' in line:
pattern3 = re.match(r'\[(\d+)\]:\s*(.+?)(?:\s*\(Collection:\s*([^)]+)\))?$', line)
if pattern3:
ref_num, title, collection = pattern3.groups()
# Clean title - remove any trailing collection info
title = title.strip()
if title: # Only add if title is not empty
ref_match = {
'ref_num': ref_num,
'title': title,
'collection': collection.strip() if collection else block_data.get('collection', ''),
'source': collection.strip() if collection else block_data.get('collection', 'Extensive Search')
}
# If we found a match, create the reference
if ref_match:
# Create unique identifier for deduplication
unique_key = f"{ref_match['title']}_{ref_match.get('uuid', ref_match.get('file_path', ref_match['ref_num']))}"
if unique_key not in seen_references:
seen_references.add(unique_key)
ref_info = {
'id': f'ref_extensive_{block_num}_{ref_match["ref_num"]}',
'block_number': int(ref_match['ref_num']) if ref_match['ref_num'].isdigit() else 0,
'type': 'reference',
'title': ref_match['title'],
'source': ref_match.get('source', 'Extensive Search'),
'uuid': ref_match.get('uuid', ''),
'content_preview': f'Reference from extensive search: {ref_match["title"][:150]}...',
'doc_name': ref_match['title'],
'metadata': {
'collection': ref_match.get('collection', block_data.get('collection', '')),
'extensive_search': True,
'original_block': block_num,
'reference_number': ref_match['ref_num'],
'file_path': ref_match.get('file_path', '')
}
}
references.append(ref_info)
print(f" • Added extensive reference {ref_match['ref_num']}: '{ref_match['title']}'")
else:
print(f" • Could not parse reference line: '{line}'")
# Fallback: if no references were parsed but we have filtered_documents, use those
if not any(r.get('metadata', {}).get('original_block') == block_num for r in references):
filtered_documents = block_data.get('filtered_documents', [])
if filtered_documents:
print(f" • No references parsed from section, using {len(filtered_documents)} filtered documents")
for i, doc in enumerate(filtered_documents[:10], 1): # Limit to first 10
title = doc.get('title', f'Document {i}')
unique_key = f"{title}_{doc.get('doc_uuid', i)}"
if unique_key not in seen_references:
seen_references.add(unique_key)
ref_info = {
'id': f'ref_extensive_{block_num}_{i}',
'block_number': i, # Use reference number instead of block number
'reference_number': i,
'type': 'reference',
'title': title,
'source': doc.get('source', 'Extensive Search'),
'uuid': doc.get('doc_uuid', ''),
'content_preview': doc.get('content_preview', 'Document content...'),
'doc_name': title,
'metadata': {
'extensive_search': True,
'original_block': block_num,
'reference_index': i,
'collection': block_data.get('collection', ''),
}
}
references.append(ref_info)
print(f" • Added filtered document {i}: '{title}'")
print(f"✅ Extracted {len([r for r in references if r.get('metadata', {}).get('original_block') == block_num])} individual references from extensive search block")
else:
# Standard search mode: Use block data directly with better titles
title = block_data.get('title', f'Reference Block {block_num}')
doc_name = block_data.get('doc_name', '')
source = block_data.get('source', 'Unknown')
# Improve title for standard mode - use document name if available
if doc_name and doc_name != 'Unknown Document':
if title == f'Reference Block {block_num}' or title.startswith('Document from ChromaDB'):
title = doc_name
elif title.startswith('Literature from ChromaDB'):
title = f'Literature: {doc_name}'
elif title.startswith('Reference from ChromaDB'):
title = f'Reference: {doc_name}'
# Extract collection name from source for better display
collection_info = ''
if 'ChromaDB:' in source:
collection_name = source.replace('ChromaDB:', '').strip()
collection_info = f' (Collection: {collection_name})'
elif block_data.get('collection'):
collection_info = f' (Collection: {block_data.get("collection")})'
# Create unique identifier for deduplication
unique_key = f"{title}_{block_num}_{source}"
if unique_key not in seen_references:
seen_references.add(unique_key)
ref_info = {
'id': f'ref_block_{block_num}',
'block_number': block_num,
'type': 'reference_block',
'title': title + collection_info,
'source': source,
'content_preview': (block_data.get('content', '')[:200] + '...') if len(block_data.get('content', '')) > 200 else block_data.get('content', ''),
'doc_name': doc_name,
'metadata': {
'collection': block_data.get('collection', ''),
'chunk_id': block_data.get('chunk_id', ''),
'similarity_score': block_data.get('similarity_score', 0),
'block_type': block_type,
'extensive_search': False,
'file_path': block_data.get('path', '')
}
}
references.append(ref_info)
print(f" • Added standard reference block {block_num}: '{title}'")
# Also check for Neo4j documents if extensive_search_manager is available
if hasattr(self, 'extensive_search_manager') and hasattr(self.extensive_search_manager, 'get_recently_accessed_documents'):
try:
neo4j_docs = self.extensive_search_manager.get_recently_accessed_documents()
for doc in neo4j_docs:
if 'uuid' in doc:
# Create unique identifier for deduplication
unique_key = f"{doc.get('doc_name', 'Unknown')}_{doc['uuid']}"
if unique_key not in seen_references:
seen_references.add(unique_key)
ref_info = {
'id': f'ref_neo4j_{doc["uuid"]}',
'uuid': doc['uuid'],
'type': 'reference',
'title': doc.get('doc_name', f'Document {doc["uuid"][:8]}'),
'source': 'Neo4j Database',
'content_preview': doc.get('content_preview', 'Neo4j document...'),
'doc_name': doc.get('doc_name', f'Document {doc["uuid"][:8]}'),
'metadata': {
'neo4j_document': True,
'uuid': doc['uuid']
}
}
references.append(ref_info)
print(f" • Added Neo4j document: '{doc.get('doc_name', 'Unknown')}'")
except Exception as e:
print(f"Warning: Could not retrieve Neo4j documents: {e}")
print(f"✅ Total unique references prepared for frontend: {len(references)}")
return references
def get_embedding(self, text):
"""Generate an embedding for the given text using OpenAI's text-embedding-3-small model."""
response = openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
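# Illustrative call (requires a valid OPENAI_API_KEY; "text-embedding-3-small" returns
# 1536-dimensional vectors by default):
#
#     vec = rag.get_embedding("lipid nanoparticle stability at 2-8 C")
#     print(len(vec))  # 1536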
def extract_for_queries(self,text, queries, max_tokens=5000, api_key=None):
"""
Extract information from text based on queries.
Args:
text: Text to extract from
queries: List of queries to guide extraction
max_tokens: Maximum tokens in the output
api_key: API key for the LLM service
Returns:
Extracted text relevant to the queries
"""
if api_key is None:
api_key = os.environ["OPENAI_API_KEY"]  # fall back to the key configured in set_api_keys()
extractor = QueryBasedExtractor(
max_output_tokens=max_tokens,
api_key=api_key,
model_name="gpt-4o-mini" # Or another small model
)
return extractor.extract(text, queries)
def parse_handler(self, query, detail_level="Balanced"):
data_sections = {}
# Create blocks_dict directly in the format needed by ReferenceManager
self.blocks_dict = {} # Replace self.inline_refs with self.blocks_dict
self.block_counter = 1 # Start block numbering from 1 to match example
for key in self.data_handles.handlers.keys():
if self.data_handles.handlers[key]["type"] == "text":
data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data']}"
# Create block entry in proper format
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"text_{self.block_counter}",
"content": f"Text content: {self.data_handles.handlers[key]['data'][:100]}..."
}
elif self.data_handles.handlers[key]["type"] == "dataframe":
data_sections[key] = f"[block {self.block_counter}] {self.extract_for_queries(self.data_handles.handlers[key]['data'].to_markdown(), [query])}"
# Create block entry for dataframe
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"dataframe_{self.block_counter}",
"content": f"Dataframe content from {key}"
}
elif self.data_handles.handlers[key]["type"] == "vectorstore":
data_sections[key] = self.collect_text_blocks(self.data_handles.handlers[key], query)
elif self.data_handles.handlers[key]["type"] == "db_search":
data_sections[key] = self.collect_data_from_neo4j(self.data_handles.handlers[key], query, detail_level=detail_level)
elif self.data_handles.handlers[key]["type"] == "chromaDB":
data_sections[key] = self.collect_data_from_chroma(self.data_handles.handlers[key], query, detail_level)
self.block_counter += 1
return data_sections
def reformat_data(self, data, min_document_length=30, similarity_threshold=0.95, use_crossencoder=False, inclusions=10):
"""
Reformat and filter data to be grouped by ID, excluding too-short documents
and documents that are too similar to each other. Optionally applies crossencoder ranking.
Args:
data: Original data structure
min_document_length: Minimum character length for documents to include (default: 30)
similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar)
use_crossencoder: Whether to apply crossencoder reranking (default: False)
inclusions: Number of documents to return after filtering (default: 10)
Returns:
List of selected documents (not dictionary)
"""
from sentence_transformers import CrossEncoder
import numpy as np
result = {}
selected_docs = []
selected_embeddings = []
# Unpack the nested lists for easier access
ids_list = data['ids'][0]
documents_list = data['documents'][0]
metadatas_list = data['metadatas'][0]
embeddings_array = data['embeddings'][0]
# First pass: filter by document length and organize data
candidates = []
for i, id_val in enumerate(ids_list):
# Check that the document meets the minimum length requirement and does not exceed a maximum token length
if len(documents_list[i]) >= min_document_length and self.count_tokens(documents_list[i]) <= 10000:
candidates.append({
'id': id_val,
'document': documents_list[i],
'metadata': metadatas_list[i],
'embedding': embeddings_array[i].tolist() if embeddings_array is not None else None
})
# If we don't have enough candidates, return all we have
if len(candidates) <= inclusions:
return [(doc['metadata'],doc['document']) for doc in candidates]
# Second pass: filter by similarity
for candidate in candidates:
candidate_embedding = np.array(candidate['embedding'])
# Normalize embedding
norm = np.linalg.norm(candidate_embedding)
if norm > 0:
candidate_embedding = candidate_embedding / norm
# Check if candidate is too similar to any already selected document
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(candidate_embedding, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
break
if not is_redundant:
# Add to result dictionary
result[candidate['id']] = {
'document': candidate['document'],
'metadata': candidate['metadata'],
'embedding': candidate['embedding']
}
# Add to selected lists for similarity checks
selected_docs.append(candidate)
selected_embeddings.append(candidate_embedding)
# If we've collected enough documents and don't need crossencoder, we can stop
#if len(selected_docs) >= inclusions * 2 and not use_crossencoder:
# break
# If using crossencoder for reranking
if use_crossencoder and len(selected_docs) > inclusions:
query = data.get('query_text', '')
if not query: # If no query provided, use a placeholder
query = "default query" # Ideally this should be passed in
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
scores = cross_model.predict(query_doc_pairs)
# Zip documents with their scores and sort by score (highest first)
doc_score_pairs = list(zip(selected_docs, scores))
ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
# Take the top 'inclusions' documents after reranking
selected_docs = [doc for doc, _ in ranked_docs[:inclusions]]
elif len(selected_docs) > inclusions:
# If not using crossencoder but have too many docs, just take the first 'inclusions'
selected_docs = selected_docs[:inclusions]
# Return just the document text for further processing
#print("returning ",[(doc['metadata'],doc['document']) for doc in selected_docs])
return [(doc['metadata'],self.extract_for_queries(doc['document'],self.extended_query)) for doc in selected_docs]
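# Standalone sketch of the redundancy check performed above: embeddings are L2-normalised
# so the dot product is cosine similarity, and near-duplicates (similarity >= threshold)
# are skipped. The vectors below are toy values.
import numpy as np

threshold = 0.95
candidates = [np.array([1.0, 0.0]), np.array([0.99, 0.14]), np.array([0.0, 1.0])]
kept = []
for emb in candidates:
    emb = emb / np.linalg.norm(emb)
    if all(np.dot(emb, k) < threshold for k in kept):
        kept.append(emb)
print(len(kept))  # 2 - the second vector is dropped as a near-duplicate of the first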
def score_reference_relevance(self, final_answer, reference_documents, relevance_threshold=0.3):
"""
Score the relevance of each reference document against the final answer using a small LLM.
Args:
final_answer: The generated answer text
reference_documents: List of (metadata, content) tuples for reference documents
relevance_threshold: Minimum score to include a reference (0.0-1.0)
Returns:
List of (metadata, content, score) tuples for documents above threshold
"""
print("Scoring reference relevance against final answer...")
print(f" • Evaluating {len(reference_documents)} reference documents")
print(f" • Relevance threshold: {relevance_threshold}")
# Use small LLM for scoring
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=0
)
relevant_references = []
# Create a short summary of the final answer for comparison (first 1000 chars)
answer_summary = final_answer[:1000] + "..." if len(final_answer) > 1000 else final_answer
for i, (metadata, content) in enumerate(reference_documents):
print(f" Scoring document {i+1}/{len(reference_documents)}: {metadata.get('title', 'Unknown')[:50]}...")
# Create a short excerpt from document content for scoring
content_excerpt = content[:800] + "..." if len(content) > 800 else content
prompt = f"""
You are a document relevance scorer. Your task is to determine how relevant a reference document is to a final answer.
FINAL ANSWER (excerpt):
{answer_summary}
REFERENCE DOCUMENT (excerpt):
{content_excerpt}
Rate the relevance of this reference document to the final answer on a scale of 0.0 to 1.0:
- 0.0: Completely irrelevant, no useful information for the answer
- 0.3: Somewhat relevant, provides background or tangential information
- 0.5: Moderately relevant, provides supporting information
- 0.7: Highly relevant, provides key information used in the answer
- 1.0: Extremely relevant, essential for the answer
Consider:
- Does the document contain information that directly supports the answer?
- Are there shared topics, concepts, or findings?
- Would removing this reference make the answer less accurate or complete?
Respond with ONLY a number between 0.0 and 1.0, nothing else.
"""
try:
response = llm.invoke(prompt)
score_text = response.content.strip()
# Extract number from response, handling various formats
import re
# Look for decimal numbers in the response
number_match = re.search(r'(\d+\.?\d*)', score_text)
if number_match:
score = float(number_match.group(1))
# Ensure score is between 0.0 and 1.0
score = max(0.0, min(1.0, score))
else:
print(f" ā ļø Could not parse score from: '{score_text}', using 0.5")
score = 0.5
if score >= relevance_threshold:
relevant_references.append((metadata, content, score))
print(f" ā
Score: {score:.2f} (included)")
else:
print(f" ā Score: {score:.2f} (filtered out)")
except Exception as e:
print(f" ā ļø Error scoring document: {e}")
print(f" ā ļø Response was: '{response.content if 'response' in locals() else 'No response'}'")
# Default to including if scoring fails
score = 0.5
relevant_references.append((metadata, content, score))
print(f" ā
Using default score: {score:.2f} (included)")
print(f" ā
Reference relevance scoring complete")
print(f" ⢠Documents evaluated: {len(reference_documents)}")
print(f" ⢠Documents above threshold: {len(relevant_references)}")
print(f" ⢠Documents filtered out: {len(reference_documents) - len(relevant_references)}")
# Sort by relevance score (highest first)
relevant_references.sort(key=lambda x: x[2], reverse=True)
return relevant_references
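# --- Illustrative sketch (not part of the class): parsing and filtering LLM relevance scores ---
# A minimal helper mirroring the scoring logic above: extract a 0.0-1.0 score from free-form
# LLM output, clamp it, keep only references at or above the threshold, and sort them.
# The sample inputs in the usage comment are hypothetical.
import re

def parse_relevance_score(score_text, default=0.5):
    match = re.search(r'(\d+\.?\d*)', score_text)
    if not match:
        return default  # fall back when the model does not return a parseable number
    return max(0.0, min(1.0, float(match.group(1))))

def filter_references(scored_refs, threshold=0.3):
    """scored_refs: list of (metadata, content, raw_llm_score_text) tuples."""
    kept = [(m, c, parse_relevance_score(s)) for m, c, s in scored_refs]
    kept = [item for item in kept if item[2] >= threshold]
    return sorted(kept, key=lambda item: item[2], reverse=True)

# Usage (hypothetical): filter_references([({"title": "A"}, "text A", "0.8"), ({"title": "B"}, "text B", "0.1")])
# keeps only the first entry.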
def extract_filter_keywords(self, query, n_keywords=2):
"""
Extract distinguishing keywords from a query for filtering search results.
Args:
query: The user's query text
n_keywords: Maximum number of keywords to extract
Returns:
List of keywords for filtering
"""
print(f"š Extracting filter keywords...")
print(f" š¤ Using small LLM ({self.flow_control['pre_model'][1]}) for keyword extraction")
self.small_llm_usage["keyword_extraction"] += 1
self.small_llm_usage["total_calls"] += 1
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=0
)
# Make the instruction much more explicit about the exact count
prompt = f"""
You are a search optimization expert. Extract EXACTLY {n_keywords} specific distinguishing keyword(s) from this query:
"{query}"
Guidelines:
- You MUST return EXACTLY {n_keywords} keyword(s) - no more, no less
- Focus on proper nouns, company names, technical terms, and specific concepts
- Select word(s) that would differentiate this topic from related but irrelevant topics
- Choose word(s) that could filter out incorrect contexts (wrong companies, unrelated domains)
- Exclude common words like "the", "and", "of"
- Return ONLY a JSON array of strings with EXACTLY {n_keywords} string(s)
Example:
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=1
Output: ["Pfizer"]
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=3
Output: ["Pfizer", "mRNA", "COVID-19"]
Output format:
```json
["keyword1"{", keyword2" if n_keywords > 1 else ""}{", ..." if n_keywords > 2 else ""}]
```
Remember: I need EXACTLY {n_keywords} keyword(s). Count carefully before submitting.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response
content = response.content.strip()
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
keywords = json.loads(content)
# Force the correct number of keywords
if len(keywords) > n_keywords:
keywords = keywords[:n_keywords]
elif len(keywords) < n_keywords and len(keywords) > 0:
# If we got fewer keywords than requested but at least one, duplicate the first one
while len(keywords) < n_keywords:
keywords.append(keywords[0])
keywords = [k.lower() for k in keywords] # Convert to lowercase for case-insensitive matching
print(f"ā
Filter keywords extracted: {keywords}")
return keywords
except Exception as e:
print(f"Error extracting keywords: {e}")
# Fall back to simple keyword extraction if LLM fails
words = query.lower().split()
stopwords = ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'about']
return [w for w in words if w not in stopwords and len(w) > 3][:n_keywords]
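# --- Illustrative sketch (not part of the class): robust parsing of LLM keyword output ---
# Mirrors the extraction logic above: strip an optional ```json fence, parse the array,
# pad/trim to the requested count, and fall back to simple stopword filtering if parsing fails.
# The example strings are hypothetical.
import json

def parse_keyword_response(llm_content, query, n_keywords=2):
    stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'about'}
    try:
        content = llm_content.strip()
        if '```json' in content:
            content = content.split('```json')[1].split('```')[0].strip()
        elif '```' in content:
            content = content.split('```')[1].split('```')[0].strip()
        keywords = [k.lower() for k in json.loads(content)][:n_keywords]
        while keywords and len(keywords) < n_keywords:
            keywords.append(keywords[0])  # duplicate the first keyword to reach the requested count
        if keywords:
            return keywords
    except Exception:
        pass
    # Fallback: naive keyword selection from the query itself
    return [w for w in query.lower().split() if w not in stopwords and len(w) > 3][:n_keywords]

# Usage (hypothetical): parse_keyword_response('```json\n["Pfizer", "mRNA"]\n```', "Pfizer mRNA vaccine", 2)
# returns ['pfizer', 'mrna'].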
def collect_data_from_chroma(self, data, query, detail_level="Balanced"):
"""
Collect relevant documents from ChromaDB based on query with optimized workflow:
1) Combine results from all extended queries
2) Apply keyword filters across all results
3) Remove similar documents and apply cross-encoder if requested
4) Evaluate against target and add additional documents as needed
Args:
data: Configuration data for collection and processing
query: The user query
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with collected document blocks
"""
print("\n" + "=" * 60)
print(f"š CHROMADB COLLECTION: {data.get('data', 'Unknown')}")
print("=" * 60)
import re # For citation extraction in extensive search mode
# Start with empty collections
collected_blocks = []
candidate_docs = {
'ids': [],
'documents': [],
'metadatas': [],
'embeddings': []
}
# Extract filter keywords for hybrid search based on keyword filtering mode
filter_keywords = []
# Check if keyword filtering is enabled
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled:
print("š Keyword filtering enabled - determining keyword source...")
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
print(f"š Using manual keywords ONLY: {filter_keywords}")
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
print("š¤ No manual keywords provided - using automatic LLM extraction...")
filter_keywords = self.extract_filter_keywords(query)
print(f"š Auto-extracted keywords: {filter_keywords}")
else:
print("āļø Keyword filtering disabled - skipping keyword extraction")
print(f"ā
Final filter keywords: {filter_keywords}")
# Configure retrieval parameters
use_crossencoder = "crossencoder" in data["processing_steps"]
target_docs = data["inclusions"]
initial_k = target_docs * 10 if use_crossencoder else target_docs * 3
print(f"š Retrieval Configuration:")
print(f" ⢠Target documents: {target_docs}")
print(f" ⢠Initial retrieval (k): {initial_k}")
print(f" ⢠Cross-encoder enabled: {use_crossencoder}")
print(f" ⢠Processing steps: {data['processing_steps']}")
# Generate extended queries if needed
if "extend_query" in data["processing_steps"]:
print("š Extending query with additional variations...")
# Debug memory state right before extend_query call
print(f"š Pre-extend_query debug:")
print(f" ⢠Memory enabled: {self.flow_control.get('enable_memory', False)}")
print(f" ⢠Chat memory exists: {hasattr(self, 'chat_memory')}")
if hasattr(self, 'chat_memory'):
print(f" ⢠Chat memory messages: {len(self.chat_memory.messages)}")
if len(self.chat_memory.messages) > 0:
print(f" ⢠First message preview: {str(self.chat_memory.messages[0])[:100]}...")
self.extend_query(query)
self.extended_query.append(query)
print(f"ā
Query extension complete. Total queries: {len(self.extended_query)}")
else:
self.extended_query = [query]
print(f"š Using single query (no extension)")
# Get ChromaDB collection
# Extract collection name from data - handle both string and collection object cases
collection_name = data["data"]
if not isinstance(collection_name, str):
# v0.6.0+: the collection name is normally already a string; otherwise convert
# the collection object to a string and extract the name if needed
collection_str = str(collection_name)
if 'Collection(name=' in collection_str:
# Extract name from string representation like "Collection(name=wuxi)"
match = re.search(r'Collection\(name=([^)]+)\)', collection_str)
collection_name = match.group(1) if match else collection_str
else:
collection_name = collection_str
print(f"ā
ChromaDB collection '{collection_name}' loaded successfully")
client = self.chroma_client.get_collection(collection_name, embedding_function=self.chroma_embedder)
# STEP 1: Retrieve candidate documents from all extended queries
print(f"\nš QUERY PROCESSING:")
print(f" ⢠Processing {len(self.extended_query)} extended queries")
print(f" ⢠Retrieving up to {initial_k} candidates per query")
all_ids = set() # Track IDs to avoid duplicates
for idx, q in enumerate(self.extended_query):
print(f" Query {idx+1}/{len(self.extended_query)}: '{q[:50]}{'...' if len(q) > 50 else ''}'")
# Retrieve a larger batch of documents
retrieved_docs = client.query(
query_texts=[q],
n_results=initial_k,
include=["documents", "metadatas", "embeddings"]
)
# Only process if we got results
query_count = 0
if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0:
# Add unique documents to our candidates pool
for i, doc_id in enumerate(retrieved_docs['ids'][0]):
if doc_id not in all_ids:
all_ids.add(doc_id)
candidate_docs['ids'].append(doc_id)
candidate_docs['documents'].append(retrieved_docs['documents'][0][i])
candidate_docs['metadatas'].append(retrieved_docs['metadatas'][0][i])
if retrieved_docs['embeddings'] and len(retrieved_docs['embeddings'][0]) > i:
candidate_docs['embeddings'].append(retrieved_docs['embeddings'][0][i])
query_count += 1
print(f" ā Retrieved {len(retrieved_docs['documents'][0]) if retrieved_docs['documents'] else 0} total, {query_count} unique candidates")
print(f"\nš CANDIDATE DOCUMENT STATISTICS:")
print(f" ⢠Total unique candidates collected: {len(candidate_docs['ids'])}")
# STEP 2: Apply keyword filtering (skip for extensive search mode - will be applied later on full documents)
print(f"\nš KEYWORD FILTERING:")
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
filtered_docs = []
# For extensive search mode, skip chunk-level keyword filtering
if enable_extensive_search:
print(f" š Extensive search mode: Deferring keyword filtering until full documents are retrieved")
# Use all candidates for extensive search - keyword filtering will be applied later
for i, doc in enumerate(candidate_docs['documents']):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
filtered_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ā
Using all {len(filtered_docs)} candidates for extensive search (keyword filtering deferred)")
elif keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f" ⢠Applying chunk-level keyword filter with keywords: {filter_keywords}")
# First try documents containing ALL keywords
all_keywords_docs = []
for i, doc in enumerate(candidate_docs['documents']):
doc_lower = doc.lower()
if all(keyword.lower() in doc_lower for keyword in filter_keywords):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
all_keywords_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
filtered_docs.extend(all_keywords_docs)
# If we don't have enough with all keywords, try documents with ANY keyword
if len(all_keywords_docs) < target_docs:
print(f" ⢠Looking for documents with ANY keyword (need {target_docs - len(all_keywords_docs)} more)")
any_keyword_docs = []
for i, doc in enumerate(candidate_docs['documents']):
doc_id = candidate_docs['ids'][i]
# Skip if already included
if any(d['id'] == doc_id for d in filtered_docs):
continue
doc_lower = doc.lower()
if any(keyword.lower() in doc_lower for keyword in filter_keywords):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
any_keyword_docs.append({
'id': doc_id,
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ⢠Additional documents with ANY keyword: {len(any_keyword_docs)}")
filtered_docs.extend(any_keyword_docs)
print(f" ā
Total documents after chunk-level keyword filtering: {len(filtered_docs)}")
# Check if keyword filtering eliminated all documents
if len(filtered_docs) == 0:
print(f" ā KEYWORD FILTERING: No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from this collection.")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: Keyword filtering applied (chunk-level)")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
else:
if not keyword_filtering_enabled:
print(f" ā ļø Keyword filtering disabled in settings")
else:
print(f" ā ļø No keyword filtering applied (keywords: {filter_keywords}, processing_steps: {data.get('processing_steps', [])})")
# Without keyword filtering, use all candidates
for i, doc in enumerate(candidate_docs['documents']):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
filtered_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ā
Using all {len(filtered_docs)} candidates (no filtering applied)")
# STEP 3: Process using similarity threshold to remove near-duplicates
print(f"\nš BASIC FILTERING:")
min_doc_length = 30
print(f" ⢠Minimum document length: {min_doc_length} characters")
#max_token_length = 10000 # Avoid documents that are too long
# Apply basic filtering (length, tokens)
candidates = [doc for doc in filtered_docs
if len(doc['document']) >= min_doc_length ]
print(f" ā
Candidates after length filtering: {len(candidates)} (removed {len(filtered_docs) - len(candidates)})")
# Apply similarity filtering to remove near-duplicates
print(f"\nš SIMILARITY DEDUPLICATION:")
print(f" ⢠Similarity threshold: 0.95 (documents above this are considered duplicates)")
selected_docs = []
selected_embeddings = []
similarity_threshold = 0.95
embeddings_processed = 0
embeddings_skipped = 0
duplicates_removed = 0
for candidate in candidates:
# Skip documents without embeddings
if candidate['embedding'] is None or not isinstance(candidate['embedding'], (list, np.ndarray)):
embeddings_skipped += 1
continue
embeddings_processed += 1
candidate_embedding = np.array(candidate['embedding'])
# Normalize embedding
norm = np.linalg.norm(candidate_embedding)
if norm > 0:
candidate_embedding = candidate_embedding / norm
# Check if candidate is too similar to any already selected document
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(candidate_embedding, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
duplicates_removed += 1
break
if not is_redundant:
selected_docs.append(candidate)
selected_embeddings.append(candidate_embedding)
print(f" ⢠Embeddings processed: {embeddings_processed}")
print(f" ⢠Embeddings skipped (missing): {embeddings_skipped}")
print(f" ⢠Duplicates removed: {duplicates_removed}")
print(f" ā
Unique documents after similarity filtering: {len(selected_docs)}")
# STEP 4: Apply cross-encoder reranking if requested
print(f"\nšÆ FINAL DOCUMENT SELECTION:")
final_docs = []
# Determine target based on extensive search mode
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
if enable_extensive_search:
# For extensive search, we need more documents to reach the chunk target
effective_target = min(len(selected_docs), extensive_search_chunks)
print(f" š Extensive search mode: targeting {effective_target} documents for processing")
else:
effective_target = target_docs
if use_crossencoder and len(selected_docs) > effective_target:
print(f" š Applying cross-encoder reranking to {len(selected_docs)} documents")
from sentence_transformers import CrossEncoder
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Create query-document pairs for the reranker
query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
scores = cross_model.predict(query_doc_pairs)
print(f" ⢠Cross-encoder scores computed")
print(f" ⢠Score range: {min(scores):.4f} to {max(scores):.4f}")
# Sort by score (highest first)
doc_score_pairs = list(zip(selected_docs, scores))
ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
# Select top documents after reranking
final_docs = [doc for doc, _ in ranked_docs[:effective_target]]
print(f" ā
Selected top {len(final_docs)} documents after cross-encoder reranking")
else:
# If not using cross-encoder or don't have enough docs, use all selected docs
final_docs = selected_docs[:effective_target]
print(f" š Selected {len(final_docs)} documents by similarity ranking (no cross-encoder)")
# STEP 5: If we still don't have enough documents, try unfiltered search
if len(final_docs) < target_docs and len(final_docs) < len(candidates):
additional_needed = target_docs - len(final_docs)
print(f" š Adding {additional_needed} more documents to reach target of {target_docs}")
# Find documents not already selected
remaining_docs = [doc for doc in candidates if doc['id'] not in [d['id'] for d in final_docs]]
# Add up to the target number
final_docs.extend(remaining_docs[:target_docs - len(final_docs)])
print(f" ā
Final document count: {len(final_docs)}")
# STEP 6: Process final documents and create blocks
print(f"\nš DOCUMENT PROCESSING:")
print(f" ⢠Processing {len(final_docs)} selected documents")
# Check if any documents remain for processing
if len(final_docs) == 0:
print(f" ā No documents available for processing after all filtering steps")
print(f" š« Collection '{collection_name}' has no content that matches the criteria")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: All filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by various criteria)")
print(f" ⢠Blocks created: 0")
if filter_keywords:
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Check if extensive search is enabled
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
print(f" ⢠Extensive search enabled: {enable_extensive_search}")
if enable_extensive_search:
print(f" ⢠Extensive search chunk limit: {extensive_search_chunks}")
if enable_extensive_search and self.extensive_search_manager:
print(f" š EXTENSIVE SEARCH PROCESSING:")
print(f" ⢠Processing {len(final_docs)} documents with full document retrieval")
# If extensive search requested, retrieve more chunks
if len(final_docs) < extensive_search_chunks:
additional_needed = extensive_search_chunks - len(final_docs)
print(f" ⢠Retrieving {additional_needed} additional chunks for extensive search (target: {extensive_search_chunks})")
# Get more documents for extensive search - prioritize by:
# 1. Documents that have multiple chunks (for better document completion)
# 2. Documents that passed keyword filtering but were filtered out by similarity
# 3. Remaining candidates by similarity score
remaining_docs = [doc for doc in candidates if doc['id'] not in [d['id'] for d in final_docs]]
# Try to group by document to get complete documents
doc_groups = {}
for doc in remaining_docs:
bibtex = doc['metadata'].get('bibtex', '')
if bibtex:
# Use document path as grouping key
doc_key = bibtex.split('/')[-1] if '/' in bibtex else bibtex
if doc_key not in doc_groups:
doc_groups[doc_key] = []
doc_groups[doc_key].append(doc)
# Prioritize document groups that have multiple chunks
prioritized_docs = []
multi_chunk_docs = 0
# First add documents with multiple chunks (complete documents)
for doc_key, doc_list in sorted(doc_groups.items(), key=lambda x: len(x[1]), reverse=True):
if len(doc_list) > 1: # Multiple chunks from same document
print(f" ⢠Adding {len(doc_list)} chunks from document: {doc_key}")
prioritized_docs.extend(doc_list)
multi_chunk_docs += 1
if len(prioritized_docs) >= additional_needed:
break
# Then add remaining single chunks if needed
if len(prioritized_docs) < additional_needed:
single_chunks = []
for doc_key, doc_list in doc_groups.items():
if len(doc_list) == 1:
single_chunks.extend(doc_list)
# Sort single chunks by position in original candidates (which are sorted by similarity)
remaining_needed = additional_needed - len(prioritized_docs)
prioritized_docs.extend(single_chunks[:remaining_needed])
final_docs.extend(prioritized_docs[:additional_needed])
print(f" ⢠Extended to {len(final_docs)} documents for extensive processing")
print(f" ⢠Added {multi_chunk_docs} complete documents and {len(prioritized_docs) - multi_chunk_docs} individual chunks")
# Process documents with improved source tracking and batch processing
target_summary_tokens = self.flow_control.get("target_summary_tokens", 8000)
print(f" ⢠Processing documents with batch consolidation...")
print(f" ⢠Target final summary size: ~{target_summary_tokens:,} tokens (suitable for main LLM)")
# Prepare documents for processing
documents_for_processing = []
full_docs_retrieved = 0
for idx, doc in enumerate(final_docs):
print(f" Processing document {idx+1}/{len(final_docs)}: {doc['id'][:20]}...")
# Try to get full document
full_document = self.extensive_search_manager.get_full_document(
doc['metadata'],
collection_name
)
current_content = ""
if full_document:
full_docs_retrieved += 1
current_content = full_document
print(f" ā
Full document retrieved: {self.count_tokens(full_document):,} tokens")
else:
# Fallback to original chunk content
current_content = doc['document']
print(f" ā ļø Using original chunk: {self.count_tokens(current_content):,} tokens")
# Prepare document metadata for source tracking
metadata = {
'id': doc['id'],
'source': collection_name,
'title': doc.get('metadata', {}).get('title', 'Unknown Document'),
'parent_name': doc.get('metadata', {}).get('parent_name', 'Unknown Source'),
'bibtex': doc.get('metadata', {}).get('bibtex', '')
}
# Improve title extraction - use filename if title is generic
if metadata['title'] == 'Unknown Document':
# Try to extract a better title from parent_name or bibtex
if metadata['parent_name'] and metadata['parent_name'] != 'Unknown Source':
# Use filename as title
filename = metadata['parent_name'].split('/')[-1] if '/' in metadata['parent_name'] else metadata['parent_name']
metadata['title'] = filename
elif metadata['bibtex']:
# Use filename from bibtex path
filename = metadata['bibtex'].split('/')[-1] if '/' in metadata['bibtex'] else metadata['bibtex']
metadata['title'] = filename
documents_for_processing.append((metadata, current_content))
print(f" ⢠Documents prepared: {len(documents_for_processing)}")
print(f" ⢠Full documents retrieved: {full_docs_retrieved}")
# APPLY KEYWORD FILTERING ON FULL DOCUMENTS FOR EXTENSIVE SEARCH MODE
keyword_filtered_documents = documents_for_processing
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f"\n š APPLYING KEYWORD FILTERING TO FULL DOCUMENTS:")
print(f" ⢠Filter keywords: {filter_keywords}")
print(f" ⢠Evaluating {len(documents_for_processing)} full documents")
# Apply keyword filtering to full document content
all_keywords_docs = []
any_keywords_docs = []
for metadata, full_content in documents_for_processing:
full_content_lower = full_content.lower()
doc_title = metadata.get('title', 'Unknown')[:50]
# Create searchable text that includes content, title, filename, and other metadata
searchable_text = full_content_lower
# Add title to searchable text
if metadata.get('title') and metadata.get('title') != 'Unknown Document':
searchable_text += " " + metadata.get('title', '').lower()
# Add parent_name (often contains filename/path info)
if metadata.get('parent_name') and metadata.get('parent_name') != 'Unknown Source':
searchable_text += " " + metadata.get('parent_name', '').lower()
# Add bibtex field (often contains filename)
if metadata.get('bibtex'):
searchable_text += " " + metadata.get('bibtex', '').lower()
# Add source information
if metadata.get('source'):
searchable_text += " " + metadata.get('source', '').lower()
# Check if document contains ALL keywords (in content or metadata)
if all(keyword.lower() in searchable_text for keyword in filter_keywords):
all_keywords_docs.append((metadata, full_content))
print(f" ā
ALL keywords found in: {doc_title}")
# Check if document contains ANY keyword (in content or metadata)
elif any(keyword.lower() in searchable_text for keyword in filter_keywords):
any_keywords_docs.append((metadata, full_content))
print(f" ā” SOME keywords found in: {doc_title}")
else:
print(f" ā NO keywords found in: {doc_title}")
# Prioritize documents with ALL keywords, then add ANY keywords if needed
keyword_filtered_documents = all_keywords_docs.copy()
# Calculate how many more documents we need
target_after_filtering = min(len(documents_for_processing), target_summary_tokens // 200) # Rough estimate
if len(keyword_filtered_documents) < target_after_filtering and any_keywords_docs:
additional_needed = target_after_filtering - len(keyword_filtered_documents)
keyword_filtered_documents.extend(any_keywords_docs[:additional_needed])
print(f" ⢠Added {min(additional_needed, len(any_keywords_docs))} documents with partial keyword matches")
print(f" š KEYWORD FILTERING RESULTS:")
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
print(f" ⢠Documents with ANY keywords: {len(any_keywords_docs)}")
print(f" ⢠Documents with NO keywords: {len(documents_for_processing) - len(all_keywords_docs) - len(any_keywords_docs)}")
print(f" ⢠Final filtered documents: {len(keyword_filtered_documents)}")
print(f" ⢠Documents removed by filtering: {len(documents_for_processing) - len(keyword_filtered_documents)}")
if len(keyword_filtered_documents) == 0:
print(f" ā KEYWORD FILTERING: No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from this collection.")
# Don't fall back to original documents - respect the filtering
# keyword_filtered_documents remains empty
else:
if keyword_filtering_enabled:
print(f"\n ā ļø Keyword filtering enabled but conditions not met:")
print(f" ⢠Keywords available: {bool(filter_keywords)}")
print(f" ⢠Processing steps include keyword_filter: {'keyword_filter' in data.get('processing_steps', [])}")
else:
print(f"\n ā ļø Keyword filtering disabled for extensive search")
# Use keyword-filtered documents for further processing
documents_for_processing = keyword_filtered_documents
# Check if any documents remain after keyword filtering
if len(documents_for_processing) == 0:
print(f" ā No documents available for processing after keyword filtering")
print(f" š« Skipping collection '{collection_name}' - no matching content found")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: Keyword filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Process documents with NO citation processing in extensive search mode
comprehensive_summary, source_mapping, individual_summaries = self.extensive_search_manager.process_documents_with_source_tracking(
documents_for_processing,
query,
target_summary_tokens=target_summary_tokens,
batch_size=10, # Process 10 documents at a time with large LLM
use_inline_citations=False, # Extensive search mode: no inline citations
disable_citations=True, # Completely disable citation logic for extensive search
detail_level=detail_level
)
print(f" ⢠Comprehensive summary generated (no citation processing)")
print(f" ⢠Summary tokens: {self.count_tokens(comprehensive_summary):,}")
# APPLY REFERENCE RELEVANCE FILTERING IF ENABLED
filtered_documents_for_processing = documents_for_processing
if self.flow_control.get("enable_reference_filtering", True):
relevance_threshold = self.flow_control.get("relevance_threshold", 0.3)
print(f" šÆ Applying reference relevance filtering...")
# Score documents against the comprehensive summary
relevant_docs_with_scores = self.score_reference_relevance(
comprehensive_summary,
documents_for_processing,
relevance_threshold
)
# Extract just the metadata and content (remove scores for compatibility)
filtered_documents_for_processing = [(metadata, content) for metadata, content, score in relevant_docs_with_scores]
print(f" ⢠References before filtering: {len(documents_for_processing)}")
print(f" ⢠References after filtering: {len(filtered_documents_for_processing)}")
print(f" ⢠References removed: {len(documents_for_processing) - len(filtered_documents_for_processing)}")
else:
print(f" ā ļø Reference relevance filtering disabled")
# BUILD PROPERLY FORMATTED REFERENCE LIST FROM FILTERED DOCUMENTS
print(f" ⢠Building reference list from {len(filtered_documents_for_processing)} filtered documents...")
# Use ReferenceManager for consistent formatting
ref_manager = ReferenceManager(default_style="apa")
reference_list = []
# Create properly formatted reference entries for FILTERED documents
for i, (metadata, _) in enumerate(filtered_documents_for_processing):
ref_num = i + 1
title = metadata.get('title', f'Document {ref_num}')
source_info = metadata.get('bibtex', metadata.get('parent_name', 'Unknown Source'))
# Extract document UUID if available
doc_uuid = metadata.get('doc_uid', '')
# Format based on source type using ReferenceManager methods
if metadata.get('bibtex', '') and '@' in metadata.get('bibtex', ''):
# Literature reference - use BibTeX formatting
try:
citation_marker, reference = ref_manager.format_bibtex_reference(
metadata['bibtex'], str(ref_num), "apa"
)
# Add UUID if available for literature documents
if doc_uuid:
reference += f" [UUID: {doc_uuid}]"
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting BibTeX for ref {ref_num}: {e}")
ref_text = f"[{ref_num}]: {title} (Literature)"
if doc_uuid:
ref_text += f" [UUID: {doc_uuid}]"
reference_list.append(ref_text)
elif any(ext in source_info.lower() for ext in ['.pdf', '.docx', '.pptx', '.xlsx', '.txt']):
# Document reference - use file formatting with clickable links
try:
doc_path = source_info
# Don't use title as description if it's the same as filename
filename = doc_path.split('/')[-1] if '/' in doc_path else doc_path
description = title if title != filename and title != f'Document {ref_num}' and title != 'Unknown Document' else None
citation_marker, reference = ref_manager.process_file_reference(
doc_path, str(ref_num), description
)
# Add UUID if available for document files
if doc_uuid:
reference += f" [UUID: {doc_uuid}]"
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting file reference for ref {ref_num}: {e}")
doc_name = source_info.split('/')[-1] if '/' in source_info else source_info
ref_text = f"[{ref_num}]: <a href='file:///{source_info}' target='_blank'>{doc_name}</a>"
if doc_uuid:
ref_text += f" [UUID: {doc_uuid}]"
reference_list.append(ref_text)
else:
# Generic reference - basic format with collection info
ref_text = f"[{ref_num}]: {title} (Collection: {collection_name})"
if doc_uuid:
ref_text += f" [UUID: {doc_uuid}]"
reference_list.append(ref_text)
print(f" ⢠Created properly formatted reference list with {len(reference_list)} entries")
# Create the final block with summary + simple reference list
reference_section = "\n\n---\n\n**References:**\n" + "\n".join(reference_list)
final_content = comprehensive_summary + reference_section
# Add as single comprehensive block
collected_blocks.append(f"[block {self.block_counter}] {final_content}")
# Store metadata for this comprehensive block
self.blocks_dict[self.block_counter] = {
"type": "comprehensive_extensive",
"id": f"extensive_{self.block_counter}",
"content": f"Comprehensive summary from {len(documents_for_processing)} documents in ChromaDB collection '{collection_name}' (extensive search mode - filtered reference list)",
"documents_processed": len(documents_for_processing),
"documents_in_references": len(filtered_documents_for_processing),
"full_docs_retrieved": full_docs_retrieved,
"source_batches": len(source_mapping),
"reference_count": len(reference_list),
"reference_section": reference_section, # Store the reference section separately
"comprehensive_summary": comprehensive_summary, # Store the summary separately
"collection": collection_name, # Store collection name for reference
"title": f"Comprehensive Analysis - {collection_name}",
"source": f"Extensive search from ChromaDB collection '{collection_name}'",
"doc_name": f"Comprehensive Summary ({len(filtered_documents_for_processing)} documents)",
# Store individual document metadata for reference extraction
"filtered_documents": [
{
'title': metadata.get('title', 'Unknown Document'),
'source': metadata.get('bibtex', metadata.get('parent_name', 'Unknown Source')),
'doc_uuid': metadata.get('doc_uid', ''),
'content_preview': content[:200] + '...' if len(content) > 200 else content
}
for metadata, content in filtered_documents_for_processing
]
}
print(f" ā
Created single comprehensive block {self.block_counter}")
print(f" ⢠Content length: {len(final_content):,} characters")
print(f" ⢠References listed: {len(reference_list)}")
self.block_counter += 1
else:
# Standard processing without extensive search
print(f" š STANDARD PROCESSING:")
print(f" ⢠Processing {len(final_docs)} documents without extensive search")
print(f" ⢠Using document content directly (no LLM extraction)")
for i, doc in enumerate(final_docs):
print(f" Processing document {i+1}/{len(final_docs)}: {doc['metadata'].get('bibtex', doc['id'])[:60]}...")
# In standard mode, use document content directly without LLM extraction
# This avoids slow LLM calls and just uses the already-retrieved content
doc_content = doc['document']
# Optionally truncate very long documents to avoid context overflow
max_doc_length = 5000 # characters
if len(doc_content) > max_doc_length:
print(f" ā ļø Document too long ({len(doc_content)} chars), truncating to {max_doc_length}")
doc_content = doc_content[:max_doc_length] + "... [content truncated]"
else:
print(f" ā
Using full document content ({len(doc_content)} chars)")
# Add reference in formatted text
collected_blocks.append(f"[block {self.block_counter}] {doc_content}")
# Create proper blocks_dict entry with actual content
filepath = doc['metadata'].get('bibtex', doc['metadata'].get('path', ''))
if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
self.blocks_dict[self.block_counter] = {
"type": "document",
"id": f"doc_{self.block_counter}",
"path": filepath,
"content": doc_content, # Store actual content here
"title": f"Document from ChromaDB collection '{collection_name}'",
"source": filepath,
"collection": collection_name,
"doc_name": filepath.split('/')[-1] if '/' in filepath else filepath
}
elif '@article' in filepath:
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"doc_{self.block_counter}",
"bibtex": filepath,
"content": doc_content, # Store actual content here
"title": f"Literature from ChromaDB collection '{collection_name}'",
"source": filepath,
"collection": collection_name,
"doc_name": "Academic Literature"
}
else:
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"ref_{self.block_counter}",
"content": doc_content, # Store actual content here
"title": f"Reference from ChromaDB collection '{collection_name}'",
"source": f"ChromaDB: {collection_name}",
"collection": collection_name,
"doc_name": filepath or "Unknown Document"
}
self.block_counter += 1
print(f" ā
Created {len(final_docs)} standard blocks")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
if enable_extensive_search:
print(f" ⢠Mode: Extensive search (single comprehensive block + simple reference list, no citations)")
print(f" ⢠Documents processed: {len(final_docs)}")
print(f" ⢠Blocks created: 1 comprehensive block")
else:
print(f" ⢠Mode: Standard processing (individual blocks with inline citations)")
print(f" ⢠Documents processed: {len(final_docs)}")
print(f" ⢠Blocks created: {len(final_docs)} individual blocks")
print(f" ⢠Total characters: {sum(len(block) for block in collected_blocks):,}")
print("=" * 60)
return "\n".join(collected_blocks)
def get_embedding(self, text):
"""Generate an embedding for the given text using OpenAI's text-embedding model."""
# Use direct client instead of module-level API to avoid ambiguity errors
from openai import OpenAI
try:
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
embedding = response.data[0].embedding
return embedding
except Exception as e:
print(f"Error generating embedding: {str(e)}")
# Return an empty embedding in case of error
return [0.0] * 1536 # Typical dimension for text-embedding-3-small
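# --- Illustrative sketch (not part of the class): comparing embeddings with cosine similarity ---
# Shows how an embedding produced as above could be compared against another one.
# Assumes OPENAI_API_KEY is set in the environment; the texts are hypothetical.
import os
import numpy as np
from openai import OpenAI

def embed_text(text, model="text-embedding-3-small"):
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    return client.embeddings.create(model=model, input=text).data[0].embedding

def cosine_similarity(a, b):
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Usage (hypothetical):
# score = cosine_similarity(embed_text("query about formulation stability"),
#                           embed_text("document text about vaccine formulation"))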
def collect_data_from_neo4j(self, data, query, doc_type="literature data", detail_level="Balanced"):
"""
Collect relevant documents from Neo4j using keyword pre-filtering and FAISS vector search
Args:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with formatted text blocks for LLM context
"""
collected_blocks = []
# Handle query extension if needed
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query = [query]
partial_inclusions = max(2, data['inclusions'] // len(self.extended_query) + 1)
# Set up embeddings
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
# Set up a collector for documents processed
docs_added = 0
#target_docs = data["inclusions"]
self.litt_nodes_cache = None
self.litt_VS_cache = None
# Process each query (original and any extensions)
for q in self.extended_query:
print(f"Working on query: {q}")
# First, get embeddings for the query
query_embedding = self.get_embedding(q)
# Process Text_chunks
text_chunks_processed = self._process_node_type(
data['data'], # Base Cypher filter
query_embedding,
q,
"Text_chunk",
"Text",
partial_inclusions,
embedding_function,
doc_type
)
if text_chunks_processed:
collected_blocks.extend(text_chunks_processed["blocks"])
docs_added += len(text_chunks_processed["blocks"])
print(f"Added {len(text_chunks_processed.get('blocks', []))} text chunks")
# If we still need more documents, try Table_chunks
# if docs_added < target_docs:
# table_chunks_processed = self._process_node_type(
# data['data'], # Base Cypher filter
# query_embedding,
# q,
# "Table_chunk",
# "Html", # Use Html for tables
# initial_k,
# embedding_function,
# doc_type,
# target_docs - docs_added
# )
# if table_chunks_processed:
# collected_blocks.extend(table_chunks_processed["blocks"])
# docs_added += len(table_chunks_processed["blocks"])
# print(f"Added {len(table_chunks_processed.get('blocks', []))} table chunks")
# # If we have enough documents, stop processing queries
# if docs_added >= target_docs:
# break
print(f"Total blocks added: {len(collected_blocks)}")
return "\n".join(collected_blocks)
def collect_data_from_neo4j_new(self, data, query, doc_type="literature data", detail_level="Balanced"):
"""
Collect relevant documents from Neo4j using optimized workflow:
1) Combine results from all extended queries
2) Apply keyword filters across all results
3) Remove similar documents and apply cross-encoder if requested
4) Evaluate against target and add additional documents as needed
Args:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with formatted text blocks for LLM context
"""
print("\n" + "=" * 60)
print(f"š NEO4J LITERATURE SEARCH: {doc_type}")
print("=" * 60)
# Clear cache for new query
self.litt_nodes_cache = None
self.litt_VS_cache = None
collected_blocks = []
# Configure retrieval parameters
use_crossencoder = "crossencoder" in data["processing_steps"]
target_docs = data["inclusions"]
initial_k = target_docs * 10 if use_crossencoder else target_docs * 3
print(f"š Retrieval Configuration:")
print(f" ⢠Target documents: {target_docs}")
print(f" ⢠Initial retrieval (k): {initial_k}")
print(f" ⢠Cross-encoder enabled: {use_crossencoder}")
print(f" ⢠Processing steps: {data['processing_steps']}")
# Handle query extension if needed
if "extend_query" in data["processing_steps"]:
print("š Extending query with additional variations...")
self.extend_query(query)
self.extended_query.append(query)
print(f"ā
Query extension complete. Total queries: {len(self.extended_query)}")
else:
self.extended_query = [query]
print(f"š Using single query (no extension)")
# Set up embeddings for vector search
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
# STEP 1: Get query embedding for vector search
query_embedding = self.get_embedding(query)
# STEP 2: Retrieve nodes from Neo4j for all queries together
# This retrieval is based solely on cypher queries from all extended queries
all_nodes = []
all_node_ids = set() # Track retrieved node IDs to avoid duplicates
print(f"Retrieving Neo4j nodes")
# Extract keywords for this query to use in Neo4j filtering
filter_keywords = []
if "keyword_filter" in data.get("processing_steps", []):
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled:
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
print(f"Using manual keywords for Neo4j: {filter_keywords}")
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
filter_keywords = self.extract_filter_keywords(query, n_keywords=3) # extract from the original query ('q' is only defined in the loop below)
print(f"Using auto-extracted keywords for Neo4j: {filter_keywords}")
else:
print("Keyword filtering disabled - no keyword filtering for Neo4j")
else:
print("Keyword filter not in processing steps - no keyword filtering for Neo4j")
# Process base query based on type
if isinstance(data['data'], list):
# Multiple query variants
for query_variant in data['data']:
nodes = self._fetch_neo4j_nodes(
query_variant,
filter_keywords,
"Text_chunk",
"Text",
initial_k
)
# Add unique nodes to collection
for node in nodes:
if node["uid"] not in all_node_ids:
all_node_ids.add(node["uid"])
all_nodes.append(node)
else:
# Single query string
nodes = self._fetch_neo4j_nodes(
data['data'],
filter_keywords,
"Text_chunk",
"Text",
initial_k
)
# Add unique nodes to collection
for node in nodes:
if node["uid"] not in all_node_ids:
all_node_ids.add(node["uid"])
all_nodes.append(node)
print(f"Retrieved {len(all_nodes)} unique nodes from Neo4j")
# Cache all retrieved nodes
self.litt_nodes_cache = all_nodes
for q in self.extended_query:
# STEP 3: Filter nodes by basic criteria (length, token count)
min_doc_length = 30
max_token_length = 15000 # Avoid very long documents
filtered_nodes = [
node for node in all_nodes
if node["content"] and len(node["content"]) >= min_doc_length ]
print(f"Have {len(filtered_nodes)} nodes after basic filtering")
# If no filtered nodes, return empty result
if not filtered_nodes:
return ""
# STEP 4: Apply vector search with similarity threshold to remove near-duplicates
selected_nodes = []
selected_embeddings = []
similarity_threshold = 0.95
# First generate embeddings for all filtered nodes
node_embeddings = []
for node in filtered_nodes:
try:
content = node["content"]
if self.count_tokens(content) > 8192:
# Summarize very long content
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
summarized = self.extract_for_queries(content, self.extended_query[:1])
embedding = client.embeddings.create(
model="text-embedding-3-small",
input=summarized
).data[0].embedding
else:
# Use original content for shorter texts
embedding = self.get_embedding(content)
node_embeddings.append((node, embedding))
except Exception as e:
print(f"Error generating embedding for node {node['uid']}: {str(e)}")
# Skip this node
continue
# Apply similarity filtering
query_embedding_array = np.array(query_embedding)
for node, embedding in node_embeddings:
embedding_array = np.array(embedding)
# Normalize embeddings
norm = np.linalg.norm(embedding_array)
if norm > 0:
embedding_array = embedding_array / norm
# Check if too similar to any already selected node
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(embedding_array, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
break
if not is_redundant:
selected_nodes.append(node)
selected_embeddings.append(embedding_array)
print(f"Selected {len(selected_nodes)} nodes after similarity filtering")
# STEP 5: Apply cross-encoder reranking if requested
final_nodes = []
if use_crossencoder and len(selected_nodes) > target_docs:
print("Applying cross-encoder reranking")
from sentence_transformers import CrossEncoder
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Create query-document pairs for the reranker
query_doc_pairs = [(query, node["content"]) for node in selected_nodes]
scores = cross_model.predict(query_doc_pairs)
# Sort by score (highest first)
node_score_pairs = list(zip(selected_nodes, scores))
ranked_nodes = sorted(node_score_pairs, key=lambda x: x[1], reverse=True)
# Select top nodes after reranking
final_nodes = [node for node, _ in ranked_nodes[:target_docs]]
else:
# If not using cross-encoder or don't have enough docs, use all selected docs
final_nodes = selected_nodes[:target_docs]
# STEP 6: Process final nodes into blocks
print(f"Processing final {len(final_nodes)} nodes into blocks")
# Check if extensive search is enabled
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
if enable_extensive_search and self.extensive_search_manager:
print(f"š Extensive search enabled for Neo4j - processing {len(final_nodes)} nodes with full document retrieval")
# If extensive search requested, retrieve more chunks
if len(final_nodes) < extensive_search_chunks:
print(f"Retrieving additional chunks for extensive search (target: {extensive_search_chunks})")
# Get more nodes for extensive search
additional_needed = extensive_search_chunks - len(final_nodes)
remaining_nodes = [node for node in selected_nodes if node["uid"] not in [n["uid"] for n in final_nodes]]
final_nodes.extend(remaining_nodes[:additional_needed])
# Process nodes with full document retrieval and summarization
documents_with_full_context = []
for node in final_nodes:
uid = node["uid"]
# Try to get full document from Neo4j
full_document = self.extensive_search_manager.get_full_document_neo4j(uid)
if full_document:
# Summarize the full document with query context
summarized_doc = self.extensive_search_manager.summarize_document(
full_document,
query,
max_tokens=1500
)
# Create metadata-like dict for compatibility
metadata = {
'title': node.get("parent_name", "Unknown Document"),
'parent_name': node["parent_name"],
'bibtex': node["bibtex"],
'uid': uid,
'node_type': node["node_type"]
}
documents_with_full_context.append((metadata, summarized_doc))
else:
# Fallback to original chunk content
metadata = {
'title': node.get("parent_name", "Unknown Document"),
'parent_name': node["parent_name"],
'bibtex': node["bibtex"],
'uid': uid,
'node_type': node["node_type"]
}
documents_with_full_context.append((metadata, node["content"]))
# Remove duplicate documents based on content similarity
unique_documents = self.extensive_search_manager.remove_duplicate_documents(
documents_with_full_context,
similarity_threshold=0.85
)
print(f"After deduplication: {len(unique_documents)} unique documents")
# APPLY KEYWORD FILTERING ON FULL DOCUMENTS FOR EXTENSIVE SEARCH MODE (NEO4J)
keyword_filtered_unique_documents = unique_documents
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
# Extract filter keywords for Neo4j (same logic as ChromaDB section)
filter_keywords = []
if keyword_filtering_enabled:
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
filter_keywords = self.extract_filter_keywords(query)
if keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f"\n š APPLYING KEYWORD FILTERING TO FULL DOCUMENTS (NEO4J):")
print(f" ⢠Filter keywords: {filter_keywords}")
print(f" ⢠Evaluating {len(unique_documents)} full documents")
# Apply keyword filtering to full document content
all_keywords_docs = []
any_keywords_docs = []
for metadata, full_content in unique_documents:
full_content_lower = full_content.lower()
doc_title = metadata.get('title', metadata.get('parent_name', 'Unknown'))[:50]
# Create searchable text that includes content, title, filename, and other metadata
searchable_text = full_content_lower
# Add title to searchable text
if metadata.get('title') and metadata.get('title') != 'Unknown Document':
searchable_text += " " + metadata.get('title', '').lower()
# Add parent_name (often contains filename/path info)
if metadata.get('parent_name') and metadata.get('parent_name') != 'Unknown Source':
searchable_text += " " + metadata.get('parent_name', '').lower()
# Add bibtex field (often contains filename)
if metadata.get('bibtex'):
searchable_text += " " + metadata.get('bibtex', '').lower()
# Add source information
if metadata.get('source'):
searchable_text += " " + metadata.get('source', '').lower()
# Check if document contains ALL keywords (in content or metadata)
if all(keyword.lower() in searchable_text for keyword in filter_keywords):
all_keywords_docs.append((metadata, full_content))
print(f" ā
ALL keywords found in: {doc_title}")
# Check if document contains ANY keyword (in content or metadata)
elif any(keyword.lower() in searchable_text for keyword in filter_keywords):
any_keywords_docs.append((metadata, full_content))
print(f" ā” SOME keywords found in: {doc_title}")
else:
print(f" ā NO keywords found in: {doc_title}")
# Prioritize documents with ALL keywords, then add ANY keywords if needed
keyword_filtered_unique_documents = all_keywords_docs.copy()
# Calculate how many more documents we need
target_after_filtering = min(len(unique_documents), self.flow_control.get("target_summary_tokens", 8000) // 200) # Rough estimate; target_summary_tokens is only assigned from flow_control further below
if len(keyword_filtered_unique_documents) < target_after_filtering and any_keywords_docs:
additional_needed = target_after_filtering - len(keyword_filtered_unique_documents)
keyword_filtered_unique_documents.extend(any_keywords_docs[:additional_needed])
print(f" ⢠Added {min(additional_needed, len(any_keywords_docs))} documents with partial keyword matches")
print(f" š KEYWORD FILTERING RESULTS (NEO4J):")
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
print(f" ⢠Documents with ANY keywords: {len(any_keywords_docs)}")
print(f" ⢠Documents with NO keywords: {len(unique_documents) - len(all_keywords_docs) - len(any_keywords_docs)}")
print(f" ⢠Final filtered documents: {len(keyword_filtered_unique_documents)}")
print(f" ⢠Documents removed by filtering: {len(unique_documents) - len(keyword_filtered_unique_documents)}")
if len(keyword_filtered_unique_documents) == 0:
print(f" ā KEYWORD FILTERING (NEO4J): No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from Neo4j.")
# Don't fall back to original documents - respect the filtering
# keyword_filtered_unique_documents remains empty
else:
if keyword_filtering_enabled:
print(f"\n ā ļø Keyword filtering enabled but conditions not met for Neo4j:")
print(f" ⢠Keywords available: {bool(filter_keywords)}")
print(f" ⢠Processing steps include keyword_filter: {'keyword_filter' in data.get('processing_steps', [])}")
else:
print(f"\n ā ļø Keyword filtering disabled for Neo4j extensive search")
# Use keyword-filtered documents for further processing
unique_documents = keyword_filtered_unique_documents
# Check if any documents remain after keyword filtering
if len(unique_documents) == 0:
print(f" ā No documents available for processing after keyword filtering (Neo4j)")
print(f" š« Skipping Neo4j collection - no matching content found")
print(f"\nšÆ NEO4J COLLECTION SUMMARY:")
print(f" ⢠Collection: Neo4j literature data")
print(f" ⢠Mode: Keyword filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Process with extensive search manager (no citation processing in extensive search mode)
target_summary_tokens = self.flow_control.get("target_summary_tokens", 8000)
comprehensive_summary, source_mapping, individual_summaries = self.extensive_search_manager.process_documents_with_source_tracking(
unique_documents,
query,
target_summary_tokens=target_summary_tokens,
batch_size=10,
use_inline_citations=False, # Extensive search mode: no inline citations
disable_citations=True, # Completely disable citation logic for extensive search
detail_level=detail_level
)
print(f" ⢠Comprehensive summary generated (no citation processing)")
print(f" ⢠Summary tokens: {self.count_tokens(comprehensive_summary):,}")
# APPLY REFERENCE RELEVANCE FILTERING IF ENABLED
filtered_unique_documents = unique_documents
if self.flow_control.get("enable_reference_filtering", True):
relevance_threshold = self.flow_control.get("relevance_threshold", 0.3)
print(f" šÆ Applying reference relevance filtering...")
# Score documents against the comprehensive summary
relevant_docs_with_scores = self.score_reference_relevance(
comprehensive_summary,
unique_documents,
relevance_threshold
)
# Extract just the metadata and content (remove scores for compatibility)
filtered_unique_documents = [(metadata, content) for metadata, content, score in relevant_docs_with_scores]
print(f" ⢠References before filtering: {len(unique_documents)}")
print(f" ⢠References after filtering: {len(filtered_unique_documents)}")
print(f" ⢠References removed: {len(unique_documents) - len(filtered_unique_documents)}")
else:
print(f" ā ļø Reference relevance filtering disabled")
# BUILD PROPERLY FORMATTED REFERENCE LIST FROM FILTERED DOCUMENTS (no citation extraction)
print(f" ⢠Building reference list from {len(filtered_unique_documents)} filtered documents...")
# Use ReferenceManager for consistent formatting
ref_manager = ReferenceManager(default_style="apa")
reference_list = []
# Create properly formatted reference entries for FILTERED documents
for i, (metadata, _) in enumerate(filtered_unique_documents):
ref_num = i + 1
title = metadata.get('title', metadata.get('parent_name', f'Document {ref_num}'))
bibtex = metadata.get('bibtex', '')
parent_name = metadata.get('parent_name', '')
# Improve title extraction - use filename if title is generic or same as parent_name
if title == f'Document {ref_num}' or title == parent_name:
if parent_name:
# Use filename as title
filename = parent_name.split('/')[-1] if '/' in parent_name else parent_name
title = filename
# Format based on source type using ReferenceManager methods
if bibtex and '@' in bibtex:
# Literature reference - use BibTeX formatting
try:
citation_marker, reference = ref_manager.format_bibtex_reference(
bibtex, str(ref_num), "apa"
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting BibTeX for ref {ref_num}: {e}")
reference_list.append(f"[{ref_num}]: {title} (Literature)")
elif parent_name and any(ext in parent_name.lower() for ext in ['.pdf', '.docx', '.pptx', '.xlsx', '.txt']):
# Document reference - use file formatting with clickable links
try:
# Don't use title as description if it's the same as filename
filename = parent_name.split('/')[-1] if '/' in parent_name else parent_name
description = title if title != filename and title != f'Document {ref_num}' else None
citation_marker, reference = ref_manager.process_file_reference(
parent_name, str(ref_num), description
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting file reference for ref {ref_num}: {e}")
reference_list.append(f"[{ref_num}]: <a href='file:///{parent_name}' target='_blank'>{parent_name}</a>")
else:
# Neo4j document reference - basic format
reference_list.append(f"[{ref_num}]: {title} (Neo4j document)")
print(f" ⢠Created properly formatted reference list with {len(reference_list)} entries")
# Create the final block with summary + simple reference list
reference_section = "\n\n---\n\n**References:**\n" + "\n".join(reference_list)
final_content = comprehensive_summary + reference_section
# Add as single comprehensive block
collected_blocks.append(f"[block {self.block_counter}] {final_content}")
# Store metadata for this comprehensive block
self.blocks_dict[self.block_counter] = {
"type": "comprehensive_extensive_neo4j",
"id": f"extensive_neo4j_{self.block_counter}",
"content": f"Comprehensive summary from {len(unique_documents)} documents in Neo4j (extensive search mode - filtered reference list)",
"documents_processed": len(unique_documents),
"documents_in_references": len(filtered_unique_documents),
"source_batches": len(source_mapping),
"reference_count": len(reference_list),
"reference_section": reference_section, # Store the reference section separately
"comprehensive_summary": comprehensive_summary # Store the summary separately
}
print(f"ā
Created single comprehensive Neo4j block {self.block_counter}")
print(f"⢠Content length: {len(final_content):,} characters")
print(f"⢠References listed: {len(reference_list)}")
self.block_counter += 1
else:
# Standard processing without extensive search
for node in final_nodes:
content = node["content"]
uid = node["uid"]
parent_name = node["parent_name"]
bibtex = node["bibtex"]
node_type = node["node_type"]
# Extract the most relevant content using query-based extractor
extracted_content = self.extract_for_queries(content, self.extended_query)
# Add reference in formatted text
block_text = f"[block {self.block_counter}] {extracted_content}"
collected_blocks.append(block_text)
# Create reference entry
if bibtex and '@' in bibtex:
# Literature reference with BibTeX
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"lit_{self.block_counter}",
"bibtex": bibtex,
"content": f"Neo4j literature: {parent_name}"
}
else:
# Generic document reference
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"doc_{self.block_counter}",
"content": f"Document: {parent_name}"
}
# Increment block counter
self.block_counter += 1
print(f"\nšÆ NEO4J LITERATURE SEARCH SUMMARY:")
if enable_extensive_search:
print(f" ⢠Mode: Extensive search (single comprehensive block + reference list)")
print(f" ⢠Documents processed: {len(final_nodes)}")
print(f" ⢠Blocks created: 1 comprehensive block")
else:
print(f" ⢠Mode: Standard processing (individual blocks with inline citations)")
print(f" ⢠Documents processed: {len(final_nodes)}")
print(f" ⢠Blocks created: {len(final_nodes)} individual blocks")
print(f" ⢠Total characters: {sum(len(block) for block in collected_blocks):,}")
print("=" * 60)
return "\n".join(collected_blocks)
def _fetch_neo4j_nodes(self, base_query, keywords, node_type, content_field, limit):
"""
Helper method to fetch nodes from Neo4j with keyword filtering
Args:
base_query: Base cypher query for filtering
keywords: Keywords for filtering results
node_type: Type of node to fetch
content_field: Field containing the content
limit: Maximum number of nodes to retrieve
Returns:
List of node dictionaries
"""
# Construct keyword clause for filtering
keyword_clauses = []
for keyword in keywords:
# Escape single quotes in keywords
safe_keyword = keyword.replace("'", "\\'")
keyword_clauses.append(f"x.{content_field} CONTAINS '{safe_keyword}'")
# Combine keyword clauses with OR
if not keyword_clauses:
keyword_filter = None
else:
keyword_filter = " OR ".join(keyword_clauses)
# Construct the final query with keyword filtering
if keyword_filter:
if "WHERE" in base_query or "where" in base_query:
# Add to existing WHERE clause
query_with_keywords = base_query.replace("where","WHERE").replace("WHERE", f"WHERE ({keyword_filter}) AND ")
else:
# Add new WHERE clause
query_with_keywords = f"{base_query} WHERE {keyword_filter}"
else:
# No keywords, use original query
query_with_keywords = base_query
# Complete the query to fetch publications, documents, and other metadata
cypher_query = f"""
{query_with_keywords}
OPTIONAL MATCH (p:Publication)-->(x)
OPTIONAL MATCH (d:Document)-[:CHUNK]->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex,
d.UID AS doc_uid,
d.Name AS doc_name
LIMIT {limit}
"""
# Execute query and collect results
results = self.session.run(cypher_query)
nodes = []
for record in results:
node_data = {
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
}
# Add document information if available
if record["doc_uid"]:
node_data["doc_uid"] = record["doc_uid"]
node_data["doc_name"] = record["doc_name"]
nodes.append(node_data)
return nodes
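# Example of the assembled Cypher (hypothetical inputs: base_query="MATCH (x:Text_chunk)",
# keywords=["adjuvant"], content_field="Text", limit=50):
#   MATCH (x:Text_chunk) WHERE x.Text CONTAINS 'adjuvant'
#   OPTIONAL MATCH (p:Publication)-->(x)
#   OPTIONAL MATCH (d:Document)-[:CHUNK]->(x)
#   RETURN x.UID AS uid, x.Text AS content, p.Name AS parent_name,
#          p.BibTex AS bibtex, d.UID AS doc_uid, d.Name AS doc_name
#   LIMIT 50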
def _process_node_type(self, base_query, query_embedding, query_text, node_type, content_field, k, embedding_function, doc_type):
"""
Helper method to process a specific node type with FAISS vector search
Args:
base_query: Base cypher query for pre-filtering
query_embedding: Embedding vector for the query
query_text: Text of the query for cross-encoder ranking
node_type: Type of node to process (Text_chunk or Table_chunk)
content_field: Field containing the node content
k: Number of results to retrieve
embedding_function: Function to generate embeddings
doc_type: Type of document
max_results: Maximum number of results to return
Returns:
Dictionary with blocks added and other metadata
"""
import numpy as np
import faiss
from sentence_transformers import CrossEncoder
processing_steps = self.data_handles.handlers.get(doc_type, {}).get("processing_steps", [])
if "crossencoder" in processing_steps:
k_initial=k*10
else:
k_initial=k
# Step 1: Fetch pre-filtered nodes from Neo4j without vector search
# Instead of using the raw base_query, let's parse it properly
if not self.litt_nodes_cache:
if isinstance(base_query, list):
# If base_query is a list of query strings, run them separately
all_nodes = []
for query_variant in base_query:
# Use MATCH pattern instead of directly inserting raw query
cypher_query = f"""
{query_variant}
WITH x, count(distinct k) as keyword_count
ORDER BY keyword_count DESC
LIMIT {k_initial * 10}
MATCH (p:Publication)-->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex
"""
results = self.session.run(cypher_query)
for record in results:
all_nodes.append({
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
})
nodes = all_nodes
else:
# For string base_query, use it as a filter
cypher_query = f"""
{base_query}
MATCH (p:Publication)-->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex
LIMIT {k_initial * 10}
"""
results = self.session.run(cypher_query)
nodes = []
for record in results:
nodes.append({
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
})
self.litt_nodes_cache = nodes
else:
nodes = self.litt_nodes_cache
# Rest of the method remains the same...
contents = [node["content"] for node in nodes if node["content"] and len(node["content"]) >= 30]
metadata = [{
"uid": node["uid"],
"parent_name": node["parent_name"],
"bibtex": node["bibtex"],
"node_type": node["node_type"]
} for node in nodes if node["content"] and len(node["content"]) >= 30]
# If we didn't find any nodes, return empty result
if not contents:
return {"blocks": [], "count": 0}
# Continue with the rest of the method...
if len(contents) > 0:
try:
if not(self.litt_VS_cache):
# Use a direct OpenAI client without module-level API to avoid ambiguity
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# Generate embeddings for all contents
content_embeddings = []
for content in contents:
try:
if self.count_tokens(content) > 8192:
# Summarize for very long content
content = self.summarize_text(content, os.environ["OPENAI_API_KEY"])
# Use direct client instead of module-level functions
response = client.embeddings.create(
model="text-embedding-3-small",
input=content
)
embedding = response.data[0].embedding
content_embeddings.append(embedding)
except Exception as e:
print(f"Error generating embedding: {str(e)}")
content_embeddings.append([0.0] * len(query_embedding)) # Zero-vector fallback when embedding fails
# Create FAISS index
dimension = len(query_embedding)
index = faiss.IndexFlatL2(dimension)
# Add embeddings to index
if content_embeddings:
index.add(np.array(content_embeddings, dtype=np.float32))
self.litt_VS_cache = index
else:
index=self.litt_VS_cache
try:
# Search for similar vectors
D, I = index.search(np.array([query_embedding], dtype=np.float32), k_initial)
# Get the most similar nodes
similar_indices = I[0]
similar_nodes = [nodes[idx] for idx in similar_indices if idx < len(nodes)]
# Step 3: Apply cross-encoder reranking if needed
processing_steps = self.data_handles.handlers.get(doc_type, {}).get("processing_steps", [])
if "crossencoder" in processing_steps:
print("Applying cross-encoder reranking")
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Prepare document pairs for reranking
query_chunk_pairs = [(query_text, node["content"]) for node in similar_nodes]
scores = cross_model.predict(query_chunk_pairs)
# Combine nodes with their scores
node_score_pairs = list(zip(similar_nodes, scores))
# Sort by score (highest first)
ranked_nodes = sorted(node_score_pairs, key=lambda x: x[1], reverse=True)
# Take top nodes after reranking
top_nodes = [node for node, _ in ranked_nodes[:k]]
else:
# Just limit the number if no reranking
top_nodes = similar_nodes[:k]
# Step 4: Format the results
blocks = []
for i, node in enumerate(top_nodes):
content = node["content"]
uid = node["uid"]
parent_name = node["parent_name"]
bibtex = node["bibtex"]
node_type = node["node_type"]
# Format the content block
content = self.extract_for_queries(content, self.extended_query)
block_text = f"[block {self.block_counter}] {content}"
blocks.append(block_text)
# Create reference entry
if bibtex and '@' in bibtex:
# Literature reference with BibTeX
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"lit_{self.block_counter}",
"bibtex": bibtex,
"content": f"Neo4j literature: {parent_name}"
}
else:
# Generic document reference
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"doc_{self.block_counter}",
"content": f"Document: {parent_name}"
}
# Increment block counter
self.block_counter += 1
return {"blocks": blocks, "count": len(blocks)}
except Exception as e:
print(f"Error processing block results: {str(e)}")
return {"blocks": [], "count": 0}
except Exception as e:
print(f"Error in FAISS processing: {str(e)}")
return {"blocks": [], "count": 0}
def collect_text_blocks(self, data,query):
embedding_function = OpenAIEmbeddings()
if "crossencoder" in data["processing_steps"]:
initial_k=data["inclusions"]*10
else:
initial_k=data["inclusions"]
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query=[query]
## First step is always a similarity search
collected_blocks = []
retriever = data['data'].as_retriever(
search_type="similarity",
search_kwargs={"k": initial_k*3}
)
for q in self.extended_query:
print("working on query ",q)
retrieved_docs = retriever.invoke(q)
retrieved_texts = [doc.page_content for doc in retrieved_docs if len(doc.page_content)>30]
# Here we recompute embeddings for each candidate document.
candidate_embeddings = np.array([self.normalize(embedding_function.embed_query(doc))
for doc in retrieved_texts])
# Compute and normalize the query embedding
query_embedding = self.normalize(np.array(embedding_function.embed_query(q)))
# 4. Run MMR to select a diverse subset of documents
print("running MMR")
#retrieved_texts = self.mmr(query_embedding, candidate_embeddings, retrieved_texts, lambda_param=0.5, top_k=initial_k)
retrieved_texts=self.similarity_threshold_filter(query_embedding, candidate_embeddings, retrieved_texts, similarity_threshold=0.95,top_k=initial_k)
## If crossencoder is used, we need to rerank the results
if "crossencoder" in data["processing_steps"]:
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_chunk_pairs = [(q, chunk) for chunk in retrieved_texts]
scores = cross_model.predict(query_chunk_pairs)
chunk_score_pairs = list(zip(retrieved_texts, scores))
ranked_chunks = sorted(chunk_score_pairs, key=lambda x: x[1], reverse=True)
retrieved_texts = [chunk for chunk, score in ranked_chunks[:data["inclusions"]//2]]
#print("blocks from ",q," \n","\n".join(retrieved_texts))
for block in retrieved_texts:
collected_blocks.append("[block "+str(self.block_counter)+"] "+block)
self.inline_refs['block '+str(self.block_counter)]='VStore Block '+str(self.block_counter)
self.block_counter+=1
return "\n".join(collected_blocks)
def generate_prompt(self,template,data_sections,query):
prompt_template=my_prompt_templates.get(template,'')
if prompt_template=="":
prompt_template=my_prompt_templates.get("Vaccine_base",'')
prompt=prompt_template["Instructions"]+"\n"
# Add detailed instructions if provided
detailed_instructions = self.flow_control.get("detailed_instructions", "").strip()
if detailed_instructions:
prompt += "\n[Additional Detailed Instructions]:\n" + detailed_instructions + "\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Step "+str(i+1)+" on section labeled [" +key+"]: "+self.data_handles.handlers[key]['instructions']+ "\n"
if self.flow_control.get("enable_web_search", False):
prompt=prompt+"Step "+str(i+2)+" on section labeled [web search results] : Provide a summary of the given context data extracted from the web, using summary tables when possible.\n"
if self.flow_control["enable_memory"]:
prompt=prompt+"Step "+str(i+3)+" on section labeled [previous chats] : Also take into account your previous answers.\n"
prompt=prompt+prompt_template["Output Constraints"]+"\n\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Data section "+str(i+1)+"- [" +key+"]\n"+data_sections[key]+ "\n"
if self.flow_control.get("enable_web_search", False):
prompt=prompt+"Data section "+str(i+2)+"- [web search results] \n"+self.search_results+ "\n"
if self.flow_control["enable_memory"]:
formatted_history = self.chat_memory.get_formatted_history()
prompt=prompt+"Data section "+str(i+3)+"- [previous chats] \n"+formatted_history+ "\n"
print(f"š Debug - Main LLM prompt includes chat history: {len(formatted_history)} characters")
else:
print(f"š Debug - Memory disabled, no chat history in main LLM prompt")
prompt=prompt+"User query: "+query
return prompt
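# Assembled prompt layout produced above (a section appears only when its flag or data is set):
#   <template "Instructions">
#   [Additional Detailed Instructions]: ...        (when flow_control["detailed_instructions"] is non-empty)
#   Step 1..N on sections labeled [<handler key>]
#   Step on [web search results]                   (when enable_web_search)
#   Step on [previous chats]                       (when enable_memory)
#   <template "Output Constraints">
#   Data section 1..N - [<handler key>] <collected blocks>
#   Data section - [web search results] / [previous chats]
#   User query: <query>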
def extend_query(self,query):
print(f"š Extending query with additional variations...")
print(f" š¤ Using small LLM (gpt-4o-mini) for query expansion")
self.small_llm_usage["query_expansion"] += 1
self.small_llm_usage["total_calls"] += 1
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
# Debug chat memory state
memory_enabled = self.flow_control.get("enable_memory", False)
has_chat_memory = hasattr(self, 'chat_memory') and self.chat_memory is not None
memory_messages_count = len(self.chat_memory.messages) if has_chat_memory else 0
print(f" š Debug - Memory enabled: {memory_enabled}")
print(f" š Debug - Has chat_memory: {has_chat_memory}")
print(f" š Debug - Memory messages count: {memory_messages_count}")
# Check if memory is enabled and we have chat history
has_history = memory_enabled and has_chat_memory and memory_messages_count > 0
if has_history:
print(f" š Including chat history context ({len(self.chat_memory.messages)//2} previous exchanges)")
# Get formatted chat history
chat_history = self.chat_memory.get_formatted_history()
prompt = f"""
You are an AI that enhances a given user search query to improve information retrieval, taking into account the conversation history.
### Previous Conversation History:
{chat_history}
### Current User Query:
{query}
### Instructions:
- Analyze the conversation history to understand the context and evolution of the discussion.
- The current query may be a follow-up question that refers to previous topics, entities, or concepts discussed.
- Provide exactly 5 expanded queries that incorporate relevant context from the conversation history.
- Each query should explore a different aspect or perspective, using the conversation context to clarify ambiguous references.
- If the current query contains pronouns (it, this, that, they) or short references, expand them using context from the history.
- If the query is related to previous questions, create variations that make the connection explicit.
- Use synonyms, related terms, or rephrased versions while maintaining the conversational context.
- Ensure that the expanded queries are self-contained and coherent even without the conversation history.
- Avoid generating queries that are too similar to each other.
- ONLY return the text of the expanded queries.
### Expanded Queries:
"""
else:
print(f" š No chat history available - using standard query expansion")
prompt = f"""
You are an AI that enhances a given user search query to improve information retrieval.
### User Query:
{query}
### Instructions:
- Provide exactly 5 expanded queries.
- Each query should explore a different aspect or perspective of the original query.
- Use synonyms, related terms, or rephrased versions to cover various dimensions of the topic.
- Ensure that the expanded queries are relevant and coherent with the original query.
- Avoid generating queries that are too similar to each other.
- ONLY return the text of the expanded queries.
### Expanded Queries:
"""
answer = llm.invoke(prompt)
self.extended_query=[x for x in answer.content.strip().split("\n") if x != ""]
if has_history:
print(f"✅ Context-aware query extension complete. Total queries: {len(self.extended_query) + 1}")
else:
print(f"✅ Query extension complete. Total queries: {len(self.extended_query) + 1}") # +1 for original
return
def normalize(self,vector):
return vector / np.linalg.norm(vector)
def mmr(self,query_embedding, candidate_embeddings, candidate_docs, lambda_param=0.7, top_k=5):
# Compute similarity between the query and each candidate (dot product assumes normalized vectors)
candidate_similarities = np.dot(candidate_embeddings, query_embedding)
# Initialize selected and remaining indices
selected_indices = []
candidate_indices = list(range(len(candidate_docs)))
# First selection: candidate with highest similarity
first_idx = int(np.argmax(candidate_similarities))
selected_indices.append(first_idx)
candidate_indices.remove(first_idx)
# Iteratively select documents that balance relevance and diversity
while len(selected_indices) < top_k and candidate_indices:
best_score = -np.inf
best_idx = None
for idx in candidate_indices:
# Relevance score for candidate idx
relevance = candidate_similarities[idx]
# Diversity score: maximum similarity with any already selected document
diversity = max(np.dot(candidate_embeddings[idx], candidate_embeddings[sel_idx])
for sel_idx in selected_indices)
# Combined MMR score
score = lambda_param * relevance - (1 - lambda_param) * diversity
if score > best_score:
best_score = score
best_idx = idx
selected_indices.append(best_idx)
candidate_indices.remove(best_idx)
return [candidate_docs[i] for i in selected_indices]
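# Worked example of the MMR trade-off above (hypothetical scores, lambda_param=0.7):
#   relevance=0.8, diversity=0.6  ->  score = 0.7*0.8 - 0.3*0.6 = 0.38
#   relevance=0.7, diversity=0.1  ->  score = 0.7*0.7 - 0.3*0.1 = 0.46  (the more diverse doc wins)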
def similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold=0.9,top_k=5):
selected_docs = []
selected_embeddings = []
# Compute query similarity scores for sorting candidates (highest first)
candidate_scores = np.dot(candidate_embeddings, query_embedding)
sorted_indices = np.argsort(candidate_scores)[::-1]
for idx in sorted_indices:
candidate_embedding = candidate_embeddings[idx]
# Check if candidate is too similar to any already selected document
is_redundant = any(np.dot(candidate_embedding, sel_emb) >= similarity_threshold
for sel_emb in selected_embeddings)
if not is_redundant and len(selected_docs) < top_k:
print("appending ",candidate_docs[idx])
selected_docs.append(candidate_docs[idx])
selected_embeddings.append(candidate_embedding)
return selected_docs
def consolidate_with_large_llm_and_citations(self, *args, **kwargs):
"""
Wrapper method that delegates to the ExtensiveSearchManager instance.
This provides backward compatibility and easy access to the dual approach functionality.
"""
if self.extensive_search_manager:
return self.extensive_search_manager.consolidate_with_large_llm_and_citations(*args, **kwargs)
else:
raise RuntimeError("ExtensiveSearchManager not initialized. Cannot use consolidate_with_large_llm_and_citations.")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Parameter (type unspecified)
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self)
Purpose: Internal method: init
Returns: None
get_instruction_template(self, template_name)
Purpose: Get instruction template by name
Parameters:
template_name: Parameter
Returns: None
save_instruction_template(self, template_name, instructions)
Purpose: Save a custom instruction template
Parameters:
template_name: Parameter; instructions: Parameter
Returns: None
load_custom_templates(self)
Purpose: Load custom instruction templates from file
Returns: None
init_connections(self)
Purpose: Performs init connections
Returns: None
run_query(self, query, params)
Purpose: Execute a Cypher query and return the result Parameters ---------- query : str The Cypher query to execute params : dict, optional Parameters for the query Returns ------- result The query result
Parameters:
query: Parameter; params: Parameter
Returns: See docstring for return details
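A minimal usage sketch (the Cypher text, label, and parameter name are illustrative; assumes init_connections has already opened the Neo4j session):
rag = OneCo_hybrid_RAG()
rag.init_connections()
# Parameterized Cypher; $term is a hypothetical parameter name
result = rag.run_query(
    "MATCH (p:Publication) WHERE p.Name CONTAINS $term RETURN p.Name LIMIT 5",
    {"term": "adjuvant"},
)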
evaluate_query(self, query, params)
Purpose: Execute a Cypher query and return a single result Parameters ---------- query : str The Cypher query to execute params : dict, optional Parameters for the query Returns ------- object The single result value
Parameters:
query: Parameter; params: Parameter
Returns: See docstring for return details
push_changes(self, node)
Purpose: Push changes to a node to the database Parameters ---------- node : dict or node-like object Node with properties to update
Parameters:
node: Parameter
Returns: None
count_tokens(self, text)
Purpose: Performs count tokens
Parameters:
text: Parameter
Returns: None
set_api_keys(self)
Purpose: Sets api keys
Returns: None
extract_core_query(self, query_text)
Purpose: Extracts the core information-seeking question from a user query that may contain both a question and processing instructions for the RAG system. Args: query_text: The original user query text Returns: dict: Contains the extracted information with keys: - core_question: The actual information need/question - instructions: Any processing instructions found - is_complex: Boolean indicating if query contained instructions
Parameters:
query_text: Parameter
Returns: See docstring for return details
extract_serper_results(self, serper_response)
Purpose: Extract formatted search results and URLs from GoogleSerperAPI response. Args: serper_response: Raw response from GoogleSerperAPI (JSON object or string) Returns: tuple: (formatted_results, extracted_urls)
Parameters:
serper_response: Parameter
Returns: See docstring for return details
response_callback(self, query, progress_callback)
Purpose: Performs response callback
Parameters:
query: Parameter; progress_callback: Parameter
Returns: None
get_block_by_number(self, block_number)
Purpose: Retrieve a specific block by its number from the most recent response. For extensive search mode, this may extract specific documents from comprehensive blocks. Args: block_number (int): The block number to retrieve Returns: dict: Block data including content, title, source info, etc., or None if not found
Parameters:
block_number: Parameter
Returns: See docstring for return details
extract_referenced_blocks_from_response(self, response_text)
Purpose: Extract which block numbers are actually referenced in the response text. Args: response_text (str): The final formatted response text Returns: set: Set of block numbers that are referenced in the response
Parameters:
response_text: Parameter
Returns: See docstring for return details
get_available_references(self, response_text)
Purpose: Get list of available references that can be used in the UI selection box. Now filters to only show blocks that are actually referenced in the response. Args: response_text (str, optional): The response text to filter by. If None, uses stored response or returns all blocks. Returns: list: List of reference dictionaries with id, type, title, and source information
Parameters:
response_text: Parameter
Returns: See docstring for return details
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding-ada-002 model.
Parameters:
text: Parameter
Returns: None
extract_for_queries(self, text, queries, max_tokens, api_key)
Purpose: Extract information from text based on queries. Args: text: Text to extract from queries: List of queries to guide extraction max_tokens: Maximum tokens in the output api_key: API key for the LLM service Returns: Extracted text relevant to the queries
Parameters:
text: Parameter; queries: Parameter; max_tokens: Parameter; api_key: Parameter
Returns: See docstring for return details
parse_handler(self, query, detail_level)
Purpose: Performs parse handler
Parameters:
query: Parameter; detail_level: Parameter
Returns: None
reformat_data(self, data, min_document_length, similarity_threshold, use_crossencoder, inclusions)
Purpose: Reformat and filter data to be grouped by ID, excluding too-short documents and documents that are too similar to each other. Optionally applies crossencoder ranking. Args: data: Original data structure min_document_length: Minimum character length for documents to include (default: 30) similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar) use_crossencoder: Whether to apply crossencoder reranking (default: False) inclusions: Number of documents to return after filtering (default: 10) Returns: List of selected documents (not dictionary)
Parameters:
data: Parameter; min_document_length: Parameter; similarity_threshold: Parameter; use_crossencoder: Parameter; inclusions: Parameter
Returns: See docstring for return details
score_reference_relevance(self, final_answer, reference_documents, relevance_threshold)
Purpose: Score the relevance of each reference document against the final answer using a small LLM. Args: final_answer: The generated answer text reference_documents: List of (metadata, content) tuples for reference documents relevance_threshold: Minimum score to include a reference (0.0-1.0) Returns: List of (metadata, content, score) tuples for documents above threshold
Parameters:
final_answer: Parameter; reference_documents: Parameter; relevance_threshold: Parameter
Returns: See docstring for return details
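As used in the extensive-search path of collect_data_from_neo4j_new above, a sketch of the filtering step (inputs are hypothetical; variable names follow the source):
scored = rag.score_reference_relevance(comprehensive_summary, unique_documents, 0.3)
filtered_unique_documents = [(metadata, content) for metadata, content, score in scored]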
extract_filter_keywords(self, query, n_keywords)
Purpose: Extract distinguishing keywords from a query for filtering search results. Args: query: The user's query text n_keywords: Maximum number of keywords to extract Returns: List of keywords for filtering
Parameters:
query: Parameter; n_keywords: Parameter
Returns: See docstring for return details
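A hedged sketch of keyword extraction for filtering (query text and keyword count are illustrative):
filter_keywords = rag.extract_filter_keywords("thermostable adjuvant formulations for mRNA vaccines", n_keywords=5)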
collect_data_from_chroma(self, data, query, detail_level)
Purpose: Collect relevant documents from ChromaDB based on query with optimized workflow: 1) Combine results from all extended queries 2) Apply keyword filters across all results 3) Remove similar documents and apply cross-encoder if requested 4) Evaluate against target and add additional documents as needed Args: data: Configuration data for collection and processing query: The user query detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive) Returns: String with collected document blocks
Parameters:
data: Parameter; query: Parameter; detail_level: Parameter
Returns: See docstring for return details
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding model.
Parameters:
text: Parameter
Returns: None
collect_data_from_neo4j(self, data, query, doc_type, detail_level)
Purpose: Collect relevant documents from Neo4j using keyword pre-filtering and FAISS vector search Args: data: Dictionary containing search configuration query: User's query text doc_type: Type of documents to search (default: "literature data") detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive) Returns: String with formatted text blocks for LLM context
Parameters:
data: Parameter; query: Parameter; doc_type: Parameter; detail_level: Parameter
Returns: See docstring for return details
collect_data_from_neo4j_new(self, data, query, doc_type, detail_level)
Purpose: Collect relevant documents from Neo4j using optimized workflow: 1) Combine results from all extended queries 2) Apply keyword filters across all results 3) Remove similar documents and apply cross-encoder if requested 4) Evaluate against target and add additional documents as needed Args: data: Dictionary containing search configuration query: User's query text doc_type: Type of documents to search (default: "literature data") detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive) Returns: String with formatted text blocks for LLM context
Parameters:
data: Parameter; query: Parameter; doc_type: Parameter; detail_level: Parameter
Returns: See docstring for return details
_fetch_neo4j_nodes(self, base_query, keywords, node_type, content_field, limit)
Purpose: Helper method to fetch nodes from Neo4j with keyword filtering Args: base_query: Base cypher query for filtering keywords: Keywords for filtering results node_type: Type of node to fetch content_field: Field containing the content limit: Maximum number of nodes to retrieve Returns: List of node dictionaries
Parameters:
base_query: Parameter; keywords: Parameter; node_type: Parameter; content_field: Parameter; limit: Parameter
Returns: See docstring for return details
_process_node_type(self, base_query, query_embedding, query_text, node_type, content_field, k, embedding_function, doc_type)
Purpose: Helper method to process a specific node type with FAISS vector search Args: base_query: Base cypher query for pre-filtering query_embedding: Embedding vector for the query query_text: Text of the query for cross-encoder ranking node_type: Type of node to process (Text_chunk or Table_chunk) content_field: Field containing the node content k: Number of results to retrieve embedding_function: Function to generate embeddings doc_type: Type of document max_results: Maximum number of results to return Returns: Dictionary with blocks added and other metadata
Parameters:
base_query: Parameter; query_embedding: Parameter; query_text: Parameter; node_type: Parameter; content_field: Parameter; k: Parameter; embedding_function: Parameter; doc_type: Parameter
Returns: See docstring for return details
collect_text_blocks(self, data, query)
Purpose: Performs collect text blocks
Parameters:
data: Parameter; query: Parameter
Returns: None
generate_prompt(self, template, data_sections, query)
Purpose: Performs generate prompt
Parameters:
template: Parameter; data_sections: Parameter; query: Parameter
Returns: None
extend_query(self, query)
Purpose: Performs extend query
Parameters:
query: Parameter
Returns: None
normalize(self, vector)
Purpose: Performs normalize
Parameters:
vector: Parameter
Returns: None
mmr(self, query_embedding, candidate_embeddings, candidate_docs, lambda_param, top_k)
Purpose: Performs mmr
Parameters:
query_embedding: Parameter; candidate_embeddings: Parameter; candidate_docs: Parameter; lambda_param: Parameter; top_k: Parameter
Returns: None
similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold, top_k)
Purpose: Performs similarity threshold filter
Parameters:
query_embedding: Parameter; candidate_embeddings: Parameter; candidate_docs: Parameter; similarity_threshold: Parameter; top_k: Parameter
Returns: None
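A call sketch mirroring how collect_text_blocks uses this filter; embeddings are assumed to be L2-normalized (see normalize), and top_k=10 is an illustrative value:
kept_docs = rag.similarity_threshold_filter(
    query_embedding, candidate_embeddings, retrieved_texts,
    similarity_threshold=0.95, top_k=10,
)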
consolidate_with_large_llm_and_citations(self)
Purpose: Wrapper method that delegates to the ExtensiveSearchManager instance. This provides backward compatibility and easy access to the dual approach functionality.
Returns: None
Required Imports
from typing import List
from typing import Any
from typing import Dict
import os
import panel as pn
Usage Example
# Example usage:
# rag = OneCo_hybrid_RAG()
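An extended sketch using flow-control flags shown in the source above (values are illustrative):
# rag = OneCo_hybrid_RAG()
# rag.flow_control["enable_reference_filtering"] = True   # drop weakly related references
# rag.flow_control["relevance_threshold"] = 0.3            # minimum relevance score to keep
# rag.flow_control["enable_memory"] = True                 # adds the [previous chats] section to the prompt
# rag.flow_control["detailed_instructions"] = "Focus on formulation stability data."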
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class OneCo_hybrid_RAG_v2 (99.1% similar)
- class OneCo_hybrid_RAG_v1 (98.7% similar)
- class OneCo_hybrid_RAG (98.2% similar)
- function check_rag_config (59.3% similar)
- class DocChatRAG (54.6% similar)