🔍 Code Extractor

class OneCo_hybrid_RAG

Maturity: 15

A class named OneCo_hybrid_RAG

File:
/tf/active/vicechatdev/OneCo_hybrid_RAG copy.py
Lines:
790 - 1760
Complexity:
moderate

Purpose

OneCo_hybrid_RAG is a hybrid Retrieval-Augmented Generation (RAG) engine exposed as a Panel chat interface. For each chat message it analyzes the query with a smaller "pre-model", gathers supporting text blocks from the registered data handlers (plain text, dataframes, LangChain vector stores, a Neo4j graph database, and ChromaDB collections), optionally augments them with Serper-backed Google search results, filters and de-duplicates the retrieved blocks (with optional cross-encoder reranking), and then prompts an OpenAI or Azure OpenAI chat model to produce a Markdown answer with inline block references and an optional formatted reference list.

Source Code

class OneCo_hybrid_RAG:


    def __init__(self):
        ## Set API keys
        self.set_api_keys()
        ## Define the flow control variables to be exposed and set default values
        self.flow_control = {
            "pre_model" : ["OpenAi","gpt-4o-mini",0],
            "model" : ["OpenAi","gpt-4o",0],
            "search_engine" : ["Serper","google"],
            "enable_search" : False,
            "enable_memory" : False,
            "memory_max_size" : 3,
            "enable_referencing" : True,
        }
        ## Different types of data can be provided here and will be included in the flow
        self.data_handles = SimpleDataHandle()
        ## Define the UI elements to be exposed
        self.chat_interface=pn.chat.ChatInterface(callback=self.response_callback,width=1200,callback_exception='verbose')
        ## Plan for chat memory
        self.chat_memory = SimpleChatMemory(max_history=self.flow_control["memory_max_size"])
        self.extended_query=None
        # Set up the blocks_dict for references
        self.blocks_dict = {}
        self.block_counter = 1
        
        # Explicitly set OpenAI API type for this class
        os.environ["OPENAI_API_TYPE"] = "openai"
        
        self.init_connections()
        return
    
    def init_connections(self):

        uri = config.DB_ADDR
        user, password = config.DB_AUTH
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.session = self.driver.session(database=config.DB_NAME)
        api_key = "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"  # Replace with your actual API key
        self.chroma_embedder=MyEmbeddingFunction("gpt-4o-mini","text-embedding-3-small",api_key)
        self.chroma_client=chromadb.HttpClient(host='vice_chroma', port=8000)
        self.available_collections = self.chroma_client.list_collections()
        return

    def run_query(self, query, params=None):
        """
        Execute a Cypher query and return the result
        
        Parameters
        ----------
        query : str
            The Cypher query to execute
        params : dict, optional
            Parameters for the query
            
        Returns
        -------
        result
            The query result
        """
        if params is None:
            params = {}
        return self.session.run(query, params)
    
    def evaluate_query(self, query, params=None):
        """
        Execute a Cypher query and return a single result
        
        Parameters
        ----------
        query : str
            The Cypher query to execute
        params : dict, optional
            Parameters for the query
            
        Returns
        -------
        object
            The single result value
        """
        if params is None:
            params = {}
        result = self.session.run(query, params)
        record = result.single()
        if record:
            return record[0]
        return None
    
    def push_changes(self, node):
        """
        Push changes to a node to the database
        
        Parameters
        ----------
        node : dict or node-like object
            Node with properties to update
        """
        # Extract node properties, handling both dict-like and node-like objects
        if hasattr(node, 'items'):
            # Dict-like object
            properties = {k: v for k, v in node.items() if k != 'labels'}
            labels = node.get('labels', [])
            uid = node.get('UID')
        else:
            # Node-like object from previous driver
            properties = {k: node[k] for k in node.keys() if k != 'UID'}
            labels = list(node.labels)
            uid = node['UID']
        
        # Construct labels string for Cypher
        if labels:
            labels_str = ':'.join(labels)
            match_clause = f"MATCH (n:{labels_str} {{UID: $uid}})"
        else:
            match_clause = "MATCH (n {UID: $uid})"
        
        # Update node properties
        if properties:
            set_clauses = [f"n.`{key}` = ${key}" for key in properties]
            query = f"{match_clause} SET {', '.join(set_clauses)}"
            params = {"uid": uid, **properties}
            self.run_query(query, params)
        
        return
    

    def count_tokens(self,text):
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))
    
    def set_api_keys(self):
        ## Credentials are redacted in this listing; supply real values from a secrets store
        ## or deployment configuration instead of hardcoding them in source.
        ## Public OpenAI key
        os.environ["OPENAI_API_KEY"] = "<OPENAI_API_KEY>"
        ## Serper API key
        os.environ["SERPER_API_KEY"] = "<SERPER_API_KEY>"
        ## AzureOpenAI endpoint
        os.environ["AZURE_OPENAI_ENDPOINT"] = "https://vice-llm-2.openai.azure.com/openai/deployments/OneCo-gpt/chat/completions?api-version=2024-08-01-preview"
        ## AzureOpenAI key
        os.environ["AZURE_OPENAI_API_KEY"] = "<AZURE_OPENAI_API_KEY>"
        return
    
    def extract_core_query(self,query_text):
        """
        Extracts the core information-seeking question from a user query that may contain
        both a question and processing instructions for the RAG system.
        
        Args:
            query_text: The original user query text
            
        Returns:
            dict: Contains the extracted information with keys:
                - core_question: The actual information need/question
                - instructions: Any processing instructions found
                - is_complex: Boolean indicating if query contained instructions
        """
        # Use the pre-model (smaller model) for this extraction task
        llm = ChatOpenAI(
            model=self.flow_control['pre_model'][1],
            temperature=self.flow_control['pre_model'][2],
        )
        
        prompt = f"""
        You are an AI query analyzer. Your task is to analyze the following user query and separate it into:
        1. The core information-seeking question (what the user actually wants to know)
        2. Any processing instructions (how the user wants information presented or processed)
        
        User query: {query_text}
        
        Output your analysis in strict JSON format:
        ```json
        {{
            "core_question": "The main question or information need",
            "instructions": "Any processing instructions (or empty string if none)",
            "is_complex": true/false (true if query contains instructions, false if it's just a question)
        }}
        ```
        
        Examples:
        
        Input: "Tell me about mRNA vaccines and format the answer with bullet points"
        Output: 
        ```json
        {{
            "core_question": "Tell me about mRNA vaccines",
            "instructions": "format the answer with bullet points",
            "is_complex": true
        }}
        ```
        
        Input: "What are the main types of vaccine adjuvants?"
        Output: 
        ```json
        {{
            "core_question": "What are the main types of vaccine adjuvants?",
            "instructions": "",
            "is_complex": false
        }}
        ```
        
        Only respond with the JSON output, nothing else.
        """
        
        response = llm.invoke(prompt)
        
        try:
            # Extract JSON from response if needed
            content = response.content
            if '```json' in content:
                content = content.split('```json')[1].split('```')[0].strip()
            elif '```' in content:
                content = content.split('```')[1].split('```')[0].strip()
                
            result = json.loads(content)
            return result
        except Exception as e:
            # Fallback if parsing fails
            print(f"Error parsing LLM response: {e}")
            return {
                "core_question": query_text,
                "instructions": "",
                "is_complex": False
            }

    def response_callback(self, query):
        ## We distinguish between the search-enabled and search-disabled modes - the former makes two separate LLM calls.
        ## Common part - prepare the data
        query_analysis = self.extract_core_query(query)
        search_query = query_analysis["core_question"]
        print("search query", search_query)
        
        # Store the analysis for later use in processing
        self.current_query_analysis = query_analysis
        
        # Parse handler using the core question for retrieval
        data_sections = self.parse_handler(search_query)

        ## prepare LLM following flow control
        if self.flow_control['model'][0]=="OpenAi":
            llm = ChatOpenAI(
                model=self.flow_control['model'][1],
                temperature=self.flow_control['model'][2],
                timeout=None,
                max_retries=2)
        elif self.flow_control['model'][0]=="Azure":
            ## NOTE: the Azure path expects a fourth flow_control["model"] entry holding the API version
            llm = AzureChatOpenAI(
                azure_deployment=self.flow_control['model'][1],
                api_version=self.flow_control['model'][3],
                temperature=self.flow_control['model'][2],
                max_tokens=2500,
                timeout=None,
                max_retries=2)
        else:
            llm = ChatOpenAI(
                model='gpt-4o',
                temperature=0,
                timeout=None,
                max_retries=2)
    
        ## Search enabled mode
        self.search_results = ""
        if self.flow_control["enable_search"]:
            ## generate a first response to start the search
            prompt=self.generate_prompt("Vaccine_google",data_sections,query)
            answer = llm.invoke(prompt)
            print("input for web search", answer.content)
            ## Strip the ```json ... ``` fence before parsing the search plan
            search_plan = json.loads(answer.content[8:-4])
            search_tool = GoogleSerperAPIWrapper()
            for s in search_plan['search_queries']:
                print("searching with ",s)
                self.search_results = self.search_results+"\n"+ search_tool.run(s)
        ## This is the common part for both modes
        prompt=self.generate_prompt("Vaccine_base",data_sections,query)
        #print("prompt for final answer : ", prompt)
        answer = llm.invoke(prompt)
        # If reference formatting is enabled, apply it
        if self.flow_control["enable_referencing"]:
            # No need for conversion - use blocks_dict directly
            ref_manager = ReferenceManager(default_style="apa")
            processed_text, references_section = ref_manager.process_references(
                answer.content, 
                self.blocks_dict, 
                style="apa"
            )
            #print("using refs", references_section)
            formatted_answer = processed_text + "\n\n" + references_section
        else:
            formatted_answer = answer.content



        self.chat_memory.save_context(
                {"role": "user", "content": query},
                {"role": "assistant", "content": answer.content},
            )
        ## We generate a list of references in Markdown format

        return pn.pane.Markdown(formatted_answer)
    
    def get_embedding(self,text):
        """Generate an embedding for the given text using OpenAI's text-embedding-3-small model.

        Note: this definition is overridden by the later get_embedding defined further down in this class.
        """
        response = openai.embeddings.create(
                    model="text-embedding-3-small",
                    input=text
                )
        return response.data[0].embedding
    


    def parse_handler(self, query):
        data_sections = {}
        # Create blocks_dict directly in the format needed by ReferenceManager
        self.blocks_dict = {}  # Replace self.inline_refs with self.blocks_dict
        self.block_counter = 1  # Start block numbering from 1 to match example
        
        for key in self.data_handles.handlers.keys():
            if self.data_handles.handlers[key]["type"] == "text":
                data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data']}"
                # Create block entry in proper format
                self.blocks_dict[self.block_counter] = {
                    "type": "generic",
                    "id": f"text_{self.block_counter}",
                    "content": f"Text content: {self.data_handles.handlers[key]['data'][:100]}..."
                }
            elif self.data_handles.handlers[key]["type"] == "dataframe":
                data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data'].to_markdown()}"
                # Create block entry for dataframe
                self.blocks_dict[self.block_counter] = {
                    "type": "generic",
                    "id": f"dataframe_{self.block_counter}",
                    "content": f"Dataframe content from {key}"
                }
            elif self.data_handles.handlers[key]["type"] == "vectorstore":
                data_sections[key] = self.collect_text_blocks(self.data_handles.handlers[key], query)
            elif self.data_handles.handlers[key]["type"] == "db_search":
                data_sections[key] = self.collect_data_from_neo4j(self.data_handles.handlers[key], query)
            elif self.data_handles.handlers[key]["type"] == "chromaDB":
                data_sections[key] = self.collect_data_from_chroma(self.data_handles.handlers[key], query)
            
            self.block_counter += 1
        
        return data_sections
    
    def reformat_data(self, data, min_document_length=30, similarity_threshold=0.95, use_crossencoder=False, inclusions=10):
        """
        Reformat and filter data to be grouped by ID, excluding too-short documents
        and documents that are too similar to each other. Optionally applies crossencoder ranking.
        
        Args:
            data: Original data structure
            min_document_length: Minimum character length for documents to include (default: 30)
            similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar)
            use_crossencoder: Whether to apply crossencoder reranking (default: False)
            inclusions: Number of documents to return after filtering (default: 10)
            
        Returns:
            List of selected documents (not dictionary)
        """
        from sentence_transformers import CrossEncoder
        import numpy as np
        
        result = {}
        selected_docs = []
        selected_embeddings = []
        
        # Unpack the nested lists for easier access
        ids_list = data['ids'][0]
        documents_list = data['documents'][0]
        metadatas_list = data['metadatas'][0]
        embeddings_array = data['embeddings'][0]
        
        # First pass: filter by document length and organize data
        candidates = []
        for i, id_val in enumerate(ids_list):
            # Check if the document meets the minimum length requirement and does not exceed a maximum token length
            if len(documents_list[i]) >= min_document_length and self.count_tokens(documents_list[i]) <= 10000:
                candidates.append({
                    'id': id_val,
                    'document': documents_list[i],
                    'metadata': metadatas_list[i],
                    'embedding': embeddings_array[i].tolist() if embeddings_array is not None else None
                })
        
        # If we don't have enough candidates, return all we have
        if len(candidates) <= inclusions:
            return [(doc['metadata'],doc['document']) for doc in candidates]
        
        # Second pass: filter by similarity
        for candidate in candidates:
            candidate_embedding = np.array(candidate['embedding'])
            # Normalize embedding
            norm = np.linalg.norm(candidate_embedding)
            if norm > 0:
                candidate_embedding = candidate_embedding / norm
                
            # Check if candidate is too similar to any already selected document
            is_redundant = False
            for sel_emb in selected_embeddings:
                similarity = np.dot(candidate_embedding, sel_emb)
                if similarity >= similarity_threshold:
                    is_redundant = True
                    break
                    
            if not is_redundant:
                # Add to result dictionary
                result[candidate['id']] = {
                    'document': candidate['document'],
                    'metadata': candidate['metadata'],
                    'embedding': candidate['embedding']
                }
                # Add to selected lists for similarity checks
                selected_docs.append(candidate)
                selected_embeddings.append(candidate_embedding)
                
                # If we've collected enough documents and don't need crossencoder, we can stop
                if len(selected_docs) >= inclusions * 2 and not use_crossencoder:
                    break
        
        # If using crossencoder for reranking
        if use_crossencoder and len(selected_docs) > inclusions:
            query = data.get('query_text', '')
            if not query:  # If no query provided, use a placeholder
                query = "default query"  # Ideally this should be passed in
                
            cross_model = CrossEncoder('BAAI/bge-reranker-base')
            query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
            scores = cross_model.predict(query_doc_pairs)
            
            # Zip documents with their scores and sort by score (highest first)
            doc_score_pairs = list(zip(selected_docs, scores))
            ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
            
            # Take the top 'inclusions' documents after reranking
            selected_docs = [doc for doc, _ in ranked_docs[:inclusions]]
        elif len(selected_docs) > inclusions:
            # If not using crossencoder but have too many docs, just take the first 'inclusions'
            selected_docs = selected_docs[:inclusions]
        
        # Return just the document text for further processing
        #print("returning ",[(doc['metadata'],doc['document']) for doc in selected_docs])
        return [(doc['metadata'],doc['document']) for doc in selected_docs]

    def extract_filter_keywords(self, query, n_keywords=2):
        """
        Extract distinguishing keywords from a query for filtering search results.
        
        Args:
            query: The user's query text
            n_keywords: Maximum number of keywords to extract
        
        Returns:
            List of keywords for filtering
        """
        llm = ChatOpenAI(
            model=self.flow_control['pre_model'][1],
            temperature=0
        )
        
        # Make the instruction much more explicit about the exact count
        prompt = f"""
        You are a search optimization expert. Extract EXACTLY {n_keywords} specific distinguishing keyword(s) from this query:
        "{query}"
        
        Guidelines:
        - You MUST return EXACTLY {n_keywords} keyword(s) - no more, no less
        - Focus on proper nouns, company names, technical terms, and specific concepts
        - Select word(s) that would differentiate this topic from related but irrelevant topics
        - Choose word(s) that could filter out incorrect contexts (wrong companies, unrelated domains)
        - Exclude common words like "the", "and", "of"
        - Return ONLY a JSON array of strings with EXACTLY {n_keywords} string(s)
        
        Example:
        For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=1
        Output: ["Pfizer"]
        
        For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=3
        Output: ["Pfizer", "mRNA", "COVID-19"]
        
        Output format:
        ```json
        ["keyword1"{", keyword2" if n_keywords > 1 else ""}{", ..." if n_keywords > 2 else ""}]
        ```
        
        Remember: I need EXACTLY {n_keywords} keyword(s). Count carefully before submitting.
        """
        
        response = llm.invoke(prompt)
        
        try:
            # Extract JSON from response
            content = response.content.strip()
            if '```json' in content:
                content = content.split('```json')[1].split('```')[0].strip()
            elif '```' in content:
                content = content.split('```')[1].split('```')[0].strip()
                
            keywords = json.loads(content)
            
            # Force the correct number of keywords
            if len(keywords) > n_keywords:
                keywords = keywords[:n_keywords]
            elif len(keywords) < n_keywords and len(keywords) > 0:
                # If we got fewer keywords than requested but at least one, duplicate the first one
                while len(keywords) < n_keywords:
                    keywords.append(keywords[0])
                
            return [k.lower() for k in keywords]  # Convert to lowercase for case-insensitive matching
        except Exception as e:
            print(f"Error extracting keywords: {e}")
            # Fall back to simple keyword extraction if LLM fails
            words = query.lower().split()
            stopwords = ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'about']
            return [w for w in words if w not in stopwords and len(w) > 3][:n_keywords]
    
    def collect_data_from_chroma(self, data, query):
        collected_blocks = []
        
        # Extract filter keywords
        filter_keywords = self.extract_filter_keywords(query)
        print(f"Using filter keywords: {filter_keywords}")
        
        # Configuration for retrieval
        if "crossencoder" in data["processing_steps"]:
            initial_k = data["inclusions"] * 10
        else:
            initial_k = data["inclusions"]
            
        if "extend_query" in data["processing_steps"]:
            self.extend_query(query)
            self.extended_query.append(query)
        else:
            self.extended_query = [query]
                
        client = self.chroma_client.get_collection(data["data"], embedding_function=self.chroma_embedder)
        
        # Track how many documents we've added
        docs_added = 0
        target_docs = data["inclusions"]
        
        for q in self.extended_query:
            print("working on query ", q)
            
            # Build where clauses for keyword filtering
            # Start with hybrid search (both vector and keyword)
            if filter_keywords and "keyword_filter" in data.get("processing_steps", []):
                # First attempt: Try to get documents that contain ALL keywords
                # We need to retrieve documents first, then filter them manually since ChromaDB
                # doesn't support complex boolean operations in where clauses
                
                print(f"Retrieving documents for ALL keywords filter: {filter_keywords}")
                
                # Retrieve a larger batch of documents for post-filtering
                retrieved_docs = client.query(
                    query_texts=[q],
                    n_results=initial_k*5,  # Get more results since we'll filter many out
                    include=["documents", "metadatas", "embeddings"]
                )
                
                # Post-process to filter for documents containing ALL keywords
                if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0:
                    filtered_docs = {
                        'ids': [[]],
                        'documents': [[]],
                        'metadatas': [[]],
                        'embeddings': [[]]
                    }
                    
                    for i, doc in enumerate(retrieved_docs['documents'][0]):
                        # Convert to lowercase for case-insensitive matching
                        doc_lower = doc.lower()
                        
                        # Check if document contains ALL keywords
                        all_keywords_present = all(keyword.lower() in doc_lower for keyword in filter_keywords)
                        
                        if all_keywords_present:
                            filtered_docs['ids'][0].append(retrieved_docs['ids'][0][i])
                            filtered_docs['documents'][0].append(doc)
                            filtered_docs['metadatas'][0].append(retrieved_docs['metadatas'][0][i])
                            if retrieved_docs['embeddings'] and len(retrieved_docs['embeddings'][0]) > i:
                                filtered_docs['embeddings'][0].append(retrieved_docs['embeddings'][0][i])
                    
                    print(f"Found {len(filtered_docs['documents'][0])} documents containing ALL keywords")
                    
                    # If we found documents with all keywords, process them
                    if filtered_docs['documents'][0]:
                        retrieved_texts = self.reformat_data(
                            filtered_docs, 
                            min_document_length=30, 
                            similarity_threshold=0.95,
                            use_crossencoder="crossencoder" in data["processing_steps"],
                            inclusions=(target_docs - docs_added) // 2 or 1
                        )
                        
                        for metadata, document in retrieved_texts:
                            # Add reference in formatted text
                            collected_blocks.append(f"[block {self.block_counter}] {document}")
                                
                            # Create proper blocks_dict entry
                            filepath = metadata.get('bibtex', '')
                            if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
                                self.blocks_dict[self.block_counter] = {
                                    "type": "document",
                                    "id": f"doc_{self.block_counter}",
                                    "path": filepath,
                                    "description": f"Document from ChromaDB collection '{data['data']}'"
                                }
                            else:
                                self.blocks_dict[self.block_counter] = {
                                    "type": "generic",
                                    "id": f"ref_{self.block_counter}",
                                    "content": f"ChromaDB: {data['data']} - {filepath}"
                                }
                                    
                            self.block_counter += 1
                            docs_added += 1
                            
                            # Check if we have enough documents
                            if docs_added >= target_docs:
                                break
                
                # If we didn't get enough results with all keywords, fallback to individual keyword filtering
                if docs_added == 0 :
                    print(f"No results with ALL keywords, falling back to individual keywords")
                    
                    # Try with individual keywords
                    for keyword in filter_keywords[:3]:  # Limit to 3 keywords to avoid too many queries
                        where_filter = {"$contains": keyword}
                        print(f"Searching with filter: {where_filter}")
                        
                        retrieved_docs = client.query(
                            query_texts=[q],
                            n_results=initial_k,
                            where_document=where_filter,
                            include=["documents", "metadatas", "embeddings"]
                        )
                        
                        if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0:  # If we got results, process them
                            retrieved_texts = self.reformat_data(
                                retrieved_docs, 
                                min_document_length=30, 
                                similarity_threshold=0.95,
                                use_crossencoder="crossencoder" in data["processing_steps"],
                                inclusions=(target_docs - docs_added) // 2 or 1
                            )
                            
                            for metadata, document in retrieved_texts:
                                # Add reference in formatted text
                                collected_blocks.append(f"[block {self.block_counter}] {document}")
                                    
                                # Create proper blocks_dict entry
                                filepath = metadata.get('bibtex', '')
                                if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
                                    self.blocks_dict[self.block_counter] = {
                                        "type": "document",
                                        "id": f"doc_{self.block_counter}",
                                        "path": filepath,
                                        "description": f"Document from ChromaDB collection '{data['data']}'"
                                    }
                                else:
                                    self.blocks_dict[self.block_counter] = {
                                        "type": "generic",
                                        "id": f"ref_{self.block_counter}",
                                        "content": f"ChromaDB: {data['data']} - {filepath}"
                                    }
                                        
                                self.block_counter += 1
                                docs_added += 1
                                
                                # Check if we have enough documents
                                if docs_added >= target_docs:
                                    break
                        
                        # If we have enough documents, stop processing keywords
                        if docs_added >= target_docs:
                            break
            else:
                # Standard vector search without keyword filtering
                where_filter = None
                
            # If we still don't have enough results, fall back to pure vector search
            if docs_added == 0 :
                print(f"Falling back to pure vector search for additional documents")
                retrieved_docs = client.query(
                    query_texts=[q],
                    n_results=initial_k*2,
                    where_document=None,  # No filtering for fallback
                    include=["documents", "metadatas", "embeddings"]
                )
                    
                retrieved_texts = self.reformat_data(
                    retrieved_docs, 
                    min_document_length=30, 
                    similarity_threshold=0.95,
                    use_crossencoder="crossencoder" in data["processing_steps"],
                    inclusions=(target_docs - docs_added) // 2 or 1
                )
                    
                for metadata, document in retrieved_texts:
                    # Add reference in formatted text
                    collected_blocks.append(f"[block {self.block_counter}] {document}")
                        
                    # Create proper blocks_dict entry
                    filepath = metadata.get('bibtex', '')
                    if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
                        self.blocks_dict[self.block_counter] = {
                            "type": "document",
                            "id": f"doc_{self.block_counter}",
                            "path": filepath,
                            "description": f"Document from ChromaDB collection '{data['data']}'"
                        }
                    else:
                        self.blocks_dict[self.block_counter] = {
                            "type": "generic",
                            "id": f"ref_{self.block_counter}",
                            "content": f"ChromaDB: {data['data']} - {filepath}"
                        }
                            
                    self.block_counter += 1
                    docs_added += 1
                    
                    # Check if we have enough documents
                    if docs_added >= target_docs:
                        break
                    
            # If we have enough documents, stop processing queries
            if docs_added >= target_docs:
                break

        print(f"Total docs added: {docs_added}")
        print(f"Collected blocks: ","\n".join(collected_blocks))
        
        return "\n".join(collected_blocks)
    
    def get_embedding(self, text):
        """Generate an embedding for the given text using OpenAI's text-embedding model."""
        # Explicitly set the API type before making the API call
        import openai
        original_api_type = os.environ.get("OPENAI_API_TYPE", None)
        os.environ["OPENAI_API_TYPE"] = "openai"  # Explicitly setting to standard OpenAI
        
        try:
            response = openai.embeddings.create(
                model="text-embedding-3-small",
                input=text
            )
            embedding = response.data[0].embedding
        finally:
            # Restore original API type setting
            if original_api_type is not None:
                os.environ["OPENAI_API_TYPE"] = original_api_type
            elif "OPENAI_API_TYPE" in os.environ:
                del os.environ["OPENAI_API_TYPE"]
        
        return embedding
    
    

    def collect_data_from_neo4j(self, data, query, doc_type="literature data"):
        ## doc_type defaults to "literature data" so the two-argument call in parse_handler stays valid
        collected_blocks = []
        embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
        if "crossencoder" in data["processing_steps"]:
            initial_k=data["inclusions"]*10
        else:
            initial_k=data["inclusions"]
        if "extend_query" in data["processing_steps"]:
            self.extend_query(query)
            self.extended_query.append(query)
        else:
            self.extended_query=[query]
        ## NOTE: both branches currently build the same query; they are kept separate so the
        ## two document types can diverge later
        if doc_type == "literature data":
            cypher_query = data['data'] + """
            WITH COLLECT(x) AS relevantDocs, $embedding AS query_embedding, $initial_k AS k
            // Perform the vector search on the pre-filtered nodes.
            $vector_call
            YIELD node AS doc, score AS vectorScore
            WHERE doc IN relevantDocs
            // Retrieve additional fields from the parent Publication node.
            MATCH (p:Publication)<--(doc)
            RETURN collect({docId: doc.id,
                ChunkText: $target_field,
                ParentName: p.name,
                BibTex: p.BibTex,
                score: vectorScore})
            """
        else:
            cypher_query = data['data'] + """
            WITH COLLECT(x) AS relevantDocs, $embedding AS query_embedding, $initial_k AS k
            // Perform the vector search on the pre-filtered nodes.
            $vector_call
            YIELD node AS doc, score AS vectorScore
            WHERE doc IN relevantDocs
            // Retrieve additional fields from the parent Publication node.
            MATCH (p:Publication)<--(doc)
            RETURN collect({docId: doc.id,
                ChunkText: $target_field,
                ParentName: p.name,
                BibTex: p.BibTex,
                score: vectorScore})
            """
        ## The first step is always a similarity search
        for q in self.extended_query:
            print("working on query ",q)
            embedding = self.get_embedding(q)
            ## We first collect Text_chunks
            vector_call='CALL db.index.vector.queryNodes("text_embedding", k, query_embedding)'
            target_field='doc.text'
            ## Neo4j parameters cannot stand in for query fragments, so the vector call and
            ## target field are spliced into the query text before execution
            q_text = cypher_query.replace("$vector_call", vector_call).replace("$target_field", target_field)
            query_run = self.session.run(
                    q_text,
                    embedding=embedding,
                    initial_k=initial_k
                )
            record = query_run.single()
            if record:
                result = record[0]
                ## TODO: the retrieved rows are not yet turned into text blocks, so this
                ## method currently returns an empty string

        return "\n".join(collected_blocks)
    

    def collect_text_blocks(self, data,query):
        embedding_function = OpenAIEmbeddings()
        if "crossencoder" in data["processing_steps"]:
            initial_k=data["inclusions"]*10
        else:
            initial_k=data["inclusions"]
        if "extend_query" in data["processing_steps"]:
            self.extend_query(query)
            self.extended_query.append(query)
        else:
            self.extended_query=[query]
        ## The first step is always a similarity search
        collected_blocks = []
        retriever = data['data'].as_retriever(
                search_type="similarity", 
                search_kwargs={"k": initial_k*3}
            )
        for q in self.extended_query:
            print("working on query ",q)
            retrieved_docs = retriever.invoke(q)
            retrieved_texts = [doc.page_content for doc in retrieved_docs if len(doc.page_content)>30]
            # Here we recompute embeddings for each candidate document.
            candidate_embeddings = np.array([self.normalize(embedding_function.embed_query(doc))
                                 for doc in retrieved_texts])

            # Compute and normalize the query embedding
            query_embedding = self.normalize(np.array(embedding_function.embed_query(q)))

            # 4. Run MMR to select a diverse subset of documents
            print("running MMR")
            #retrieved_texts = self.mmr(query_embedding, candidate_embeddings, retrieved_texts, lambda_param=0.5, top_k=initial_k)
            retrieved_texts=self.similarity_threshold_filter(query_embedding, candidate_embeddings, retrieved_texts, similarity_threshold=0.95,top_k=initial_k)
            ## If crossencoder is used, we need to rerank the results
            if "crossencoder" in data["processing_steps"]:
                cross_model = CrossEncoder('BAAI/bge-reranker-base')
                query_chunk_pairs = [(q, chunk) for chunk in retrieved_texts]
                scores = cross_model.predict(query_chunk_pairs)
                chunk_score_pairs = list(zip(retrieved_texts, scores))
                ranked_chunks = sorted(chunk_score_pairs, key=lambda x: x[1], reverse=True)
                retrieved_texts = [chunk for chunk, score in ranked_chunks[:data["inclusions"]//2]]
            #print("blocks from ",q," \n","\n".join(retrieved_texts))
            for block in retrieved_texts:
                collected_blocks.append("[block " + str(self.block_counter) + "] " + block)
                ## inline_refs was replaced by blocks_dict (see parse_handler); register the block there
                self.blocks_dict[self.block_counter] = {
                    "type": "generic",
                    "id": f"vstore_{self.block_counter}",
                    "content": f"VStore Block {self.block_counter}"
                }
                self.block_counter += 1
        return "\n".join(collected_blocks)
    
    def generate_prompt(self,template,data_sections,query):
        prompt_template=my_prompt_templates.get(template,'')
        if prompt_template=="":
            prompt_template=my_prompt_templates.get("Vaccine_base",'')
        prompt=prompt_template["Instructions"]+"\n"
        i=0
        for i, key in enumerate(data_sections.keys()):
            prompt=prompt+"Step "+str(i+1)+" on section labeled  [" +key+"]:  "+self.data_handles.handlers[key]['instructions']+ "\n"
        if self.flow_control["enable_search"]:
            prompt=prompt+"Step "+str(i+2)+" on section labeled [web search results] : Provide a summary of the given context data extracted from the web, using summary tables when possible.\n"
        if self.flow_control["enable_memory"]:
            prompt=prompt+"Step "+str(i+3)+" on section labeled [previous chats] : Also take into account your previous answers.\n"
        prompt=prompt+prompt_template["Output Constraints"]+"\n\n"
        i=0
        for i, key in enumerate(data_sections.keys()):
            prompt=prompt+"Data section "+str(i+1)+"- [" +key+"]\n"+data_sections[key]+ "\n"
        if self.flow_control["enable_search"]:
            prompt=prompt+"Data section "+str(i+2)+"- [web search results] \n"+self.search_results+ "\n"
        if self.flow_control["enable_memory"]:
            prompt=prompt+"Data section "+str(i+3)+"- [previous chats] \n"+self.chat_memory.get_formatted_history()+ "\n"
        prompt=prompt+"User query: "+query
        return prompt

    def extend_query(self,query):
        llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
        prompt = f"""
        You are an AI that enhances a given user search query to improve information  retrieval.
    
        ### User Query:
        {query}

        ### Instructions:   
        - Provide exactly 5 expanded queries.
        - Each query should explore a different aspect or perspective of the original query.
        - Use synonyms, related terms, or rephrased versions to cover various dimensions of the topic.
    
        ### Expanded Queries:
        """
        answer = llm.invoke(prompt)
        self.extended_query=[x for x in answer.content.strip().split("\n") if x != ""]
        print("extended query",self.extended_query)
        return
    
    def normalize(self, vector):
        """Normalize a vector to unit length."""
        return vector / np.linalg.norm(vector)
    
    def mmr(self, query_embedding, candidate_embeddings, candidate_docs, lambda_param=0.7, top_k=5):
        """Select a diverse subset of documents with Maximal Marginal Relevance (MMR).

        Assumes the embeddings are already normalized, so dot products act as cosine similarities.
        """
        # Compute similarity between the query and each candidate (dot product assumes normalized vectors)
        candidate_similarities = np.dot(candidate_embeddings, query_embedding)
        
        # Initialize selected and remaining indices
        selected_indices = []
        candidate_indices = list(range(len(candidate_docs)))
        
        # First selection: candidate with highest similarity
        first_idx = int(np.argmax(candidate_similarities))
        selected_indices.append(first_idx)
        candidate_indices.remove(first_idx)
        
        # Iteratively select documents that balance relevance and diversity
        while len(selected_indices) < top_k and candidate_indices:
            best_score = -np.inf
            best_idx = None
            for idx in candidate_indices:
                # Relevance score for candidate idx
                relevance = candidate_similarities[idx]
                # Diversity score: maximum similarity with any already selected document
                diversity = max(np.dot(candidate_embeddings[idx], candidate_embeddings[sel_idx])
                                for sel_idx in selected_indices)
                # Combined MMR score
                score = lambda_param * relevance - (1 - lambda_param) * diversity
                if score > best_score:
                    best_score = score
                    best_idx = idx
            selected_indices.append(best_idx)
            candidate_indices.remove(best_idx)
        
        return [candidate_docs[i] for i in selected_indices]

    def similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold=0.9, top_k=5):
        """Select up to top_k candidates by query similarity, skipping near-duplicates above the threshold."""
        selected_docs = []
        selected_embeddings = []

        # Compute query similarity scores for sorting candidates (highest first)
        candidate_scores = np.dot(candidate_embeddings, query_embedding)
        sorted_indices = np.argsort(candidate_scores)[::-1]

        for idx in sorted_indices:
            candidate_embedding = candidate_embeddings[idx]
            # Check if candidate is too similar to any already selected document
            is_redundant = any(np.dot(candidate_embedding, sel_emb) >= similarity_threshold 
                            for sel_emb in selected_embeddings)
            if not is_redundant and len(selected_docs) < top_k:
                print("appending ",candidate_docs[idx])
                selected_docs.append(candidate_docs[idx])
                selected_embeddings.append(candidate_embedding)
                
        return selected_docs

Parameters

The constructor takes no arguments; behavior is configured after instantiation through the flow_control dictionary, the data_handles registry, and set_api_keys().

Return Value

Instantiation returns a OneCo_hybrid_RAG instance with API keys set and Neo4j/ChromaDB connections opened.

Class Interface

Methods

__init__(self)

Purpose: Initializes API keys, flow-control defaults, the data-handle registry, the Panel chat interface, chat memory, reference blocks, and the database connections

Returns: None

init_connections(self)

Purpose: Opens the Neo4j driver and session, builds the ChromaDB HTTP client and embedder, and lists the available collections

Returns: None

run_query(self, query, params)

Purpose: Execute a Cypher query and return the result.

Parameters:

  • query: The Cypher query to execute (str)
  • params: Optional dict of query parameters

Returns: The query result

evaluate_query(self, query, params)

Purpose: Execute a Cypher query and return a single result value.

Parameters:

  • query: The Cypher query to execute (str)
  • params: Optional dict of query parameters

Returns: The single result value, or None if no record is found
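
A minimal, illustrative sketch of calling these helpers (the Cypher statements below are hypothetical examples, not taken from the source):

rag = OneCo_hybrid_RAG()

# run_query returns the raw neo4j Result, which can be iterated record by record
result = rag.run_query("MATCH (p:Publication) RETURN p.name AS name LIMIT 5")
for record in result:
    print(record["name"])

# evaluate_query returns only the first value of the first record (or None)
node_count = rag.evaluate_query("MATCH (n) RETURN count(n)")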

push_changes(self, node)

Purpose: Push property changes for a node to the database.

Parameters:

  • node: dict or node-like object with the properties to update

Returns: None

count_tokens(self, text)

Purpose: Counts the tokens in the given text using the cl100k_base tiktoken encoding

Parameters:

  • text: The text to tokenize (str)

Returns: The token count (int)

set_api_keys(self)

Purpose: Sets the OpenAI, Serper, and Azure OpenAI credentials as environment variables

Returns: None

extract_core_query(self, query_text)

Purpose: Extracts the core information-seeking question from a user query that may also contain processing instructions for the RAG system.

Parameters:

  • query_text: The original user query text

Returns: dict with keys core_question, instructions, and is_complex
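
Illustrative call, assuming rag is an initialized OneCo_hybrid_RAG instance; the query string is hypothetical, and the dictionary values depend on the LLM response:

analysis = rag.extract_core_query(
    "Summarize recent adjuvant research and present it as a table"
)
# analysis resembles:
# {"core_question": "Summarize recent adjuvant research",
#  "instructions": "present it as a table",
#  "is_complex": True}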

response_callback(self, query)

Purpose: Chat callback - analyzes the query, gathers data sections from all registered handlers, optionally runs a web search, invokes the main LLM, and applies reference formatting

Parameters:

  • query: The user's chat message

Returns: A pn.pane.Markdown containing the formatted answer

get_embedding(self, text)

Purpose: Generate an embedding for the given text using OpenAI's text-embedding-3-small model (overridden by the later definition of the same name)

Parameters:

  • text: The text to embed

Returns: The embedding vector (list of floats)

parse_handler(self, query)

Purpose: Builds data sections and reference blocks for all registered data handlers (text, dataframe, vector store, Neo4j, ChromaDB)

Parameters:

  • query: The retrieval query used for vector-store, Neo4j, and ChromaDB handlers

Returns: dict mapping handler keys to formatted data sections

reformat_data(self, data, min_document_length, similarity_threshold, use_crossencoder, inclusions)

Purpose: Reformat and filter retrieved data, excluding documents that are too short or too similar to already selected ones, with optional cross-encoder reranking.

Parameters:

  • data: ChromaDB-style query result with nested ids, documents, metadatas, and embeddings lists
  • min_document_length: Minimum character length for documents to include (default: 30)
  • similarity_threshold: Similarity above which a document is considered redundant (default: 0.95)
  • use_crossencoder: Whether to apply cross-encoder reranking (default: False)
  • inclusions: Number of documents to return after filtering (default: 10)

Returns: List of (metadata, document) tuples
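
A sketch of the input shape reformat_data expects - it mirrors a ChromaDB query() result with one nested list per query; the values below are made up, and rag is assumed to be an initialized instance:

import numpy as np

sample = {
    "ids": [["doc-1", "doc-2"]],
    "documents": [[
        "First candidate passage, long enough to pass the length filter.",
        "Second candidate passage, also long enough to pass the length filter.",
    ]],
    "metadatas": [[{"bibtex": "paper_a.pdf"}, {"bibtex": "paper_b.pdf"}]],
    "embeddings": [np.random.rand(2, 1536)],  # one embedding row per document
}

selected = rag.reformat_data(sample, inclusions=1)  # list of (metadata, document) tuples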

extract_filter_keywords(self, query, n_keywords)

Purpose: Extract distinguishing keywords from a query for filtering search results.

Parameters:

  • query: The user's query text
  • n_keywords: Number of keywords to extract (default: 2)

Returns: List of lowercased keywords
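
Hypothetical call showing the intended output shape (actual keywords depend on the LLM response; rag is assumed to be an initialized instance):

keywords = rag.extract_filter_keywords(
    "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission",
    n_keywords=2,
)
# e.g. ["pfizer", "mrna"] - lowercased, truncated or padded toward n_keywords where possible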

collect_data_from_chroma(self, data, query)

Purpose: Retrieves documents from a ChromaDB collection using keyword-filtered vector search with fallbacks, and registers a reference block for each result

Parameters:

  • data: Handler configuration (collection name, processing steps, inclusions)
  • query: The retrieval query

Returns: Newline-joined string of retrieved text blocks

get_embedding(self, text)

Purpose: Generate an embedding for the given text using OpenAI's text-embedding model.

Parameters:

  • text: Parameter

Returns: The embedding vector (list of floats)

collect_data_from_neo4j(self, data, query, doc_type)

Purpose: Retrieves text chunks from Neo4j via vector similarity search over pre-filtered nodes (result post-processing is still incomplete in the source)

Parameters:

  • data: Handler configuration (Cypher prefix, processing steps, inclusions)
  • query: The retrieval query
  • doc_type: Document type selector (defaults to "literature data")

Returns: Newline-joined string of retrieved text blocks

collect_text_blocks(self, data, query)

Purpose: Retrieves text blocks from a LangChain vector-store retriever, filters them for diversity, and optionally reranks them with a cross-encoder

Parameters:

  • data: Handler configuration (vector store, processing steps, inclusions)
  • query: The retrieval query

Returns: Newline-joined string of retrieved text blocks

generate_prompt(self, template, data_sections, query)

Purpose: Assembles the final LLM prompt from a prompt template, per-section instructions, the data sections, web search results, and chat history

Parameters:

  • template: Name of the prompt template to use
  • data_sections: dict of formatted data sections from parse_handler
  • query: The user query appended at the end of the prompt

Returns: The prompt string

extend_query(self, query)

Purpose: Uses an LLM to produce five expanded variants of the user query and stores them in self.extended_query

Parameters:

  • query: Parameter

Returns: None

normalize(self, vector)

Purpose: Normalizes a vector to unit length

Parameters:

  • vector: The vector to normalize

Returns: The normalized vector

mmr(self, query_embedding, candidate_embeddings, candidate_docs, lambda_param, top_k)

Purpose: Selects a diverse top-k subset of candidate documents via Maximal Marginal Relevance (MMR)

Parameters:

  • query_embedding: Normalized query embedding
  • candidate_embeddings: Array of normalized candidate embeddings
  • candidate_docs: Candidate documents aligned with the embeddings
  • lambda_param: Trade-off between relevance and diversity (default: 0.7)
  • top_k: Number of documents to select (default: 5)

Returns: List of selected documents
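
A self-contained sketch of calling mmr directly; it assumes unit-length embeddings because dot products stand in for cosine similarity, and the random vectors are placeholders (rag is an initialized instance):

import numpy as np

docs = [f"candidate document {i}" for i in range(6)]
emb = np.random.rand(6, 384)
emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)      # normalize rows
query_emb = np.random.rand(384)
query_emb = query_emb / np.linalg.norm(query_emb)

top_docs = rag.mmr(query_emb, emb, docs, lambda_param=0.7, top_k=3)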

similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold, top_k)

Purpose: Selects up to top_k candidates in order of query similarity, skipping documents that are too similar to ones already selected

Parameters:

  • query_embedding: Normalized query embedding
  • candidate_embeddings: Array of normalized candidate embeddings
  • candidate_docs: Candidate documents aligned with the embeddings
  • similarity_threshold: Similarity above which a candidate is skipped (default: 0.9)
  • top_k: Maximum number of documents to select (default: 5)

Returns: List of selected documents

Required Imports

from typing import List
from typing import Any
from typing import Dict
import os
import panel as pn
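
The auto-generated list above is incomplete. Based on the names used in the source, the module's import block likely resembles the sketch below; the LangChain paths in particular are assumptions that depend on the installed version, and the project-local helpers (config, SimpleDataHandle, SimpleChatMemory, MyEmbeddingFunction, ReferenceManager, my_prompt_templates) come from modules not shown here:

import os
import json
import numpy as np
import tiktoken
import openai
import chromadb
import panel as pn
from neo4j import GraphDatabase
from sentence_transformers import CrossEncoder
from langchain_openai import ChatOpenAI, AzureChatOpenAI, OpenAIEmbeddings  # assumed path
from langchain_community.utilities import GoogleSerperAPIWrapper            # assumed path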

Usage Example

# Example usage (the constructor takes no arguments):
# rag = OneCo_hybrid_RAG()
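
A slightly fuller, hedged sketch: the flow_control keys come straight from __init__, while serving the chat interface with pn.serve is an assumption about how the class is meant to be embedded:

import panel as pn

rag = OneCo_hybrid_RAG()

# Adjust flow control before chatting (keys and defaults as defined in __init__)
rag.flow_control["model"] = ["OpenAi", "gpt-4o", 0]
rag.flow_control["enable_search"] = True
rag.flow_control["enable_memory"] = True

# Expose the Panel ChatInterface; each message is routed through response_callback
pn.serve(rag.chat_interface, port=5006)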

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class OneCo_hybrid_RAG_v1 98.6% similar

    A class named OneCo_hybrid_RAG

    From: /tf/active/vicechatdev/OneCo_hybrid_RAG_old.py
  • class OneCo_hybrid_RAG_v2 98.5% similar

    A class named OneCo_hybrid_RAG

    From: /tf/active/vicechatdev/OneCo_hybrid_RAG.py
  • class OneCo_hybrid_RAG_v3 98.2% similar

    A class named OneCo_hybrid_RAG

    From: /tf/active/vicechatdev/vice_ai/hybrid_rag_engine.py
  • function check_rag_config 59.2% similar

    Diagnostic function that inspects and reports configuration details of a hybrid RAG (Retrieval-Augmented Generation) engine module, including model settings and class attributes.

    From: /tf/active/vicechatdev/vice_ai/check_rag_config.py
  • class DocChatRAG 51.7% similar

    Main RAG engine with three operating modes: 1. Basic RAG (similarity search) 2. Extensive (full document retrieval with preprocessing) 3. Full Reading (process all documents)

    From: /tf/active/vicechatdev/docchat/rag_engine.py