class VendorEmailExtractor
Extract vendor email addresses from all organizational mailboxes
File: /tf/active/vicechatdev/find_email/vendor_email_extractor.py
Lines: 29 - 945
Complexity: moderate
Purpose
Extract vendor email addresses from all organizational mailboxes
Source Code
class VendorEmailExtractor:
    """Extract vendor email addresses from all organizational mailboxes"""

    def __init__(
        self,
        tenant_id: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
        domain: str = "vicebio.com",
        output_dir: str = "./vendor_emails_output"
    ):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.domain = domain
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        # OpenAI client
        self.openai_client = OpenAI(api_key=openai_api_key)
        # Authentication
        self.access_token: Optional[str] = None
        self.authority = f"https://login.microsoftonline.com/{tenant_id}"
        # Progress tracking
        self.progress_file = self.output_dir / "extraction_progress.json"
        self.results_file = self.output_dir / f"vendor_emails_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        self.log_file = self.output_dir / f"extraction_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    def log(self, message: str):
        """Log message to console and file"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_msg = f"[{timestamp}] {message}"
        print(log_msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_msg + "\n")
    def authenticate(self) -> str:
        """Authenticate using client credentials (application permissions)"""
        self.log("Authenticating with Azure AD...")
        app = msal.ConfidentialClientApplication(
            client_id=self.client_id,
            client_credential=self.client_secret,
            authority=self.authority
        )
        # Request token with application permissions scope
        result = app.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )
        if "access_token" not in result:
            error = result.get("error_description", "Unknown error")
            raise RuntimeError(f"Authentication failed: {error}")
        self.access_token = result["access_token"]
        self.log("✓ Authentication successful")
        return self.access_token
    def get_mailboxes(self, max_mailboxes: Optional[int] = None) -> List[Dict]:
        """Get list of all mailboxes in the organization"""
        if not self.access_token:
            self.authenticate()
        self.log(f"Retrieving mailboxes for domain {self.domain}...")
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "ConsistencyLevel": "eventual"  # Required for advanced queries
        }
        # Test token and re-authenticate if expired
        test_response = requests.get(
            "https://graph.microsoft.com/v1.0/users",
            headers=headers,
            params={"$count": "true", "$top": "1"}
        )
        if test_response.status_code == 401:
            self.log(" Token expired, re-authenticating...")
            self.authenticate()
            headers["Authorization"] = f"Bearer {self.access_token}"
        # Get users with email addresses in the domain
        url = "https://graph.microsoft.com/v1.0/users"
        params = {
            "$filter": f"endswith(mail,'{self.domain}') or endswith(userPrincipalName,'{self.domain}')",
            "$select": "id,userPrincipalName,mail,displayName",
            "$top": "999",
            "$count": "true"  # Required with ConsistencyLevel header
        }
        all_users = []
        page = 1
        while url:
            response = requests.get(url, headers=headers, params=params if page == 1 else None)
            if response.status_code != 200:
                raise RuntimeError(f"Failed to get users: {response.status_code} - {response.text}")
            data = response.json()
            batch = data.get("value", [])
            all_users.extend(batch)
            self.log(f" Retrieved page {page}: {len(batch)} users")
            url = data.get("@odata.nextLink")
            page += 1
            # Stop early if we have enough users for the mailbox limit
            if max_mailboxes and len(all_users) >= max_mailboxes * 2:  # Get extra to account for filtering
                break
        self.log(f" Total users retrieved: {len(all_users)}")
        # Filter to only users with mailboxes
        mailboxes = [u for u in all_users if u.get("mail")]
        # Apply max_mailboxes limit after filtering
        if max_mailboxes and len(mailboxes) > max_mailboxes:
            mailboxes = mailboxes[:max_mailboxes]
        self.log(f"✓ Found {len(mailboxes)} mailboxes{' (limited for testing)' if max_mailboxes else ''}")
        return mailboxes
    def search_mailbox(
        self,
        user_email: str,
        vendor_keywords: List[str],
        max_emails: Optional[int] = None,
        days_back: Optional[int] = None
    ) -> List[Dict]:
        """
        Search a specific mailbox for emails containing vendor keywords

        Uses an adaptive search strategy for high-volume results:
        - Tests the result count first
        - If > 300 matches, caps retrieval at 50 emails for this mailbox
        (Per-keyword testing is disabled due to a Graph API limitation:
        single-keyword searches don't return @odata.count.)

        Args:
            user_email: Email address of the mailbox to search
            vendor_keywords: List of vendor name keywords to search for
            max_emails: Maximum emails to retrieve per mailbox (None = no limit, retrieve all matching)
            days_back: Number of days to search back (None = no limit, search all emails)
        """
        if not self.access_token:
            raise RuntimeError("Not authenticated")
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "ConsistencyLevel": "eventual"  # Required for $search queries
        }
        # Calculate date filter if specified
        cutoff_date = datetime.now() - timedelta(days=days_back) if days_back else None
        # Build search query using KQL (Keyword Query Language).
        # For multiple keywords, use OR to find any of them;
        # wrap individual keywords in quotes for phrase matching.
        if len(vendor_keywords) == 1:
            keyword_query = f'"{vendor_keywords[0]}"'
        else:
            keyword_query = " OR ".join([f'"{kw}"' for kw in vendor_keywords])
        self.log(f" Search query: {keyword_query}")
        # ADAPTIVE SEARCH STRATEGY:
        # 1. First, do a test search to check the result count
        # 2. If > 300 results, limit to 50 emails for this mailbox
        # Note: Individual keyword testing disabled due to Graph API limitation
        # (single-keyword searches don't return @odata.count)
        test_url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages"
        test_params = {
            "$search": keyword_query,
            "$top": "1",
            "$count": "true"
        }
        try:
            test_response = requests.get(test_url, headers=headers, params=test_params)
            if test_response.status_code == 200:
                test_data = test_response.json()
                total_count = test_data.get('@odata.count', 0)
                if total_count > 300:
                    self.log(f" ⚠️ High volume: {total_count} potential matches")
                    self.log(f" ⚠️ Limiting to 50 emails per mailbox")
                    max_emails = 50
        except Exception as e:
            self.log(f" Warning: Could not get result count: {str(e)}")
        # Search all messages (includes inbox, sent items, etc.),
        # covering both received and sent emails
        url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages"
        params = {
            "$search": keyword_query,  # Already includes quotes for each keyword
            "$select": "id,subject,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,bodyPreview,body,sender,isDraft",
            "$top": "50" if max_emails is None else str(min(max_emails, 50)),  # Graph API max is 50 per page
            "$count": "true"  # Required with ConsistencyLevel header
        }
        all_emails = []
        page = 1
        while url and (max_emails is None or len(all_emails) < max_emails):
            try:
                if page == 1:
                    response = requests.get(url, headers=headers, params=params)
                else:
                    response = requests.get(url, headers=headers)
                if response.status_code == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    self.log(f" Rate limited, waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue
                if response.status_code != 200:
                    self.log(f" Warning: Failed to search {user_email}: {response.status_code}")
                    break
                data = response.json()
                batch = data.get("value", [])
                # Filter results
                for msg in batch:
                    # Skip drafts
                    if msg.get('isDraft', False):
                        continue
                    # Apply date filter if specified
                    if cutoff_date:
                        date_str = msg.get('receivedDateTime') or msg.get('sentDateTime', '')
                        if date_str:
                            try:
                                msg_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
                                if msg_date.replace(tzinfo=None) < cutoff_date:
                                    continue
                            except (ValueError, TypeError):
                                pass  # Include if we can't parse the date
                    all_emails.append(msg)
                url = data.get("@odata.nextLink")
                page += 1
                # Respect rate limits
                time.sleep(0.1)
            except Exception as e:
                self.log(f" Error searching {user_email}: {str(e)}")
                break
        return all_emails if max_emails is None else all_emails[:max_emails]
    def extract_emails_from_text(self, text: str) -> Set[str]:
        """Extract email addresses from text using regex"""
        if not text:
            return set()
        # Email regex pattern ([A-Za-z]{2,} for the TLD; a [A-Z|a-z] class
        # would incorrectly also match a literal '|')
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = set(re.findall(email_pattern, text))
        # Filter out internal emails and common noreply patterns
        noreply_patterns = [
            'noreply', 'no-reply', 'no_reply', 'donotreply', 'do-not-reply',
            'mailer-daemon', 'postmaster', 'daemon', 'automated', 'notifications',
            'alerts', 'bounce'
        ]
        filtered_emails = set()
        for email in emails:
            email_lower = email.lower()
            # Skip internal emails
            if email_lower.endswith(f"@{self.domain.lower()}"):
                continue
            # Skip noreply-type addresses
            local_part = email_lower.split('@')[0]
            if any(pattern in local_part for pattern in noreply_patterns):
                continue
            filtered_emails.add(email)
        return filtered_emails
    def _is_valid_vendor_email(self, email: str) -> bool:
        """Check if email passes basic validation"""
        return '@' in email and not email.lower().endswith(f"@{self.domain.lower()}")
    def _vendor_name_in_email(self, email_address: str, vendor_name: str) -> float:
        """
        Check if vendor name appears in the email DOMAIN with fuzzy matching
        Returns similarity score (0.0 to 1.0)
        Focus: Domain matching is the primary indicator of vendor ownership
        """
        email_lower = email_address.lower()
        vendor_lower = vendor_name.lower()
        # Extract domain (primary focus for vendor matching)
        if '@' not in email_address:
            return 0.0
        # Split the lowercased address so domain comparison is case-insensitive
        _, full_domain = email_lower.split('@', 1)
        domain_parts = full_domain.split('.')
        main_domain = domain_parts[0] if domain_parts else ""
        if not main_domain:
            return 0.0
        # Tokenize vendor name (split on spaces, hyphens, etc.)
        vendor_tokens = re.split(r'[\s\-_\.]+', vendor_lower)
        vendor_tokens = [t for t in vendor_tokens if len(t) > 2]  # Ignore short tokens like "AG", "Co"
        if not vendor_tokens:
            return 0.0
        # PRIORITY 1: Exact token match in domain (100% confidence)
        for token in vendor_tokens:
            if token in main_domain:
                return 1.0
        # PRIORITY 2: Fuzzy match on full vendor name to domain
        vendor_cleaned = vendor_lower.replace(' ', '').replace('-', '').replace('_', '')
        domain_cleaned = main_domain.replace('-', '').replace('_', '')
        full_similarity = SequenceMatcher(None, vendor_cleaned, domain_cleaned).ratio()
        # PRIORITY 3: Fuzzy match on individual vendor tokens to domain
        max_token_similarity = 0.0
        for token in vendor_tokens:
            token_similarity = SequenceMatcher(None, token, domain_cleaned).ratio()
            max_token_similarity = max(max_token_similarity, token_similarity)
        # Return the best match (prioritize full name match)
        return max(full_similarity, max_token_similarity)
    def analyze_email_with_llm(
        self,
        email_data: Dict,
        vendor_name: str
    ) -> Dict[str, Any]:
        """
        Use LLM to extract relevant vendor email addresses from email content

        Returns:
            {
                'vendor_emails': [list of extracted emails],
                'confidence': 'high'/'medium'/'low',
                'reasoning': 'explanation'
            }
        """
        # Extract basic email addresses from headers
        header_emails = set()
        # From address
        from_addr = email_data.get("from", {})
        if isinstance(from_addr, dict):
            email_addr = from_addr.get("emailAddress", {}).get("address")
            if email_addr:
                header_emails.add(email_addr)
        # To recipients
        for recipient in email_data.get("toRecipients", []):
            email_addr = recipient.get("emailAddress", {}).get("address")
            if email_addr and not email_addr.lower().endswith(f"@{self.domain.lower()}"):
                header_emails.add(email_addr)
        # CC recipients
        for recipient in email_data.get("ccRecipients", []):
            email_addr = recipient.get("emailAddress", {}).get("address")
            if email_addr and not email_addr.lower().endswith(f"@{self.domain.lower()}"):
                header_emails.add(email_addr)
        # Extract from body
        body = email_data.get("body", {})
        body_content = body.get("content", "") if isinstance(body, dict) else ""
        body_emails = self.extract_emails_from_text(body_content)
        # Combine all found emails
        all_emails = header_emails.union(body_emails)
        if not all_emails:
            return {
                'vendor_emails': [],
                'confidence': 'low',
                'reasoning': 'No external email addresses found'
            }
        # Use LLM to determine which emails are vendor-related
        subject = email_data.get("subject", "")
        body_preview = email_data.get("bodyPreview", "")
        prompt = f"""You are analyzing an email to identify vendor contact email addresses for: {vendor_name}
Email Subject: {subject}
Email Preview: {body_preview[:500]}
Found email addresses: {', '.join(all_emails)}
Task:
1. Identify which email addresses likely belong to {vendor_name} (the vendor/supplier)
2. EXCLUDE the following:
- Any address containing: noreply, no-reply, no_reply, donotreply, do-not-reply
- Automated systems: notifications@, alerts@, automated@
- Internal addresses: @vicebio.com
- Generic mailers: mailer@, daemon@
3. PRIORITIZE: general contact emails (info@, contact@, sales@, support@) over personal emails
4. INCLUDE: Actual business contact emails from the vendor's domain
Return a JSON object:
{{
"vendor_emails": ["email1@domain.com", "email2@domain.com"],
"confidence": "high/medium/low",
"reasoning": "brief explanation"
}}
If no vendor emails found, return empty vendor_emails array."""
        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an expert at analyzing business emails and extracting vendor contact information. Always respond with valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0,
                response_format={"type": "json_object"}
            )
            result = json.loads(response.choices[0].message.content)
            # Post-validation: Verify DOMAIN-based vendor name matching
            validated_emails = []
            email_scores = {}
            for email_addr in result.get('vendor_emails', []):
                similarity = self._vendor_name_in_email(email_addr, vendor_name)
                email_scores[email_addr] = similarity
                # Domain-based matching threshold: 70% fuzzy match required
                if similarity >= 0.70:
                    validated_emails.append(email_addr)
                    self.log(f" ✓ Validated '{email_addr}' - domain match score: {similarity:.2f}")
                else:
                    self.log(f" ⚠ Filtered out '{email_addr}' - weak domain match (score: {similarity:.2f}, required: 0.70)")
            result['vendor_emails'] = validated_emails
            # Adjust confidence based on domain matching strength
            if validated_emails and result.get('confidence') == 'high':
                # For HIGH confidence, require at least one email with 75%+ domain match
                max_similarity = max(email_scores.get(e, 0.0) for e in validated_emails)
                if max_similarity < 0.75:
                    result['confidence'] = 'medium'
                    result['reasoning'] += f" (Downgraded: max domain match {max_similarity:.2f} < 0.75 required for high confidence)"
            return result
        except Exception as e:
            self.log(f" LLM analysis error: {str(e)}")
            # Fallback: return all non-internal emails
            return {
                'vendor_emails': list(all_emails),
                'confidence': 'low',
                'reasoning': f'LLM analysis failed: {str(e)}'
            }
    def _create_search_keywords(self, vendor_name: str) -> List[str]:
        """
        Create search-friendly keywords from vendor name
        - Remove special characters
        - Remove legal suffixes (B.V., Ltd, Inc, etc.)
        - Split multi-word names
        """
        import unicodedata
        # Fix already-corrupted UTF-8 encoding (mojibake) if present
        try:
            if 'Ã' in vendor_name or 'â' in vendor_name:
                # Common replacements for corrupted characters
                vendor_name = vendor_name.replace('ë', 'e').replace('ä', 'a').replace('ö', 'o')
                vendor_name = vendor_name.replace('é', 'e').replace('è', 'e').replace('à', 'a')
                vendor_name = vendor_name.replace('ñ', 'n').replace('ç', 'c')
        except Exception:
            pass
        # Normalize unicode characters properly (ë -> e, ñ -> n, etc.)
        # Use NFD (decompose) then filter out combining characters
        normalized = unicodedata.normalize('NFD', vendor_name)
        ascii_name = ''.join(char for char in normalized if unicodedata.category(char) != 'Mn')
        # Remove any remaining non-ASCII characters
        ascii_name = re.sub(r'[^\x00-\x7F]+', '', ascii_name)
        # Remove common legal suffixes
        legal_suffixes = [
            'B.V.', 'BV', 'B V', 'N.V.', 'NV', 'N V', 'Ltd', 'Ltd.', 'Limited',
            'Inc', 'Inc.', 'Incorporated', 'Corp', 'Corp.', 'Corporation',
            'GmbH', 'AG', 'SA', 'S.A.', 'SAS', 'S.A.S.', 'LLC', 'L.L.C.',
            'PLC', 'LLP', 'BVBA', 'SRL', 'Srl', 'SpA', 'S.p.A.'
        ]
        clean_name = ascii_name
        for suffix in legal_suffixes:
            # Remove suffix with various separators (case insensitive)
            pattern = re.compile(re.escape(suffix), re.IGNORECASE)
            clean_name = pattern.sub('', clean_name)
        # Remove trailing/leading spaces and punctuation
        clean_name = re.sub(r'[,.\-;]+$', '', clean_name)  # Remove trailing punctuation
        clean_name = re.sub(r'^[,.\-;]+', '', clean_name)  # Remove leading punctuation
        clean_name = clean_name.strip()
        # For simplicity and speed: just use the main company name without country words
        # Skip country/region names
        country_words = ['belgie', 'belgium', 'belgique', 'france', 'germany', 'deutschland',
                         'nederland', 'netherlands', 'italia', 'italy', 'espana', 'spain',
                         'schweiz', 'suisse', 'austria', 'osterreich', 'polska', 'poland']
        words = clean_name.split()
        # Keep only words that are not countries and are substantial (>3 chars)
        keywords = [word for word in words if len(word) > 3 and word.lower() not in country_words]
        # If we filtered everything out, use the cleaned name
        if not keywords:
            keywords = [clean_name] if len(clean_name) > 3 else [vendor_name]
        # Remove duplicates while preserving order
        seen = set()
        unique_keywords = []
        for kw in keywords:
            if kw.lower() not in seen:
                seen.add(kw.lower())
                unique_keywords.append(kw)
        return unique_keywords[:2]  # Limit to max 2 keywords for speed
    def extract_for_vendor(
        self,
        vendor_name: str,
        vendor_keywords: Optional[List[str]] = None,
        max_mailboxes: Optional[int] = None,
        max_emails_per_mailbox: Optional[int] = None,
        days_back: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Extract vendor emails from all mailboxes

        Args:
            vendor_name: Name of the vendor
            vendor_keywords: Keywords to search (defaults to keywords derived from vendor_name)
            max_mailboxes: Limit number of mailboxes to search (None = all mailboxes)
            max_emails_per_mailbox: Max emails to retrieve per mailbox (None = all matching emails)
            days_back: Days to search back (None = all emails)
        """
        if vendor_keywords is None:
            vendor_keywords = self._create_search_keywords(vendor_name)
        self.log(f"\n{'='*60}")
        self.log(f"Extracting emails for vendor: {vendor_name}")
        self.log(f"Search keywords: {', '.join(vendor_keywords)}")
        self.log(f"Search limits: max_mailboxes={max_mailboxes}, max_emails_per_mailbox={max_emails_per_mailbox}, days_back={days_back}")
        self.log(f"{'='*60}\n")
        # Authenticate
        if not self.access_token:
            self.authenticate()
        # Get mailboxes
        all_mailboxes = self.get_mailboxes(max_mailboxes=max_mailboxes)
        # Priority mailboxes for vendor-related emails (to reduce false positives)
        priority_emails = [
            'vendor_invoicing@vicebio.com',
            'daniel@vicebio.com',
            'emmanuel@vicebio.com',
            'sandra@vicebio.com',
            'elisabeth@vicebio.com',
            'wim@vicebio.com',
            'vincent@vicebio.com',
            'jean@vicebio.com',
            'koen@vicebio.com'
        ]
        # Split mailboxes into priority and others
        priority_mailboxes = []
        other_mailboxes = []
        for mailbox in all_mailboxes:
            email = mailbox.get('mail', '').lower()
            if email in priority_emails:
                priority_mailboxes.append(mailbox)
            else:
                other_mailboxes.append(mailbox)
        # Start with priority mailboxes
        self.log(f"Strategy: Search {len(priority_mailboxes)} priority mailboxes first")
        self.log(f"Priority mailboxes: {', '.join([m.get('mail') for m in priority_mailboxes])}")
        mailboxes = priority_mailboxes
        search_extended = False
        # Check if vendor name matches any employee names (to avoid false positives)
        vendor_name_lower = vendor_name.lower().strip()
        vendor_name_cleaned = vendor_name_lower.replace(' ', '').replace('-', '').replace('_', '').replace('.', '')
        # Check if vendor name fuzzy matches any employee PERSON name (not functional mailboxes)
        for mailbox in mailboxes:
            display_name = mailbox.get('displayName', '').lower().strip()
            if not display_name or len(display_name) < 3:
                continue
            # Skip checking functional/shared mailboxes (they're not person names)
            # BUT keep vendor-related mailboxes like vendor_invoicing, info, etc.
            # Look for patterns like "room", "labo", "meeting" etc., but NOT vendor/info/invoicing
            functional_patterns = ['room', 'labo', 'lab', 'meeting', 'filecloud', 'shared', 'admin']
            if any(pattern in display_name for pattern in functional_patterns):
                continue
            # Only check if display_name looks like a person name (contains space = first + last name)
            if ' ' not in display_name:
                continue
            # Clean name for comparison
            employee_cleaned = display_name.replace(' ', '').replace('-', '').replace('_', '').replace('.', '')
            # Calculate similarity
            similarity = SequenceMatcher(None, vendor_name_cleaned, employee_cleaned).ratio()
            # Also check if vendor name is mostly contained in employee name
            # (e.g., "Morgan" in "Morgan Bodson")
            if len(vendor_name_cleaned) > 4:  # Only for names longer than 4 chars
                if vendor_name_cleaned in employee_cleaned:
                    overlap = len(vendor_name_cleaned) / len(employee_cleaned)
                    if overlap > 0.6:  # Vendor name is 60%+ of employee name
                        similarity = max(similarity, 0.85)  # Boost similarity
            # Skip if similarity is very high (85%+) - stricter threshold
            if similarity >= 0.85:
                self.log(f"⚠️ WARNING: Vendor name '{vendor_name}' fuzzy matches employee '{display_name}' (similarity: {similarity:.2f})")
                self.log(f"⚠️ Skipping this vendor to avoid false positives from internal emails")
                self.log(f"{'='*60}\n")
                # Return empty result
                return pd.DataFrame([{
                    'Vendor': vendor_name,
                    'Retained Emails': f'[SKIPPED - matches employee: {display_name}]',
                    'Source Mailboxes': ''
                }])
        # Results storage
        vendor_emails_found = {}  # Dict: email -> set of source mailboxes
        # Process each mailbox
        total_mailboxes = len(mailboxes)
        total_emails_found = 0
        high_confidence_count = 0
        for idx, mailbox in enumerate(mailboxes, 1):
            user_email = mailbox.get("mail")
            display_name = mailbox.get("displayName", "Unknown")
            self.log(f"\n[{idx}/{total_mailboxes}] Searching: {display_name} ({user_email})")
            # Search mailbox
            emails = self.search_mailbox(
                user_email,
                vendor_keywords,
                max_emails=max_emails_per_mailbox,
                days_back=days_back
            )
            total_emails_found += len(emails)
            self.log(f" Found {len(emails)} matching emails (Total so far: {total_emails_found} emails)")
            # Analyze each email with LLM
            emails_analyzed = 0
            emails_with_results = 0
            for email_idx, email in enumerate(emails, 1):
                emails_analyzed += 1
                # Show progress for large batches
                if len(emails) > 5 and email_idx % 5 == 0:
                    self.log(f" Analyzing emails: {email_idx}/{len(emails)}...")
                analysis = self.analyze_email_with_llm(email, vendor_name)
                if analysis['vendor_emails']:
                    emails_with_results += 1
                    # Only store HIGH confidence results
                    if analysis['confidence'] == 'high':
                        high_confidence_count += 1
                        for vendor_email in analysis['vendor_emails']:
                            if vendor_email not in vendor_emails_found:
                                vendor_emails_found[vendor_email] = set()
                            vendor_emails_found[vendor_email].add(user_email)
                        self.log(f" ✓ Email {email_idx}: Extracted {len(analysis['vendor_emails'])} HIGH confidence address(es) - {', '.join(analysis['vendor_emails'])}")
                    else:
                        self.log(f" • Email {email_idx}: Skipped {len(analysis['vendor_emails'])} address(es) - confidence too low ({analysis['confidence']})")
            # Summary for this mailbox
            if emails_with_results > 0:
                self.log(f" ✓ Mailbox summary: {emails_with_results}/{emails_analyzed} analyzed emails yielded vendor addresses")
            else:
                self.log(f" • No high-confidence vendor emails found in this mailbox")
            # Rate limiting between mailboxes
            if idx < total_mailboxes:
                time.sleep(1)
        # Check if we should extend search to all mailboxes
        if not search_extended and len(vendor_emails_found) == 0 and len(other_mailboxes) > 0:
            self.log(f"\n{'='*60}")
            self.log(f"⚠️ No HIGH confidence emails found in priority mailboxes")
            self.log(f"Extending search to all {len(other_mailboxes)} remaining mailboxes...")
            self.log(f"{'='*60}\n")
            search_extended = True
            mailboxes = other_mailboxes
            # Process remaining mailboxes
            for idx, mailbox in enumerate(mailboxes, len(priority_mailboxes) + 1):
                user_email = mailbox.get("mail")
                display_name = mailbox.get("displayName", "Unknown")
                self.log(f"\n[{idx}/{len(priority_mailboxes) + len(other_mailboxes)}] Searching: {display_name} ({user_email})")
                # Search mailbox
                emails = self.search_mailbox(
                    user_email,
                    vendor_keywords,
                    max_emails=max_emails_per_mailbox,
                    days_back=days_back
                )
                total_emails_found += len(emails)
                self.log(f" Found {len(emails)} matching emails (Total so far: {total_emails_found} emails)")
                # Analyze each email with LLM
                emails_analyzed = 0
                emails_with_results = 0
                for email_idx, email in enumerate(emails, 1):
                    emails_analyzed += 1
                    # Show progress for large batches
                    if len(emails) > 5 and email_idx % 5 == 0:
                        self.log(f" Analyzing emails: {email_idx}/{len(emails)}...")
                    analysis = self.analyze_email_with_llm(email, vendor_name)
                    if analysis['vendor_emails']:
                        emails_with_results += 1
                        # Only store HIGH confidence results
                        if analysis['confidence'] == 'high':
                            high_confidence_count += 1
                            for vendor_email in analysis['vendor_emails']:
                                if vendor_email not in vendor_emails_found:
                                    vendor_emails_found[vendor_email] = set()
                                vendor_emails_found[vendor_email].add(user_email)
                            self.log(f" ✓ Email {email_idx}: Extracted {len(analysis['vendor_emails'])} HIGH confidence address(es) - {', '.join(analysis['vendor_emails'])}")
                        else:
                            self.log(f" • Email {email_idx}: Skipped {len(analysis['vendor_emails'])} address(es) - confidence too low ({analysis['confidence']})")
                # Summary for this mailbox
                if emails_with_results > 0:
                    self.log(f" ✓ Mailbox summary: {emails_with_results}/{emails_analyzed} analyzed emails yielded vendor addresses")
                else:
                    self.log(f" • No high-confidence vendor emails found in this mailbox")
                # Rate limiting between mailboxes
                if idx < len(priority_mailboxes) + len(other_mailboxes):
                    time.sleep(1)
                # Stop extended search if we found high-confidence emails
                if len(vendor_emails_found) > 0:
                    self.log(f"\n✓ Found HIGH confidence emails, stopping extended search")
                    break
        # Create DataFrame with 3-column format
        if vendor_emails_found:
            # Email keys are already deduplicated; one row per vendor
            unique_emails = sorted(vendor_emails_found.keys())
            vendor_emails_list = ', '.join(unique_emails)
            # Collect all source mailboxes (deduplicated); use a distinct
            # name so the `mailboxes` list above isn't shadowed
            source_mailboxes_set = set()
            for sources in vendor_emails_found.values():
                source_mailboxes_set.update(sources)
            source_mailboxes_list = ', '.join(sorted(source_mailboxes_set))
            df = pd.DataFrame([{
                'Vendor': vendor_name,
                'Retained Emails': vendor_emails_list,
                'Source Mailboxes': source_mailboxes_list
            }])
        else:
            # No results: return empty row with vendor name
            df = pd.DataFrame([{
                'Vendor': vendor_name,
                'Retained Emails': '',
                'Source Mailboxes': ''
            }])
        self.log(f"\n{'='*60}")
        self.log(f"Extraction Complete for {vendor_name}")
        self.log(f"{'='*60}")
        self.log(f"Total mailboxes searched: {total_mailboxes}")
        self.log(f"Total emails analyzed: {total_emails_found}")
        self.log(f"High-confidence emails found: {high_confidence_count}")
        self.log(f"Unique vendor email addresses (HIGH confidence only): {len(vendor_emails_found)}")
        if vendor_emails_found:
            self.log(f"Vendor emails:")
            for email, sources in sorted(vendor_emails_found.items()):
                self.log(f" • {email} (found in {len(sources)} mailbox(es): {', '.join(sorted(sources))})")
        self.log(f"{'='*60}\n")
        return df
    def extract_for_vendor_list(
        self,
        vendor_list: List[str],
        max_mailboxes: Optional[int] = None,
        max_emails_per_mailbox: Optional[int] = None,
        days_back: Optional[int] = None,
        resume: bool = True
    ) -> pd.DataFrame:
        """
        Extract emails for multiple vendors

        Args:
            vendor_list: List of vendor names
            resume: If True, skip vendors already processed
        """
        self.log(f"\nStarting batch extraction for {len(vendor_list)} vendors")
        all_results = []
        processed_vendors = set()
        # Load progress if resuming
        if resume and self.progress_file.exists():
            with open(self.progress_file, 'r') as f:
                progress = json.load(f)
                processed_vendors = set(progress.get('processed_vendors', []))
            self.log(f"Resuming: {len(processed_vendors)} vendors already processed")
        for idx, vendor in enumerate(vendor_list, 1):
            if vendor in processed_vendors:
                self.log(f"[{idx}/{len(vendor_list)}] Skipping {vendor} (already processed)")
                continue
            try:
                self.log(f"\n[{idx}/{len(vendor_list)}] Processing: {vendor}")
                df = self.extract_for_vendor(
                    vendor,
                    max_mailboxes=max_mailboxes,
                    max_emails_per_mailbox=max_emails_per_mailbox,
                    days_back=days_back
                )
                if not df.empty:
                    all_results.append(df)
                # Save progress
                processed_vendors.add(vendor)
                with open(self.progress_file, 'w') as f:
                    json.dump({'processed_vendors': list(processed_vendors)}, f)
                # Save intermediate results
                if all_results:
                    combined_df = pd.concat(all_results, ignore_index=True)
                    combined_df.to_excel(self.results_file, index=False)
                    self.log(f"✓ Intermediate results saved: {self.results_file}")
            except Exception as e:
                self.log(f"ERROR processing {vendor}: {str(e)}")
                continue
        # Final results (already in 3-column format)
        if all_results:
            final_df = pd.concat(all_results, ignore_index=True)
            final_df.to_excel(self.results_file, index=False)
            self.log(f"\n{'='*60}")
            self.log(f"BATCH EXTRACTION COMPLETE")
            self.log(f"Results saved: {self.results_file}")
            self.log(f"Total vendors processed: {len(final_df)}")
            self.log(f"Vendors with emails: {len(final_df[final_df['Retained Emails'].str.len() > 0])}")
            self.log(f"{'='*60}\n")
            return final_df
        else:
            self.log("No results found")
            return pd.DataFrame()
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| tenant_id | str | - | positional or keyword |
| client_id | str | - | positional or keyword |
| client_secret | str | - | positional or keyword |
| openai_api_key | str | - | positional or keyword |
| domain | str | "vicebio.com" | positional or keyword |
| output_dir | str | "./vendor_emails_output" | positional or keyword |
Parameter Details
tenant_id: Azure AD tenant ID, used to build the authority URL.
client_id: Azure AD application (client) ID for MSAL authentication.
client_secret: Client secret for the confidential client application.
openai_api_key: API key for the OpenAI client used in LLM analysis.
domain: Internal organization domain; addresses on this domain are treated as internal and filtered out.
output_dir: Directory for progress, results, and log files (created if missing).
Return Value
The constructor returns a VendorEmailExtractor instance; per-method return types are listed in the Class Interface below.
Class Interface
Methods
__init__(self, tenant_id, client_id, client_secret, openai_api_key, domain, output_dir)
Purpose: Initialize the extractor with Azure AD credentials, the OpenAI API key, the internal domain, and the output directory
Parameters:
tenant_id: Type: str
client_id: Type: str
client_secret: Type: str
openai_api_key: Type: str
domain: Type: str
output_dir: Type: str
Returns: None
log(self, message)
Purpose: Log message to console and file
Parameters:
message: Type: str
Returns: None
authenticate(self) -> str
Purpose: Authenticate using client credentials (application permissions)
Returns: str
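For context, a minimal standalone sketch of the same MSAL client-credentials flow (credential values are placeholders, not the class's actual configuration):
import msal

app = msal.ConfidentialClientApplication(
    client_id="<client-id>",
    client_credential="<client-secret>",
    authority="https://login.microsoftonline.com/<tenant-id>"
)
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
token = result.get("access_token")  # None on failure; the class raises RuntimeError instead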
get_mailboxes(self, max_mailboxes) -> List[Dict]
Purpose: Get list of all mailboxes in the organization
Parameters:
max_mailboxes: Type: Optional[int]
Returns: List[Dict]
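Each returned dict carries the $select fields from the Graph query. A hedged usage sketch (assuming an authenticated `extractor` instance; values illustrative):
mailboxes = extractor.get_mailboxes(max_mailboxes=5)
# e.g. [{'id': '...', 'userPrincipalName': 'daniel@vicebio.com',
#        'mail': 'daniel@vicebio.com', 'displayName': '...'}, ...]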
search_mailbox(self, user_email, vendor_keywords, max_emails, days_back) -> List[Dict]
Purpose: Search a specific mailbox for emails containing vendor keywords. Uses an adaptive strategy: a test query checks the result count first, and if there are more than 300 matches, retrieval is capped at 50 emails for that mailbox. Args: user_email: email address of the mailbox to search; vendor_keywords: list of vendor name keywords to search for; max_emails: maximum emails to retrieve per mailbox (None = no limit, retrieve all matching); days_back: number of days to search back (None = no limit, search all emails)
Parameters:
user_email: Type: str
vendor_keywords: Type: List[str]
max_emails: Type: Optional[int]
days_back: Type: Optional[int]
Returns: List[Dict]
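A hedged usage sketch (assuming an authenticated `extractor`; mailbox and keywords illustrative):
emails = extractor.search_mailbox(
    "daniel@vicebio.com",   # mailbox to search
    ["Acme", "Biotech"],    # keywords, OR-combined into one KQL $search query
    max_emails=100,         # may be lowered to 50 if the test query finds > 300 matches
    days_back=365           # client-side filter on receivedDateTime/sentDateTime
)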
extract_emails_from_text(self, text) -> Set[str]
Purpose: Extract email addresses from text using regex
Parameters:
text: Type: str
Returns: Set[str]
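An illustrative example of the filtering behavior (addresses are made up):
text = "Contact sales@acmebio.com or noreply@acmebio.com; cc daniel@vicebio.com"
extractor.extract_emails_from_text(text)
# -> {'sales@acmebio.com'}  (noreply-style and internal @vicebio.com addresses are dropped)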
_is_valid_vendor_email(self, email) -> bool
Purpose: Check if email passes basic validation
Parameters:
email: Type: str
Returns: bool
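Illustrative behavior (addresses made up):
extractor._is_valid_vendor_email("sales@acmebio.com")   # True: external address
extractor._is_valid_vendor_email("daniel@vicebio.com")  # False: internal domain
extractor._is_valid_vendor_email("not-an-email")        # False: no '@'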
_vendor_name_in_email(self, email_address, vendor_name) -> float
Purpose: Check whether the vendor name appears in the email DOMAIN using fuzzy matching. Returns a similarity score (0.0 to 1.0). Domain matching is the primary indicator of vendor ownership
Parameters:
email_address: Type: str
vendor_name: Type: str
Returns: float
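Worked examples of the scoring priorities (addresses illustrative):
extractor._vendor_name_in_email("sales@acmebio.com", "Acme Biotech B.V.")
# -> 1.0: the token "acme" appears verbatim in the domain part "acmebio" (priority 1)
extractor._vendor_name_in_email("info@gmail.com", "Acme Biotech B.V.")
# -> low fuzzy score, below the 0.70 validation threshold used in analyze_email_with_llm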
analyze_email_with_llm(self, email_data, vendor_name) -> Dict[str, Any]
Purpose: Use an LLM to extract relevant vendor email addresses from email content. Returns a dict of the form {'vendor_emails': [extracted emails], 'confidence': 'high'/'medium'/'low', 'reasoning': 'explanation'}
Parameters:
email_data: Type: Dict
vendor_name: Type: str
Returns: Dict[str, Any]
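Shape of the returned dict, per the docstring (values illustrative):
# {
#     'vendor_emails': ['sales@acmebio.com'],
#     'confidence': 'high',   # downgraded to 'medium' if the best domain match is < 0.75
#     'reasoning': 'Sender domain matches the vendor name'
# }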
_create_search_keywords(self, vendor_name) -> List[str]
Purpose: Create search-friendly keywords from a vendor name: remove special characters, strip legal suffixes (B.V., Ltd, Inc, etc.), and split multi-word names
Parameters:
vendor_name: Type: str
Returns: List[str]
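Worked examples of the cleanup pipeline (normalize accents, strip legal suffixes, drop country words, cap at two keywords; vendor names illustrative):
extractor._create_search_keywords("Acme Biotech B.V.")        # -> ['Acme', 'Biotech']
extractor._create_search_keywords("Müller GmbH Deutschland")  # -> ['Muller']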
extract_for_vendor(self, vendor_name, vendor_keywords, max_mailboxes, max_emails_per_mailbox, days_back) -> pd.DataFrame
Purpose: Extract vendor emails from all mailboxes. Args: vendor_name: name of the vendor; vendor_keywords: keywords to search (defaults to keywords derived from vendor_name); max_mailboxes: limit on the number of mailboxes to search (None = all mailboxes); max_emails_per_mailbox: max emails to retrieve per mailbox (None = all matching emails); days_back: days to search back (None = all emails)
Parameters:
vendor_name: Type: str
vendor_keywords: Type: Optional[List[str]]
max_mailboxes: Type: Optional[int]
max_emails_per_mailbox: Type: Optional[int]
days_back: Type: Optional[int]
Returns: pd.DataFrame
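A hedged usage sketch with test-friendly limits (vendor name illustrative):
df = extractor.extract_for_vendor(
    "Acme Biotech B.V.",
    max_mailboxes=10,            # cap the mailbox list for a quick run
    max_emails_per_mailbox=20,   # cap emails retrieved per mailbox
    days_back=180                # only search recent mail
)
# df columns: 'Vendor', 'Retained Emails', 'Source Mailboxes'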
extract_for_vendor_list(self, vendor_list, max_mailboxes, max_emails_per_mailbox, days_back, resume) -> pd.DataFrame
Purpose: Extract emails for multiple vendors. Args: vendor_list: list of vendor names; resume: if True, skip vendors already processed
Parameters:
vendor_list: Type: List[str]
max_mailboxes: Type: Optional[int]
max_emails_per_mailbox: Type: Optional[int]
days_back: Type: Optional[int]
resume: Type: bool
Returns: pd.DataFrame
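A hedged batch-run sketch (vendor names illustrative); resume=True skips vendors already recorded in extraction_progress.json:
vendors = ["Acme Biotech B.V.", "Muller GmbH"]
final_df = extractor.extract_for_vendor_list(vendors, days_back=365, resume=True)
# Intermediate and final results are written to the timestamped .xlsx in output_dir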
Required Imports
import os
import json
import re
import time
from datetime import datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
import msal
import pandas as pd
import requests
from openai import OpenAI
Usage Example
# Example usage (credential values are placeholders, vendor name illustrative):
# extractor = VendorEmailExtractor(
#     tenant_id="<tenant-id>",
#     client_id="<client-id>",
#     client_secret="<client-secret>",
#     openai_api_key="<openai-api-key>"
# )
# df = extractor.extract_for_vendor("Acme Biotech B.V.")
Similar Components
AI-powered semantic similarity - components with related functionality:
- function main_v26 (70.6% similar)
- function extract_batch (62.2% similar)
- function test_email_search (61.0% similar)
- function main_v27 (59.7% similar)
- class VendorEnricher (54.1% similar)