class QAUpdater
Orchestrates a two-step Q&A document updating process that generates optimal search queries, retrieves information from internal and external sources, and uses an LLM to determine if updates are needed.
/tf/active/vicechatdev/QA_updater/qa_engine/qa_updater.py
13 - 368
complex
Purpose
The QAUpdater class manages the complete lifecycle of updating Q&A documents by: (1) generating targeted search queries using an LLM, (2) retrieving relevant information from internal vector stores and Google search, (3) using an LLM to analyze retrieved content and decide if updates are needed, and (4) applying updates automatically or queuing them for human review based on confidence levels. It integrates multiple data sources (ChromaDB, Neo4j, Google Search) and uses prompt-based LLM interactions to maintain up-to-date Q&A content.
Source Code
class QAUpdater:
    """Orchestrates the two-step Q&A document updating process.

    Workflow:
      1. Use an LLM to generate targeted search queries for a Q&A pair.
      2. Retrieve supporting content from the internal vector store
         (ChromaDB) and Google search.
      3. Use an LLM to decide whether the current answer needs updating.
      4. Apply high/medium-confidence updates automatically; queue
         low-confidence updates for human review.
    """

    def __init__(self, config: ConfigParser):
        """
        Initializes the QAUpdater with the necessary clients and managers.

        Args:
            config (ConfigParser): Configuration object containing settings
                for all components (LLM, prompts, search, knowledge stores).
        """
        self.logger = logging.getLogger(__name__)
        self.config = config
        # Core components
        self.llm_client = LLMClient(config)
        self.prompt_manager = PromptManager(config)
        self.query_parser = QueryParser()
        # Limited data access clients for initial runs
        self.google_search_client = GoogleSearchClient(config)
        # Knowledge store managers
        self.chroma_manager = ChromaManager(config)
        # NOTE(review): neo4j_manager is initialized but not used by any
        # method in this class — confirm it is needed before removing.
        self.neo4j_manager = Neo4jManager(config)
        self.logger.info("QAUpdater initialized with minimal clients.")

    def update_qa_document(self, question_id: str):
        """
        Updates a Q&A document using the two-step process.

        Args:
            question_id (str): ID of the Q&A pair to potentially update.

        Returns:
            tuple: A three-element tuple of
                (1) a status dict — ``status`` is one of 'updated',
                    'review_needed', 'no_update_needed' or 'error', plus
                    reasoning / answer fields depending on the outcome;
                (2) the parsed search queries generated by the LLM;
                (3) the retrieved content keyed by source.
        """
        # Initialize up front so the error path below never raises
        # NameError when an exception occurs before these are populated
        # (the original code returned unbound locals from the except block).
        parsed_queries: Dict[str, Any] = {}
        retrieved_content: Dict[str, Any] = {}
        try:
            # Retrieve original Q&A pair (placeholder data source)
            qa_pair = self._get_qa_pair(question_id)

            # STEP 1: Generate optimal search queries
            query_generation_prompt = self.prompt_manager.load_prompt_template("query_generation.txt")
            query_prompt_variables = {
                "original_question": qa_pair["question"],
                "current_answer": qa_pair["answer"],
                "last_update_date": qa_pair["last_updated"],
                "num_google_queries": 5  # Number of Google queries to generate
            }
            query_prompt = query_generation_prompt.format(**query_prompt_variables)
            query_response = self.llm_client.call_llm(query_prompt)
            parsed_queries = self.query_parser.parse_query_response(query_response)

            # STEP 2: Retrieve information using only internal sources and Google search
            retrieved_content["internal"] = self._query_internal_sources(
                parsed_queries["vector_search_queries"],
                limit=20
            )
            retrieved_content["google"] = self.google_search_client.search_all(
                queries=parsed_queries["google_search_queries"],
                operators=parsed_queries["search_operators"],
                domains=parsed_queries["domain_restrictions"],
                max_results_per_query=3
            )

            # Format retrieved content for the update prompt. The template
            # expects placeholders for every source even when that source is
            # not searched in this minimal configuration.
            formatted_content = {
                "formatted_internal_content": self._format_internal_content(retrieved_content["internal"]),
                "formatted_google_content": self._format_google_content(retrieved_content["google"]),
                "formatted_literature_content": "No literature search results available.",
                "formatted_clinical_trial_content": "No clinical trial search results available.",
                "formatted_patent_content": "No patent search results available.",
                "formatted_company_news_content": "No company news search results available."
            }

            # STEP 3: Generate update decision and new content if needed
            update_prompt_template = self.prompt_manager.load_prompt_template("qa_update.txt")
            update_prompt_variables = {
                "original_question": qa_pair["question"],
                "current_answer": qa_pair["answer"],
                "last_update_date": qa_pair["last_updated"],
                **formatted_content
            }
            update_prompt = update_prompt_template.format(**update_prompt_variables)
            update_response = self.llm_client.call_llm(update_prompt)
            parsed_update = self._parse_update_response(update_response)

            # Process the update decision
            if parsed_update["update_needed"] == "Yes":
                if parsed_update["confidence"].strip().lower() in ("high", "medium"):
                    # Apply the update automatically
                    self._apply_update(question_id, parsed_update["updated_answer"], parsed_update["sources"])
                    return {
                        "status": "updated",
                        "new_answer": parsed_update["updated_answer"],
                        "reasoning": parsed_update["reasoning"],
                        "changes": parsed_update["change_summary"]
                    }, parsed_queries, retrieved_content
                else:
                    # Low confidence: queue for human review
                    self._queue_for_human_review(question_id, parsed_update)
                    return {
                        "status": "review_needed",
                        "reasoning": parsed_update["reasoning"],
                        "suggested_answer": parsed_update["updated_answer"]
                    }, parsed_queries, retrieved_content
            else:
                # No update needed
                self._log_no_update_needed(question_id, parsed_update["reasoning"])
                return {
                    "status": "no_update_needed",
                    "reasoning": parsed_update["reasoning"]
                }, parsed_queries, retrieved_content
        except Exception as e:
            self.logger.exception(f"Error updating Q&A document: {e}")
            return {
                "status": "error",
                "error_message": str(e)
            }, parsed_queries, retrieved_content

    # --- Helper methods ---

    def _get_qa_pair(self, question_id: str) -> Dict[str, Any]:
        """Retrieves a Q&A pair from the data store.

        Placeholder implementation: reads 'Q_A.json' from the working
        directory. Replace with your actual data retrieval logic.

        Returns:
            Dict with keys 'question', 'answer' and 'last_updated'.
        """
        # Use a context manager so the file handle is always closed
        # (the original left the handle open).
        with open('Q_A.json') as f:
            qa_pairs = json.load(f)['qa_pairs']
        pair = qa_pairs[int(question_id)]
        return {
            "question": pair["question"],
            "answer": pair["answer"],
            # Placeholder date; a real store should persist this per pair.
            "last_updated": "2024-01-01"
        }

    def _query_internal_sources(self, queries: List[str], limit: int) -> List[str]:
        """Queries internal knowledge sources using vector search.

        Args:
            queries: Search strings to run against the internal vector store.
            limit: Maximum total number of results to return.

        Returns:
            Up to ``limit`` string results, in query order.
        """
        results: List[str] = []
        for query in queries:
            results.extend(self.chroma_manager.query_collection(query))
        # Honour the declared contract: the original accepted `limit` but
        # never applied it. Preserve result order, just cap the count.
        return results[:limit] if limit else results

    def _format_internal_content(self, content: List[str]) -> str:
        """Formats internal vector-search results for the update prompt."""
        # Lazy %-style args avoid formatting cost when DEBUG is disabled;
        # replaces a stray debug print() left in the original.
        self.logger.debug("Internal content: %s", content)
        return "\n".join(content)

    def _format_google_content(self, content: List[str]) -> str:
        """
        Formats Google search content for the update prompt.

        Args:
            content: List of search result strings from GoogleSerperAPIWrapper;
                each string contains the results for one query.

        Returns:
            A formatted string of numbered result sets, or a fixed
            "no results" message when the list is empty.
        """
        if not content:
            return "No relevant Google search results found."
        formatted_results = []
        for i, result_text in enumerate(content, 1):
            formatted_results.append(f"RESULT SET {i}:\n{result_text}")
        return "\n\n" + "\n\n".join(formatted_results)

    def _parse_update_response(self, response_text: str) -> Dict[str, Any]:
        """
        Parses the LLM's structured update response.

        The LLM is expected to emit numbered, upper-case section headers
        such as "1. UPDATE NEEDED: Yes", "3. REASONING: ...", etc.

        Args:
            response_text: The raw response text from the LLM.

        Returns:
            Dict with keys 'update_needed' ("Yes"/"No"), 'confidence',
            'reasoning', 'updated_answer', 'change_summary' (list) and
            'sources' (list). On parse failure returns safe defaults with
            the error recorded in 'reasoning'.
        """
        try:
            self.logger.debug("Parsing LLM response of length: %d", len(response_text))
            lines = response_text.strip().split('\n')
            result: Dict[str, Any] = {
                "update_needed": "No",   # Default: conservative
                "confidence": "Low",     # Default: conservative
                "reasoning": "",
                "updated_answer": "",
                "change_summary": [],
                "sources": []
            }
            current_section = None
            section_content: List[str] = []
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                # Section headers look like "3. REASONING: optional text"
                section_match = re.match(r'^(\d+)\.\s+([A-Z\s]+):(.*)$', line)
                if section_match:
                    section_name = section_match.group(2).strip().lower()
                    remainder = section_match.group(3).strip()
                    # Flush any multi-line section we were collecting
                    if current_section and section_content:
                        if current_section == "reasoning":
                            result["reasoning"] = "\n".join(section_content)
                        elif current_section == "updated_answer":
                            result["updated_answer"] = "\n".join(section_content)
                    section_content = []
                    # Dispatch on the header name
                    if "update needed" in section_name:
                        result["update_needed"] = "Yes" if "yes" in remainder.lower() else "No"
                        current_section = None
                    elif "confidence" in section_name:
                        result["confidence"] = remainder
                        current_section = None
                    elif "reasoning" in section_name:
                        current_section = "reasoning"
                        if remainder:  # Content on the header line itself
                            section_content.append(remainder)
                    elif "updated answer" in section_name:
                        current_section = "updated_answer"
                        if remainder:
                            section_content.append(remainder)
                    elif "change summary" in section_name:
                        current_section = "change_summary"
                    elif "sources" in section_name:
                        current_section = "sources"
                    continue
                # Accumulate content into the active section
                if current_section == "reasoning":
                    section_content.append(line)
                elif current_section == "updated_answer":
                    section_content.append(line)
                elif current_section == "change_summary":
                    if line.startswith('-') or line.startswith('*'):
                        result["change_summary"].append(line[1:].strip())
                    else:
                        result["change_summary"].append(line)
                elif current_section == "sources":
                    if re.match(r'^\d+\.', line) or line.startswith('-'):
                        # Strip the numbered/bulleted prefix
                        if line.startswith('-'):
                            source = line[1:].strip()
                        else:
                            source = line[line.find('.') + 1:].strip() if '.' in line else line
                        result["sources"].append(source)
                    else:
                        # Continuation line: append to the previous source
                        if result["sources"]:
                            result["sources"][-1] += " " + line
                        else:
                            result["sources"].append(line)
            # Flush the final section, if any
            if current_section and section_content:
                if current_section == "reasoning":
                    result["reasoning"] = "\n".join(section_content)
                elif current_section == "updated_answer":
                    result["updated_answer"] = "\n".join(section_content)
            self.logger.debug("Parsed update_needed: %s", result["update_needed"])
            self.logger.debug("Parsed confidence: %s", result["confidence"])
            self.logger.debug("Parsed reasoning length: %d", len(result["reasoning"]))
            self.logger.debug("Parsed updated_answer length: %d", len(result["updated_answer"]))
            self.logger.debug("Parsed change_summary count: %d", len(result["change_summary"]))
            self.logger.debug("Parsed sources count: %d", len(result["sources"]))
            return result
        except Exception as e:
            self.logger.exception(f"Error parsing update response: {e}")
            return {
                "update_needed": "No",
                "confidence": "Low",
                "reasoning": f"Error parsing LLM response: {str(e)}",
                "updated_answer": "",
                "change_summary": [],
                "sources": []
            }

    def _apply_update(self, question_id: str, new_answer: str, sources: List[str]) -> None:
        """Applies the updated answer to the Q&A document (placeholder: logs only)."""
        self.logger.info(f"Updated Q&A {question_id} with new answer")
        self.logger.info(f"Answer: {new_answer[:100]}...")
        self.logger.info(f"Sources: {sources}")

    def _queue_for_human_review(self, question_id: str, update_data: Dict[str, Any]) -> None:
        """Queues a low-confidence update for human review (placeholder: logs only)."""
        self.logger.info(f"Queued Q&A {question_id} for human review")
        self.logger.info(f"Suggested answer: {update_data['updated_answer'][:100]}...")
        self.logger.info(f"Reasoning: {update_data['reasoning'][:100]}...")

    def _log_no_update_needed(self, question_id: str, reasoning: str) -> None:
        """Logs that no update was needed for the given Q&A pair."""
        self.logger.info(f"No update needed for Q&A {question_id}")
        self.logger.info(f"Reasoning: {reasoning[:100]}...")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
