class VendorEnricher
A class that enriches vendor information by finding official email addresses and VAT numbers using RAG (Retrieval-Augmented Generation) with ChromaDB document search and web search capabilities.
File: /tf/active/vicechatdev/find_email/vendor_enrichment.py
Lines: 33-415
Complexity: complex
Purpose
VendorEnricher automates the process of finding and validating vendor contact information (emails and VAT numbers) from an Excel file. It uses a hybrid RAG engine that searches both a ChromaDB collection of company documents and performs web searches to find official, verified contact information. The class manages the entire workflow: loading vendor data, initializing the RAG engine with proper configuration, querying for each vendor, extracting structured information from responses, and saving enriched results back to Excel with confidence scores and source attribution.
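The typical call order is load, enrich, save, summarize. Below is a minimal happy-path sketch; the module path is assumed from the source location above, and the workbook name is illustrative.

```python
# Minimal happy-path sketch; "vendors.xlsx" is an illustrative file name
from vendor_enrichment import VendorEnricher

enricher = VendorEnricher("vendors.xlsx", collection_name="00_company_governance")
if enricher.load_vendors():            # read Excel, add result columns
    enricher.enrich_all_vendors()      # initializes the RAG engine internally, queries each vendor
    enricher.save_results()            # writes timestamped .xlsx and .csv
    enricher.generate_summary()        # logs hit rates and confidence breakdown
```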
Source Code
class VendorEnricher:
"""Enriches vendor information using RAG and web search"""
def __init__(self, excel_file, collection_name="00_company_governance"):
"""
Initialize the vendor enricher
Args:
excel_file: Path to Excel file with vendor list
collection_name: ChromaDB collection to search
"""
self.excel_file = excel_file
self.collection_name = collection_name
self.df = None
self.rag_engine = None
self.results = []
# Set API keys from environment or use defaults
os.environ["OPENAI_API_KEY"] = os.environ.get(
"OPENAI_API_KEY",
"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"
)
os.environ["SERPER_API_KEY"] = os.environ.get(
"SERPER_API_KEY",
"9a1f42c99feee69526e216af14e07b64fb4b3bfb"
)
def load_vendors(self):
"""Load vendor list from Excel file"""
logger.info(f"Loading vendor list from {self.excel_file}")
try:
self.df = pd.read_excel(self.excel_file)
logger.info(f"Loaded {len(self.df)} vendors")
logger.info(f"Columns: {list(self.df.columns)}")
# Identify vendor column (assume first column)
self.vendor_column = self.df.columns[0]
logger.info(f"Using vendor column: {self.vendor_column}")
# Add result columns if they don't exist
if 'Email_Found' not in self.df.columns:
self.df['Email_Found'] = ''
if 'VAT_Number' not in self.df.columns:
self.df['VAT_Number'] = ''
if 'Source' not in self.df.columns:
self.df['Source'] = ''
if 'Confidence' not in self.df.columns:
self.df['Confidence'] = ''
return True
except Exception as e:
logger.error(f"Error loading Excel file: {e}")
return False
def initialize_rag(self):
"""Initialize the RAG engine with proper configuration"""
logger.info("Initializing RAG engine...")
try:
self.rag_engine = OneCo_hybrid_RAG()
# Configure for document + web search
self.rag_engine.flow_control.update({
"model": ["OpenAi", "gpt-4o", 0], # Main model
"pre_model": ["OpenAi", "gpt-4o-mini", 0], # Small model for extraction
"enable_search": True, # Enable document search
"enable_web_search": True, # Enable web search
"web_search_queries": 5, # 5 queries per cycle for thorough search
"web_search_cycles": 1, # Single cycle
"enable_memory": False, # No memory needed
"detail_level": "Comprehensive", # Get comprehensive info
"enable_reference_filtering": True,
"relevance_threshold": 0.3
})
# **CRITICAL: Configure ChromaDB collection to search**
logger.info(f"Adding ChromaDB collection: {self.collection_name}")
self.rag_engine.data_handles.add_data(
name="company_docs",
type="chromaDB",
data=self.collection_name, # Collection name as string
inclusions=10, # Number of documents to retrieve
instructions="Search for official company contact information including email addresses and VAT/Tax ID numbers."
)
logger.info("RAG engine initialized successfully")
logger.info(f"✓ Collection '{self.collection_name}' configured for search")
return True
except Exception as e:
logger.error(f"Error initializing RAG engine: {e}")
return False
def create_search_query(self, vendor_name):
"""
Create optimized search query for vendor information
Args:
vendor_name: Name of the vendor
Returns:
Search query string
"""
query = f"""Find official contact information for {vendor_name}:
CRITICAL REQUIREMENTS:
1. **OFFICIAL EMAIL ADDRESS** - Find the primary, general-purpose business email:
- PRIORITY: Global corporate emails (from main .com domain) over regional/country-specific emails
- Preferred formats in order: "info@", "contact@", "sales@", "support@"
- AVOID: Regional emails (e.g., *india@, *uk@, *pscs*@), individual employee emails
- Must be from official company website or verified business directory
- Provide the EXACT email address format: name@domain.com
- The email should be suitable for general international business inquiries
2. **VAT NUMBER** (Tax ID / VAT Registration Number / BTW Number):
- Look for official VAT registration number
- May be labeled as: VAT, Tax ID, Company Registration Number, BTW-nummer
- Include the full number with country prefix if available (e.g., BE0123456789)
3. **SOURCE VERIFICATION**:
- Official company website (highest priority)
- Government business registers
- Verified business directories
OUTPUT FORMAT (be precise):
- Email: [exact email address - must include @ and domain]
- VAT Number: [complete VAT/Tax ID]
- Source: [specific website or register where found]
- Confidence: [High=official source / Medium=business directory / Low=uncertain]
If information cannot be found, state "Not found" for that specific field."""
return query
def extract_info_from_response(self, response_text):
"""
Extract structured information from LLM response
Args:
response_text: Response from RAG engine
Returns:
Dictionary with extracted info
"""
import re
result = {
'email': 'Not found',
'vat_number': 'Not found',
'source': 'Web search',
'confidence': 'Medium'
}
# Try to extract email addresses using regex
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
emails = re.findall(email_pattern, response_text)
lines = response_text.split('\n')
for line in lines:
line_lower = line.lower().strip()
# Extract email - prefer from "Email:" lines
if 'email' in line_lower and ':' in line:
email_part = line.split(':', 1)[1].strip()
# Clean up markdown, brackets, citations
email_part = re.sub(r'\[.*?\]', '', email_part) # Remove [Block X] citations
email_part = email_part.replace('*', '').strip()
# Try to find email in cleaned text
found_emails = re.findall(email_pattern, email_part)
if found_emails:
result['email'] = found_emails[0]
elif email_part and email_part.lower() not in ['not found', 'not available', 'unknown']:
# Take the text as-is if it looks like an email
if '@' in email_part and '.' in email_part:
result['email'] = email_part.split()[0] # Take first word if multiple
# Extract VAT number
if ('vat' in line_lower or 'tax id' in line_lower or 'btw' in line_lower) and ':' in line:
vat_part = line.split(':', 1)[1].strip()
# Clean up markdown, brackets, citations
vat_part = re.sub(r'\[.*?\]', '', vat_part)
vat_part = vat_part.replace('*', '').strip()
if vat_part and vat_part.lower() not in ['not found', 'not available', 'unknown']:
# Remove any explanatory text after the VAT number
vat_part = vat_part.split('.')[0].split(',')[0].strip()
result['vat_number'] = vat_part
# Extract source
if 'source' in line_lower and ':' in line:
source_part = line.split(':', 1)[1].strip()
source_part = re.sub(r'\[.*?\]', '', source_part)
source_part = source_part.replace('*', '').strip()
if source_part and source_part.lower() not in ['unknown', 'not available']:
result['source'] = source_part[:100] # Limit length
# Extract confidence
if 'confidence' in line_lower and ':' in line:
conf_part = line.split(':', 1)[1].strip()
conf_part = re.sub(r'\[.*?\]', '', conf_part)
conf_part = conf_part.replace('*', '').strip().lower()
# Extract just the confidence level word
for level in ['high', 'medium', 'low']:
if level in conf_part:
result['confidence'] = level.capitalize()
break
# If no email found in structured format, try to find any email in the text
if result['email'] == 'Not found' and emails:
result['email'] = emails[0]
return result
def enrich_vendor(self, vendor_name, index, total):
"""
Enrich a single vendor with email and VAT information
Args:
vendor_name: Name of the vendor
index: Current index (for progress tracking)
total: Total number of vendors
Returns:
Dictionary with enrichment results
"""
logger.info(f"[{index}/{total}] Processing: {vendor_name}")
try:
# Create search query
query = self.create_search_query(vendor_name)
# Execute search using RAG engine
logger.info(f" Executing RAG query...")
response = self.rag_engine.response_callback(query)
# Extract response text - handle various response types
if hasattr(response, 'content'):
response_text = response.content
elif hasattr(response, 'object'):
# Panel Markdown object
response_text = str(response.object)
else:
response_text = str(response)
# Clean up if it's still a string representation of Markdown
if response_text.startswith('Markdown('):
# Extract the actual text from Markdown(str) format
import re
match = re.search(r"Markdown\(['\"](.+?)['\"]\)", response_text, re.DOTALL)
if match:
response_text = match.group(1)
else:
# Fallback: just remove Markdown() wrapper
response_text = response_text.replace('Markdown(str)', '').strip()
logger.info(f" Response length: {len(response_text)} chars")
# Extract structured information
result = self.extract_info_from_response(response_text)
result['vendor_name'] = vendor_name
result['raw_response'] = response_text[:500] # Store first 500 chars
logger.info(f" Results: Email={result['email']}, VAT={result['vat_number']}, Confidence={result['confidence']}")
return result
except Exception as e:
logger.error(f" Error processing {vendor_name}: {e}")
return {
'vendor_name': vendor_name,
'email': 'Error',
'vat_number': 'Error',
'source': f'Error: {str(e)}',
'confidence': 'Low',
'raw_response': ''
}
def enrich_all_vendors(self, start_index=0, end_index=None, delay=2):
"""
Enrich all vendors in the list
Args:
start_index: Start from this index (for resuming)
end_index: End at this index (None = all)
delay: Delay between requests in seconds
"""
if self.df is None:
logger.error("No vendor data loaded")
return False
if not self.initialize_rag():
logger.error("Failed to initialize RAG engine")
return False
vendors = self.df[self.vendor_column].tolist()
if end_index is None:
end_index = len(vendors)
vendors_to_process = vendors[start_index:end_index]
total = len(vendors_to_process)
logger.info(f"Processing {total} vendors (indices {start_index} to {end_index-1})")
for i, vendor in enumerate(vendors_to_process, 1):
if pd.isna(vendor) or str(vendor).strip() == '':
logger.info(f"[{i}/{total}] Skipping empty vendor at index {start_index + i - 1}")
continue
result = self.enrich_vendor(vendor, i, total)
self.results.append(result)
# Update DataFrame
df_index = start_index + i - 1
self.df.at[df_index, 'Email_Found'] = result['email']
self.df.at[df_index, 'VAT_Number'] = result['vat_number']
self.df.at[df_index, 'Source'] = result['source']
self.df.at[df_index, 'Confidence'] = result['confidence']
# Save progress every 5 vendors
if i % 5 == 0:
self.save_results()
# Rate limiting
if i < total:
logger.info(f" Waiting {delay}s before next request...")
time.sleep(delay)
logger.info("All vendors processed")
return True
def save_results(self):
"""Save enriched results to Excel file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = self.excel_file.replace('.xlsx', f'_enriched_{timestamp}.xlsx')
logger.info(f"Saving results to {output_file}")
try:
self.df.to_excel(output_file, index=False)
logger.info(f"Results saved successfully")
# Also save as CSV for easier inspection
csv_file = output_file.replace('.xlsx', '.csv')
self.df.to_csv(csv_file, index=False)
logger.info(f"CSV version saved to {csv_file}")
return output_file
except Exception as e:
logger.error(f"Error saving results: {e}")
return None
def generate_summary(self):
"""Generate summary statistics of enrichment"""
if self.df is None:
return
total = len(self.df)
emails_found = len(self.df[self.df['Email_Found'].str.contains('@', na=False)])
vat_found = len(self.df[(self.df['VAT_Number'] != '') &
(self.df['VAT_Number'] != 'Not found') &
(self.df['VAT_Number'].notna())])
high_conf = len(self.df[self.df['Confidence'] == 'High'])
medium_conf = len(self.df[self.df['Confidence'] == 'Medium'])
low_conf = len(self.df[self.df['Confidence'] == 'Low'])
logger.info("\n" + "=" * 60)
logger.info("ENRICHMENT SUMMARY")
logger.info("=" * 60)
logger.info(f"Total vendors: {total}")
logger.info(f"Emails found: {emails_found} ({emails_found/total*100:.1f}%)")
logger.info(f"VAT numbers found: {vat_found} ({vat_found/total*100:.1f}%)")
logger.info(f"\nConfidence levels:")
logger.info(f" High: {high_conf} ({high_conf/total*100:.1f}%)")
logger.info(f" Medium: {medium_conf} ({medium_conf/total*100:.1f}%)")
logger.info(f" Low: {low_conf} ({low_conf/total*100:.1f}%)")
logger.info("=" * 60)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| excel_file | str | required | positional-or-keyword |
| collection_name | str | '00_company_governance' | positional-or-keyword |
Parameter Details
excel_file: Path to the Excel file containing the vendor list. The first column is assumed to contain vendor names. The file will be read using pandas and enriched with additional columns (Email_Found, VAT_Number, Source, Confidence).
collection_name: Name of the ChromaDB collection to search for vendor information. Defaults to '00_company_governance'. This collection should contain company documents with contact information and VAT numbers.
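For reference, the input workbook only needs vendor names in its first column; the result columns are added by load_vendors(). A minimal sketch for producing such a file with pandas (the column name and vendor names are illustrative):

```python
import pandas as pd

# Illustrative input file: only the first column is read as the vendor list;
# Email_Found, VAT_Number, Source and Confidence are added by the enricher itself.
pd.DataFrame({"Vendor": ["Acme Corp", "Globex NV", "Initech GmbH"]}).to_excel(
    "vendors.xlsx", index=False
)
```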
Return Value
Instantiation returns a VendorEnricher object. Key method return values:
- load_vendors(): bool (success/failure)
- initialize_rag(): bool (success/failure)
- enrich_vendor(): dict with keys 'vendor_name', 'email', 'vat_number', 'source', 'confidence', 'raw_response'
- enrich_all_vendors(): bool (success/failure)
- save_results(): str (output file path) or None on error
Class Interface
Methods
__init__(self, excel_file, collection_name='00_company_governance')
Purpose: Initialize the VendorEnricher with Excel file path and ChromaDB collection name, set up API keys from environment variables
Parameters:
excel_file: Path to Excel file containing the vendor list
collection_name: ChromaDB collection name to search (default: '00_company_governance')
Returns: None (constructor)
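Since __init__ only falls back to its built-in defaults when the environment variables are missing, it is safer to export real keys before constructing the object. A sketch with placeholder values that must be replaced:

```python
import os

# Supply real credentials via the environment so the constructor's defaults are never used
os.environ.setdefault("OPENAI_API_KEY", "<your-openai-key>")   # placeholder
os.environ.setdefault("SERPER_API_KEY", "<your-serper-key>")   # placeholder

enricher = VendorEnricher("vendors.xlsx")   # illustrative file name
```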
load_vendors(self) -> bool
Purpose: Load vendor list from Excel file into DataFrame, identify vendor column, and add result columns if they don't exist
Returns: True if loading successful, False if error occurred
initialize_rag(self) -> bool
Purpose: Initialize the RAG engine with proper configuration for document and web search, configure ChromaDB collection
Returns: True if initialization successful, False if error occurred
create_search_query(self, vendor_name: str) -> str
Purpose: Create an optimized search query for finding vendor email and VAT information with specific instructions for the LLM
Parameters:
vendor_name: Name of the vendor to search for
Returns: Formatted search query string with detailed instructions for finding email and VAT number
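The query is a plain prompt string, so it can be inspected or sent to the engine manually, outside enrich_vendor(). A sketch, assuming valid API keys and a reachable ChromaDB collection; the vendor name is illustrative:

```python
enricher = VendorEnricher("vendors.xlsx")
enricher.initialize_rag()                            # configures gpt-4o plus web search

query = enricher.create_search_query("Acme Corp")    # illustrative vendor name
print(query.splitlines()[0])                         # "Find official contact information for Acme Corp:"

# response_callback() is the same entry point enrich_vendor() uses internally
response = enricher.rag_engine.response_callback(query)
```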
extract_info_from_response(self, response_text: str) -> dict
Purpose: Extract structured information (email, VAT number, source, confidence) from LLM response text using regex and parsing
Parameters:
response_text: Raw text response from the RAG engine
Returns: Dictionary with keys: 'email', 'vat_number', 'source', 'confidence' (defaults to 'Not found' if not extracted)
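Because the parser works on plain text, it can be exercised without calling the RAG engine at all. A small sketch with a hand-written response in the expected output format (all values are made up):

```python
sample = """
- Email: info@acme.com [Block 2]
- VAT Number: BE0123456789
- Source: Official company website (acme.com)
- Confidence: High
"""

info = VendorEnricher("vendors.xlsx").extract_info_from_response(sample)
print(info)   # email 'info@acme.com', vat_number 'BE0123456789', confidence 'High'
```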
enrich_vendor(self, vendor_name: str, index: int, total: int) -> dict
Purpose: Enrich a single vendor by querying the RAG engine and extracting contact information
Parameters:
vendor_name: Name of the vendor to enrich
index: Current index for progress tracking
total: Total number of vendors being processed
Returns: Dictionary with keys: 'vendor_name', 'email', 'vat_number', 'source', 'confidence', 'raw_response'
enrich_all_vendors(self, start_index: int = 0, end_index: int = None, delay: int = 2) -> bool
Purpose: Enrich all vendors in the list with rate limiting, progress tracking, and auto-save every 5 vendors
Parameters:
start_index: Start processing from this index (default: 0, useful for resuming)
end_index: End processing at this index (default: None, meaning process all)
delay: Delay in seconds between requests for rate limiting (default: 2)
Returns: True if processing completed successfully, False if error occurred
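For long vendor lists it can be convenient to process the file in slices and stop cleanly on failure. A sketch, assuming a chunk size of 25 (an arbitrary choice) and the illustrative file name used elsewhere:

```python
CHUNK = 25   # arbitrary batch size

enricher = VendorEnricher("vendors.xlsx")
if enricher.load_vendors():
    total = len(enricher.df)
    for start in range(0, total, CHUNK):
        # each call re-initializes the RAG engine and auto-saves every 5 vendors
        ok = enricher.enrich_all_vendors(start_index=start,
                                         end_index=min(start + CHUNK, total),
                                         delay=2)
        if not ok:
            print(f"Stopped in batch starting at index {start}")
            break
    enricher.save_results()
    enricher.generate_summary()
```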
save_results(self) -> str | None
Purpose: Save enriched results to timestamped Excel and CSV files
Returns: Path to output Excel file if successful, None if error occurred
generate_summary(self) -> None
Purpose: Generate and log summary statistics of the enrichment process including success rates and confidence levels
Returns: None (logs summary to logger)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| excel_file | str | Path to the Excel file containing the vendor list | instance |
| collection_name | str | Name of the ChromaDB collection to search | instance |
| df | pandas.DataFrame or None | Vendor data loaded from Excel; None until load_vendors() is called | instance |
| rag_engine | OneCo_hybrid_RAG or None | RAG engine for document and web search; None until initialize_rag() is called | instance |
| results | list[dict] | Enrichment result per vendor; each dict contains vendor_name, email, vat_number, source, confidence, raw_response | instance |
| vendor_column | str | Name of the column holding vendor names (set to the first column by load_vendors()) | instance |
Dependencies
os, sys, pandas, json, time, datetime, logging, argparse, re, hybrid_rag_engine
Required Imports
import os
import sys
import pandas as pd
import json
import time
from datetime import datetime
from hybrid_rag_engine import OneCo_hybrid_RAG
import logging
import argparse
import re
Usage Example
# Initialize the enricher
enricher = VendorEnricher(
excel_file='vendors.xlsx',
collection_name='00_company_governance'
)
# Load vendor data from Excel
if not enricher.load_vendors():
print('Failed to load vendors')
exit(1)
# Enrich all vendors (or specify range)
# Process vendors 0-10 with 2 second delay between requests
enricher.enrich_all_vendors(start_index=0, end_index=10, delay=2)
# Save results to timestamped Excel file
output_file = enricher.save_results()
print(f'Results saved to: {output_file}')
# Generate summary statistics
enricher.generate_summary()
# Access results programmatically
for result in enricher.results:
print(f"{result['vendor_name']}: {result['email']} (Confidence: {result['confidence']})")
Best Practices
- Always call load_vendors() before enrich_all_vendors() to load the Excel data
- The class automatically initializes the RAG engine when enrich_all_vendors() is called, so there is no need to call initialize_rag() manually
- Use start_index and end_index parameters to process vendors in batches for large datasets
- Set appropriate delay parameter (default 2 seconds) to avoid rate limiting from API providers
- Results are auto-saved every 5 vendors during processing to prevent data loss
- Call save_results() at the end to get the final timestamped output file
- The class modifies the original DataFrame in-place and adds new columns (Email_Found, VAT_Number, Source, Confidence)
- Check the Confidence field in results: 'High' = official source, 'Medium' = business directory, 'Low' = uncertain
- The class prioritizes global corporate emails over regional/country-specific emails
- Raw responses are truncated to 500 characters in results to save memory
- Use generate_summary() to get statistics on enrichment success rates
- Ensure ChromaDB collection exists and contains relevant documents before running
- The vendor_column is automatically detected as the first column in the Excel file
- Handle failures: methods return False/None on error, so check return values (see the sketch after this list)
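Pulling several of these practices together, a defensive driver could look like the sketch below; the file name is illustrative and the error handling is deliberately minimal:

```python
import sys

enricher = VendorEnricher("vendors.xlsx")

if not enricher.load_vendors():                  # bool: False on read/parse failure
    sys.exit("Could not load vendor list")

if not enricher.enrich_all_vendors(delay=2):     # bool: False if RAG init fails
    sys.exit("Enrichment aborted")

output = enricher.save_results()                 # str path, or None on write error
if output is None:
    sys.exit("Could not write results")

enricher.generate_summary()
print(f"Enriched workbook: {output}")
```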
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- function test_single_vendor (83.3% similar)
- function main_v15 (76.0% similar)
- function main_v48 (61.3% similar)
- function main_v27 (55.0% similar)
- class VendorEmailExtractor (54.1% similar)