🔍 Code Extractor

class QAUpdater_v1

Maturity: 25

Orchestrates the Q&A document updating process.

File: /tf/active/vicechatdev/QA_updater/qa_engine/qa_updater_full.py
Lines: 17-483
Complexity: moderate

Purpose

Orchestrates the Q&A document updating process.

Source Code

class QAUpdater:
    """Orchestrates the Q&A document updating process."""

    def __init__(self, config: ConfigParser):
        """
        Initializes the QAUpdater with the necessary clients and managers.

        Args:
            config (ConfigParser): Configuration object containing settings for all components.
        """
        self.logger = logging.getLogger(__name__)
        self.config = config

        # Core components
        self.llm_client = LLMClient(config)
        self.prompt_manager = PromptManager(config)
        self.query_parser = QueryParser()

        # Data access clients
        self.literature_client = LiteratureClient(config)
        self.clinical_trials_client = ClinicalTrialsClient(config)
        self.patent_client = PatentClient(config)
        self.company_news_client = CompanyNewsClient(config)
        self.document_downloader = DocumentDownloader(config)
        self.google_search_client = GoogleSearchClient(config)

        # Knowledge store managers
        self.chroma_manager = ChromaManager(config)
        self.neo4j_manager = Neo4jManager(config)

        self.logger.info("QAUpdater initialized.")

    def update_qa_document(self, question_id: str) -> Dict[str, Any]:
        """
        Updates a Q&A document using the three-step process: query generation,
        retrieval, and update decision.

        Args:
            question_id (str): ID of the Q&A pair to potentially update.

        Returns:
            Dict[str, Any]: A dictionary containing the update status, new answer (if updated),
                             reasoning, and changes.
        """
        try:
            # Retrieve original Q&A pair (replace with your actual data retrieval method)
            qa_pair = self._get_qa_pair(question_id)

            # STEP 1: Generate optimal search queries
            query_generation_prompt = self.prompt_manager.load_prompt_template("query_generation.txt")
            query_prompt_variables = {
                "original_question": qa_pair["question"],
                "current_answer": qa_pair["answer"],
                "last_update_date": qa_pair["last_updated"],
                "num_google_queries": 5  # Configure the number of Google queries to generate
            }

            # Fill in the query generation prompt
            query_prompt = query_generation_prompt.format(**query_prompt_variables)

            # Call LLM to generate search queries
            query_response = self.llm_client.call_llm(query_prompt)

            # Parse the query response
            parsed_queries = self.query_parser.parse_query_response(query_response)
            # STEP 2: Retrieve information using the optimized queries
            retrieved_content = {}

            # Internal vector search (replace with your actual vector search implementation)
            retrieved_content["internal"] = self._query_internal_sources(
                parsed_queries["vector_search_queries"],
                limit=5
            )

            # Google search results
            retrieved_content["google"] = self.google_search_client.search_all(
                queries=parsed_queries["google_search_queries"],
                operators=parsed_queries["search_operators"],
                domains=parsed_queries["domain_restrictions"],
                max_results_per_query=2
            )

            # Scientific literature search
            retrieved_content["literature"] = self.literature_client.search_all(
                query=parsed_queries["scientific_literature_queries"][0],  # Use first query
                max_results_per_source=3,
                days_back=parsed_queries["optimal_lookback_period"]
            )

            # Clinical trials search
            retrieved_content["clinical_trials"] = self.clinical_trials_client.search_all(
                query=parsed_queries["clinical_trial_queries"][0],  # Use first query
                max_results_per_source=2,
                days_back=parsed_queries["optimal_lookback_period"]
            )

            # Patent search
            retrieved_content["patents"] = self.patent_client.search_all(
                query=parsed_queries["patent_queries"][0],  # Use first query
                max_results_per_source=2,
                days_back=parsed_queries["optimal_lookback_period"]
            )

            # Company and news search
            retrieved_content["company_news"] = self.company_news_client.search_all_news(
                query=parsed_queries["company_news_queries"][0],  # Use first query
                max_results_per_source=2,
                days_back=min(90, parsed_queries["optimal_lookback_period"])  # Cap at 90 days
            )

            # Format retrieved content for the update prompt
            formatted_content = {
                "formatted_internal_content": self._format_internal_content(retrieved_content["internal"]),
                "formatted_google_content": self._format_google_content(retrieved_content["google"]),
                "formatted_literature_content": self._format_literature_content(retrieved_content["literature"]),
                "formatted_clinical_trial_content": self._format_clinical_trial_content(retrieved_content["clinical_trials"]),
                "formatted_patent_content": self._format_patent_content(retrieved_content["patents"]),
                "formatted_company_news_content": self._format_company_news_content(retrieved_content["company_news"])
            }

            # STEP 3: Generate update decision and new content if needed
            update_prompt_template = self.prompt_manager.load_prompt_template("qa_update.txt")
            update_prompt_variables = {
                "original_question": qa_pair["question"],
                "current_answer": qa_pair["answer"],
                "last_update_date": qa_pair["last_updated"],
                **formatted_content
            }

            # Fill in the update prompt
            update_prompt = update_prompt_template.format(**update_prompt_variables)

            # Call LLM to make update decision
            update_response = self.llm_client.call_llm(update_prompt)

            # Parse the response (replace with your actual parsing method)
            parsed_update = self._parse_update_response(update_response)

            # Process the update decision
            if parsed_update["update_needed"] == "Yes":
                if parsed_update["confidence"].lower() == "high":
                    # Apply the update automatically (replace with your actual update method)
                    self._apply_update(question_id, parsed_update["updated_answer"], parsed_update["sources"])
                    return {
                        "status": "updated",
                        "new_answer": parsed_update["updated_answer"],
                        "reasoning": parsed_update["reasoning"],
                        "changes": parsed_update["change_summary"]
                    }
                else:
                    # Queue for human review (replace with your actual queuing method)
                    self._queue_for_human_review(question_id, parsed_update)
                    return {
                        "status": "review_needed",
                        "reasoning": parsed_update["reasoning"],
                        "suggested_answer": parsed_update["updated_answer"]
                    }
            else:
                # No update needed
                self._log_no_update_needed(question_id, parsed_update["reasoning"])
                return {
                    "status": "no_update_needed",
                    "reasoning": parsed_update["reasoning"]
                }

        except Exception as e:
            self.logger.exception(f"Error updating Q&A document: {e}")
            return {
                "status": "error",
                "error_message": str(e)
            }

    # --- Helper methods (replace with your actual implementations) ---

    def _get_qa_pair(self, question_id: str) -> Dict[str, Any]:
        """Retrieves a Q&A pair from your data store."""
        # Replace with your actual data retrieval logic
        # This is just a placeholder
        return {
            "question": "4. Will the durability of the vaccine be demonstrated in the phase 1 trial?",
            "answer": "Yes. In the phase 1 trial about half of the older adult participants who received the 1st vaccination with VXB-241 (N=36, assuming a 10% drop-out rate) will be revaccinated with placebo 1 year later. For these subjects the durability of the immune response will be assessed over 2 years, while for all other subjects—including those treated with GSK’s commercial RSV vaccine Arexvy—durability will be assessed over 1 year after the first vaccination and 1 year after revaccination.",
            "last_updated": "2024-01-01"
        }


    def _query_internal_sources(self, queries: List[str], limit: int) -> List[str]:
        """Queries internal knowledge sources using vector search."""
        # Replace with your actual vector search implementation
        # This is just a placeholder
        results = []
        for query in queries:
            results.append(f"Internal result for query: {query}")
        return results[:limit]

    def _format_internal_content(self, content: List[str]) -> str:
        """Formats internal content for the update prompt."""
        # Replace with your actual formatting logic
        return "\n".join(content)
    
    def _format_google_content(self, content: List[Dict[str, Any]]) -> str:
        """Formats Google search content for the update prompt."""
        formatted_results = []
        
        for i, result in enumerate(content, 1):
            formatted_result = f"{i}. {result.get('title', 'No Title')}\n"
            formatted_result += f"   Source: {result.get('source', 'Unknown')}\n"
            formatted_result += f"   URL: {result.get('link', 'No URL')}\n"
            formatted_result += f"   Summary: {result.get('snippet', 'No snippet available')}\n"
            formatted_results.append(formatted_result)
        
        if not formatted_results:
            return "No relevant Google search results found."
        
        return "\n".join(formatted_results)

    def _format_literature_content(self, content: List[Dict[str, Any]]) -> str:
        """Formats literature content for the update prompt."""
        formatted_results = []
        
        for i, result in enumerate(content, 1):
            formatted_result = f"{i}. {result.get('title', 'No Title')}\n"
            formatted_result += f"   Source: {result.get('source', 'Unknown')}\n"
            formatted_result += f"   Authors: {', '.join(author.get('name', '') for author in result.get('authors', []))}\n"
            formatted_result += f"   Date: {result.get('publication_date', 'Unknown date')}\n"
            formatted_result += f"   DOI/URL: {result.get('doi') or result.get('url', 'No link')}\n"
            
            # Add abstract if available
            abstract = result.get('abstract') or result.get('summary', '')
            if abstract:
                # Limit the abstract length to prevent overly long content
                max_abstract_length = 500
                if len(abstract) > max_abstract_length:
                    abstract = abstract[:max_abstract_length] + "..."
                formatted_result += f"   Abstract: {abstract}\n"
                
            formatted_results.append(formatted_result)
        
        if not formatted_results:
            return "No relevant literature results found."
        
        return "\n".join(formatted_results)

    def _format_clinical_trial_content(self, content: List[Dict[str, Any]]) -> str:
        """Formats clinical trial content for the update prompt."""
        formatted_results = []
        
        for i, trial in enumerate(content, 1):
            # Handle different sources with different field names
            if trial.get('source') == 'clinicaltrials.gov':
                title = trial.get('BriefTitle', trial.get('OfficialTitle', 'No Title'))
                status = trial.get('OverallStatus', 'Unknown status')
                phase = trial.get('Phase', 'Unknown phase')
                sponsor = trial.get('LeadSponsorName', 'Unknown sponsor')
                conditions = ', '.join(trial.get('Condition', [])) if isinstance(trial.get('Condition'), list) else trial.get('Condition', 'Not specified')
                interventions = ', '.join(trial.get('Intervention', [])) if isinstance(trial.get('Intervention'), list) else trial.get('Intervention', 'Not specified')
                last_update = trial.get('LastUpdatePostDate', 'Unknown')
                # NCTId may arrive as a list or a plain string depending on the response shape
                nct_id = trial.get('NCTId')
                if isinstance(nct_id, list):
                    nct_id = nct_id[0] if nct_id else ''
                url = f"https://clinicaltrials.gov/study/{nct_id}" if nct_id else 'No URL'
            else:
                # Generic fallback for other sources
                title = trial.get('title', 'No Title')
                status = trial.get('status', trial.get('recruitment_status', 'Unknown status'))
                phase = trial.get('phase', 'Not specified')
                sponsor = trial.get('sponsor_name', trial.get('primary_sponsor', 'Unknown sponsor'))
                conditions = ', '.join(trial.get('conditions', [])) if isinstance(trial.get('conditions'), list) else trial.get('conditions', 'Not specified')
                interventions = ', '.join(trial.get('interventions', [])) if isinstance(trial.get('interventions'), list) else trial.get('interventions', 'Not specified')
                last_update = trial.get('last_update', 'Unknown')
                url = trial.get('url', 'No URL')
            
            formatted_result = f"{i}. {title}\n"
            formatted_result += f"   Source: {trial.get('source', 'Unknown')}\n"
            formatted_result += f"   Status: {status}\n"
            formatted_result += f"   Phase: {phase}\n"
            formatted_result += f"   Sponsor: {sponsor}\n"
            formatted_result += f"   Conditions: {conditions}\n"
            formatted_result += f"   Interventions: {interventions}\n"
            formatted_result += f"   Last Updated: {last_update}\n"
            formatted_result += f"   URL: {url}\n"
            
            formatted_results.append(formatted_result)
        
        if not formatted_results:
            return "No relevant clinical trial results found."
        
        return "\n".join(formatted_results)

    def _format_patent_content(self, content: List[Dict[str, Any]]) -> str:
        """Formats patent content for the update prompt."""
        formatted_results = []
        
        for i, patent in enumerate(content, 1):
            # Handle different patent sources
            title = patent.get('invention_title', patent.get('title', 'No Title'))
            patent_id = patent.get('patent_number', patent.get('publication_number', patent.get('application_id', 'No ID')))
            applicant = patent.get('applicant_name', 'Not specified')
            filing_date = patent.get('file_date', patent.get('filing_date', 'Unknown date'))
            url = patent.get('url', 'No URL')
            
            formatted_result = f"{i}. {title}\n"
            formatted_result += f"   Source: {patent.get('source', 'Unknown')}\n"
            formatted_result += f"   Patent/Application ID: {patent_id}\n"
            formatted_result += f"   Applicant: {applicant}\n"
            formatted_result += f"   Filing Date: {filing_date}\n"
            formatted_result += f"   URL: {url}\n"
            
            # Add abstract if available
            abstract = patent.get('abstract', '')
            if abstract:
                # Limit the abstract length
                max_abstract_length = 300
                if len(abstract) > max_abstract_length:
                    abstract = abstract[:max_abstract_length] + "..."
                formatted_result += f"   Abstract: {abstract}\n"
                
            formatted_results.append(formatted_result)
        
        if not formatted_results:
            return "No relevant patent results found."
        
        return "\n".join(formatted_results)

    def _format_company_news_content(self, content: List[Dict[str, Any]]) -> str:
        """Formats company news content for the update prompt."""
        formatted_results = []
        
        for i, news in enumerate(content, 1):
            title = news.get('title', 'No Title')
            source = news.get('source_name', news.get('source', 'Unknown source'))
            date = news.get('published_at', news.get('date', 'Unknown date'))
            url = news.get('url', 'No URL')
            
            formatted_result = f"{i}. {title}\n"
            formatted_result += f"   Source: {source}\n"
            formatted_result += f"   Date: {date}\n"
            formatted_result += f"   URL: {url}\n"
            
            # Add description/snippet if available
            description = news.get('description', news.get('snippet', ''))
            if description:
                # Limit the description length
                max_desc_length = 300
                if len(description) > max_desc_length:
                    description = description[:max_desc_length] + "..."
                formatted_result += f"   Description: {description}\n"
                
            formatted_results.append(formatted_result)
        
        if not formatted_results:
            return "No relevant company news results found."
        
        return "\n".join(formatted_results)

    def _parse_update_response(self, response_text: str) -> Dict[str, Any]:
        """
        Parses the LLM's update response.
        
        Args:
            response_text: The raw response from the LLM
            
        Returns:
            Dict containing structured update information
        """
        try:
            lines = response_text.strip().split('\n')
            
            result = {
                "update_needed": "No",  # Default
                "confidence": "Low",    # Default
                "reasoning": "",
                "updated_answer": "",
                "change_summary": [],
                "sources": []
            }
            
            current_section = None
            section_content = []
            
            for line in lines:
                line = line.strip()
                
                # Skip empty lines
                if not line:
                    continue
                
                # Check for numbered items that indicate section headers
                if line.startswith('1. UPDATE NEEDED:'):
                    update_needed = line.split(':', 1)[1].strip()
                    result["update_needed"] = "Yes" if "yes" in update_needed.lower() else "No"
                    continue
                    
                if line.startswith('2. CONFIDENCE:'):
                    confidence = line.split(':', 1)[1].strip()
                    result["confidence"] = confidence
                    continue
                    
                # Check for section headers
                if line.startswith('3. REASONING:'):
                    current_section = "reasoning"
                    continue
                    
                if line.startswith('4. UPDATED ANSWER:'):
                    current_section = "updated_answer"
                    section_content = []
                    continue
                    
                if line.startswith('5. CHANGE SUMMARY:'):
                    current_section = "change_summary"
                    section_content = []
                    continue
                    
                if line.startswith('6. SOURCES:'):
                    current_section = "sources"
                    section_content = []
                    continue
                
                # Process content for the current section
                if current_section == "reasoning":
                    if not result["reasoning"]:
                        result["reasoning"] = line
                    else:
                        result["reasoning"] += "\n" + line
                elif current_section == "updated_answer":
                    # Build the answer directly in the result so that the later
                    # CHANGE SUMMARY / SOURCES headers (which reset
                    # section_content) cannot discard the collected text.
                    if result["updated_answer"]:
                        result["updated_answer"] += "\n" + line
                    else:
                        result["updated_answer"] = line
                elif current_section == "change_summary":
                    if line.startswith('-') or line.startswith('*'):
                        result["change_summary"].append(line[1:].strip())
                    else:
                        result["change_summary"].append(line)
                elif current_section == "sources":
                    if line[0].isdigit() and '. ' in line:
                        result["sources"].append(line[line.find('.')+1:].strip())

            return result
            
        except Exception as e:
            self.logger.exception(f"Error parsing update response: {e}")
            return {
                "update_needed": "No",
                "confidence": "Low",
                "reasoning": f"Error parsing LLM response: {str(e)}",
                "updated_answer": "",
                "change_summary": [],
                "sources": []
            }

    def _apply_update(self, question_id: str, new_answer: str, sources: List[str]) -> None:
        """Applies the updated answer to the Q&A document."""
        # Replace with your actual update logic
        self.logger.info(f"Updated Q&A {question_id} with new answer")
        self.logger.info(f"Answer: {new_answer[:100]}...")
        self.logger.info(f"Sources: {sources}")

    def _queue_for_human_review(self, question_id: str, update_data: Dict[str, Any]) -> None:
        """Queues the update for human review."""
        # Replace with your actual queuing logic
        self.logger.info(f"Queued Q&A {question_id} for human review")
        self.logger.info(f"Suggested answer: {update_data['updated_answer'][:100]}...")
        self.logger.info(f"Reasoning: {update_data['reasoning'][:100]}...")

    def _log_no_update_needed(self, question_id: str, reasoning: str) -> None:
        """Logs that no update was needed."""
        # Replace with your actual logging logic
        self.logger.info(f"No update needed for Q&A {question_id}")
        self.logger.info(f"Reasoning: {reasoning[:100]}...")


Class Interface

Methods

__init__(self, config)

Purpose: Initializes the QAUpdater with the necessary clients and managers. Args: config (ConfigParser): Configuration object containing settings for all components.

Parameters:

  • config: Type: ConfigParser

Returns: None
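
The constructor takes a standard-library ConfigParser. A minimal sketch of building one; the `[llm]` / `[neo4j]` sections and option names are illustrative assumptions, since the real keys are defined by the individual components:

```python
from configparser import ConfigParser

# Illustrative configuration; section and option names are assumptions,
# not the components' documented keys.
config = ConfigParser()
config.read_string("""
[llm]
model = gpt-4o
temperature = 0.2

[neo4j]
uri = bolt://localhost:7687
""")

print(config.get("llm", "model"))             # gpt-4o
print(config.getfloat("llm", "temperature"))  # 0.2
```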

update_qa_document(self, question_id) -> Dict[str, Any]

Purpose: Updates a Q&A document using the three-step process (query generation, retrieval, update decision). Args: question_id (str): ID of the Q&A pair to potentially update. Returns: Dict[str, Any]: A dictionary containing the update status, new answer (if updated), reasoning, and changes.

Parameters:

  • question_id: Type: str

Returns: Returns Dict[str, Any]
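
Callers can branch on the four status values this method returns ("updated", "review_needed", "no_update_needed", "error"). A sketch using a stub result dict in place of a real call; the text fields are illustrative:

```python
# Stub standing in for update_qa_document()'s return value.
result = {
    "status": "review_needed",
    "reasoning": "New phase 1 data published after the last update.",
    "suggested_answer": "Updated durability details ...",
}

def summarize(result):
    """Dispatch on the four status values the method can return."""
    status = result["status"]
    if status == "updated":
        return f"Applied changes: {result['changes']}"
    if status == "review_needed":
        return f"Needs review: {result['reasoning']}"
    if status == "no_update_needed":
        return "No change"
    return f"Failed: {result['error_message']}"

print(summarize(result))  # Needs review: New phase 1 data published after the last update.
```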

_get_qa_pair(self, question_id) -> Dict[str, Any]

Purpose: Retrieves a Q&A pair from your data store.

Parameters:

  • question_id: Type: str

Returns: Returns Dict[str, Any]

_query_internal_sources(self, queries, limit) -> List[str]

Purpose: Queries internal knowledge sources using vector search.

Parameters:

  • queries: Type: List[str]
  • limit: Type: int

Returns: Returns List[str]
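
Note that in the placeholder implementation the limit caps the combined result list, not the results per query. A standalone sketch of that behaviour:

```python
# Mirrors the placeholder's logic: one synthetic result per query,
# truncated to `limit` results in total across all queries.
def query_internal_sources(queries, limit):
    results = [f"Internal result for query: {q}" for q in queries]
    return results[:limit]

out = query_internal_sources(["q1", "q2", "q3"], limit=2)
print(out)  # ['Internal result for query: q1', 'Internal result for query: q2']
```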

_format_internal_content(self, content) -> str

Purpose: Formats internal content for the update prompt.

Parameters:

  • content: Type: List[str]

Returns: Returns str

_format_google_content(self, content) -> str

Purpose: Formats Google search content for the update prompt.

Parameters:

  • content: Type: List[Dict[str, Any]]

Returns: Returns str
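
A standalone sketch of the formatter's contract: a list of result dicts in (with the title/source/link/snippet keys the method reads), one numbered text block out. The sample data is illustrative:

```python
def format_google_content(content):
    """Mirror of the method's formatting logic, free of the class."""
    formatted = []
    for i, result in enumerate(content, 1):
        formatted.append(
            f"{i}. {result.get('title', 'No Title')}\n"
            f"   Source: {result.get('source', 'Unknown')}\n"
            f"   URL: {result.get('link', 'No URL')}\n"
            f"   Summary: {result.get('snippet', 'No snippet available')}\n"
        )
    if not formatted:
        return "No relevant Google search results found."
    return "\n".join(formatted)

sample = [{"title": "RSV vaccine update", "link": "https://example.com",
           "snippet": "Phase 1 results ..."}]
print(format_google_content(sample))
```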

_format_literature_content(self, content) -> str

Purpose: Formats literature content for the update prompt.

Parameters:

  • content: Type: List[Dict[str, Any]]

Returns: Returns str

_format_clinical_trial_content(self, content) -> str

Purpose: Formats clinical trial content for the update prompt.

Parameters:

  • content: Type: List[Dict[str, Any]]

Returns: Returns str

_format_patent_content(self, content) -> str

Purpose: Formats patent content for the update prompt.

Parameters:

  • content: Type: List[Dict[str, Any]]

Returns: Returns str

_format_company_news_content(self, content) -> str

Purpose: Formats company news content for the update prompt.

Parameters:

  • content: Type: List[Dict[str, Any]]

Returns: Returns str

_parse_update_response(self, response_text) -> Dict[str, Any]

Purpose: Parses the LLM's update response. Args: response_text: The raw response from the LLM Returns: Dict containing structured update information

Parameters:

  • response_text: Type: str

Returns: Returns Dict[str, Any]
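
Inferred from the header prefixes the parser matches, the LLM is expected to answer in the numbered layout below; the field contents are illustrative. A minimal check of the two scalar fields, mirroring the method's logic:

```python
sample_response = """\
1. UPDATE NEEDED: Yes
2. CONFIDENCE: High
3. REASONING:
New interim data were published in March.
4. UPDATED ANSWER:
Yes. Durability will be assessed over 2 years ...
5. CHANGE SUMMARY:
- Added interim data reference
6. SOURCES:
1. https://example.com/paper
"""

# Extract the two scalar fields the same way the method does.
for line in sample_response.splitlines():
    if line.startswith("1. UPDATE NEEDED:"):
        update_needed = "Yes" if "yes" in line.split(":", 1)[1].lower() else "No"
    elif line.startswith("2. CONFIDENCE:"):
        confidence = line.split(":", 1)[1].strip()

print(update_needed, confidence)  # Yes High
```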

_apply_update(self, question_id, new_answer, sources) -> None

Purpose: Applies the updated answer to the Q&A document.

Parameters:

  • question_id: Type: str
  • new_answer: Type: str
  • sources: Type: List[str]

Returns: Returns None

_queue_for_human_review(self, question_id, update_data) -> None

Purpose: Queues the update for human review.

Parameters:

  • question_id: Type: str
  • update_data: Type: Dict[str, Any]

Returns: Returns None

_log_no_update_needed(self, question_id, reasoning) -> None

Purpose: Logs that no update was needed.

Parameters:

  • question_id: Type: str
  • reasoning: Type: str

Returns: Returns None

Required Imports

import logging
from typing import Dict
from typing import Any
from typing import List
from configparser import ConfigParser

Note: the project-local components referenced in __init__ (LLMClient, PromptManager, QueryParser, the data access clients, ChromaManager, and Neo4jManager) must also be importable; their module paths are not captured here.

Usage Example

# Example usage (question ID and config path are illustrative):
# config = ConfigParser()
# config.read("config.ini")
# updater = QAUpdater(config)
# result = updater.update_qa_document("QA-0001")

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class QAUpdater 83.2% similar

    Orchestrates a two-step Q&A document updating process that generates optimal search queries, retrieves information from internal and external sources, and uses an LLM to determine if updates are needed.

    From: /tf/active/vicechatdev/QA_updater/qa_engine/qa_updater.py
  • function main_v3 71.0% similar

    Entry point function that orchestrates a Q&A document updater application, loading configuration, setting up logging, and processing Q&A document updates with comprehensive status handling.

    From: /tf/active/vicechatdev/QA_updater/main.py
  • function main_v47 45.8% similar

    Orchestrates and executes a series of example demonstrations for the DocChat system, including document indexing, RAG queries, and conversation modes.

    From: /tf/active/vicechatdev/docchat/example_usage.py
  • class ApproverAssignment_v1 45.2% similar

    Model class representing an approver assignment within an approval cycle, managing the relationship between an approver and their approval task including status, decisions, and timeline tracking.

    From: /tf/active/vicechatdev/CDocs/models/approval.py
  • class ApproverAssignment 44.8% similar

    Model class representing an approver assignment within an approval cycle, managing the relationship between an approver and their approval task including status, decisions, and lifecycle tracking.

    From: /tf/active/vicechatdev/CDocs/models/approval_bis.py