config: A ConfigParser object containing configuration settings for all components including LLM client settings, prompt paths, database connections (ChromaDB, Neo4j), and Google Search API credentials. This config is passed to all initialized sub-components.
Return Value
Instantiation returns a QAUpdater object with initialized clients and managers. The main method update_qa_document() returns a tuple containing: (1) a dictionary with status ('updated', 'review_needed', 'no_update_needed', or 'error'), optional new_answer, reasoning, and changes; (2) parsed_queries dictionary with generated search queries; (3) retrieved_content dictionary with results from internal and Google sources.
Class Interface
Methods
__init__(self, config: ConfigParser) -> None
Purpose: Initializes the QAUpdater with all necessary clients and managers for LLM interaction, prompt management, query parsing, search, and knowledge store access
Parameters:
config: ConfigParser object containing configuration settings for all sub-components
Returns: None - initializes instance attributes
update_qa_document(self, question_id: str) -> tuple[Dict[str, Any], Dict, Dict]
Purpose: Main orchestration method that executes the complete two-step Q&A update process: generates search queries, retrieves information, and determines if updates are needed
Parameters:
question_id: String identifier for the Q&A pair to potentially update
Returns: Tuple containing: (1) status dictionary with keys 'status', 'new_answer', 'reasoning', 'changes', 'error_message', or 'suggested_answer' depending on outcome; (2) parsed_queries dictionary with generated search queries; (3) retrieved_content dictionary with search results
_get_qa_pair(self, question_id: str) -> Dict[str, Any]
Purpose: Retrieves a Q&A pair from the data store (currently reads from Q_A.json file)
Parameters:
question_id: String identifier for the Q&A pair to retrieve
Returns: Dictionary with keys 'question', 'answer', and 'last_updated'
_query_internal_sources(self, queries: List[str], limit: int) -> List[str]
Purpose: Queries internal knowledge sources using vector search via ChromaManager
Parameters:
queries: List of search query strings to execute against the internal vector store
limit: Maximum number of results to return (currently unused in implementation)
Returns: List of string results from vector search queries
_format_internal_content(self, content: List[str]) -> str
Purpose: Formats internal vector search results into a string suitable for LLM prompt inclusion
Parameters:
content: List of string results from internal vector search
Returns: Newline-joined string of all content items
_format_google_content(self, content: List[str]) -> str
Purpose: Formats Google search results from GoogleSerperAPIWrapper into a structured string for LLM prompt inclusion
Parameters:
content: List of search result strings from GoogleSerperAPIWrapper, each containing results for a query
Returns: Formatted string with numbered result sets, or 'No relevant Google search results found.' if empty
_parse_update_response(self, response_text: str) -> Dict[str, Any]
Purpose: Parses the LLM's structured response to extract update decision, confidence, reasoning, updated answer, change summary, and sources
Parameters:
response_text: Raw text response from LLM containing numbered sections with update information
Returns: Dictionary with keys 'update_needed' (Yes/No), 'confidence' (High/Medium/Low), 'reasoning', 'updated_answer', 'change_summary' (list), and 'sources' (list)
_apply_update(self, question_id: str, new_answer: str, sources: List[str]) -> None
Purpose: Applies the updated answer to the Q&A document in the data store (placeholder implementation logs only)
Parameters:
question_id: String identifier for the Q&A pair to update
new_answer: The new answer text to apply
sources: List of source citations for the update
Returns: None - performs side effect of updating data store
_queue_for_human_review(self, question_id: str, update_data: Dict[str, Any]) -> None
Purpose: Queues a low-confidence update for human review (placeholder implementation logs only)
Parameters:
question_id: String identifier for the Q&A pair requiring review
update_data: Dictionary containing 'updated_answer', 'reasoning', and other update metadata
Returns: None - performs side effect of adding to review queue
_log_no_update_needed(self, question_id: str, reasoning: str) -> None
Purpose: Logs that no update was needed for a Q&A pair
Parameters:
question_id: String identifier for the Q&A pair
reasoning: Explanation of why no update was needed
Returns: None - performs logging side effect
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
logger |
logging.Logger | Logger instance for the QAUpdater class, used throughout for info, debug, and exception logging | instance |
config |
ConfigParser | Configuration object containing settings for all components, stored for potential future use | instance |
llm_client |
LLMClient | Client for making LLM API calls to generate queries and update decisions | instance |
prompt_manager |
PromptManager | Manager for loading and managing prompt templates (query_generation.txt, qa_update.txt) | instance |
query_parser |
QueryParser | Parser for extracting structured query information from LLM responses | instance |
google_search_client |
GoogleSearchClient | Client for executing Google searches with operators and domain restrictions | instance |
chroma_manager |
ChromaManager | Manager for interacting with ChromaDB vector store for internal knowledge retrieval | instance |
neo4j_manager |
Neo4jManager | Manager for interacting with Neo4j graph database (initialized but not actively used in current implementation) | instance |
Dependencies
logging, typing, configparser, json, re, core.llm_client, core.prompt_manager, core.query_parser, data_access.google_search_client, knowledge_store.chroma_manager, knowledge_store.neo4j_manager
Required Imports
import logging
from typing import Dict, Any, List, Union
from configparser import ConfigParser
from core.llm_client import LLMClient
from core.prompt_manager import PromptManager
from core.query_parser import QueryParser
from data_access.google_search_client import GoogleSearchClient
from knowledge_store.chroma_manager import ChromaManager
from knowledge_store.neo4j_manager import Neo4jManager
import json
import re
Usage Example
from configparser import ConfigParser
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
# Create configuration
config = ConfigParser()
config.read('config.ini')
# Instantiate QAUpdater
qa_updater = QAUpdater(config)
# Update a specific Q&A document
question_id = '0'
result, queries, content = qa_updater.update_qa_document(question_id)
# Check the result
if result['status'] == 'updated':
print(f"Updated answer: {result['new_answer']}")
print(f"Reasoning: {result['reasoning']}")
print(f"Changes: {result['changes']}")
elif result['status'] == 'review_needed':
print(f"Review needed: {result['reasoning']}")
print(f"Suggested answer: {result['suggested_answer']}")
elif result['status'] == 'no_update_needed':
print(f"No update needed: {result['reasoning']}")
else:
print(f"Error: {result['error_message']}")
Best Practices
- Always initialize with a properly configured ConfigParser containing all required settings for sub-components
- The update_qa_document method follows a strict two-step process: query generation then information retrieval - do not skip steps
- Handle all three return tuple elements (result dict, parsed_queries, retrieved_content) when calling update_qa_document
- Implement proper error handling around update_qa_document calls as it can raise exceptions during LLM calls or data retrieval
- Replace placeholder methods (_get_qa_pair, _apply_update, _queue_for_human_review, _log_no_update_needed) with actual implementations for your data store
- The class maintains stateful connections to ChromaDB, Neo4j, and Google Search - ensure proper cleanup if needed
- Updates are automatically applied only for 'high' or 'medium' confidence; 'low' confidence updates are queued for human review
- The _parse_update_response method expects specific LLM output format with numbered sections - ensure your prompts generate compatible output
- Configure num_google_queries in query_prompt_variables to control search breadth (default: 5)
- The class reads Q&A pairs from 'Q_A.json' by default - modify _get_qa_pair for different data sources
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class QAUpdater_v1 83.2% similar
-
function main_v3 71.4% similar
-
function main_v47 48.5% similar
-
class QueryParser 48.5% similar
-
function process_chat_background 46.5% similar