🔍 Code Extractor

class VendorEmailExtractor

Maturity: 26

Extract vendor email addresses from all organizational mailboxes

File: /tf/active/vicechatdev/find_email/vendor_email_extractor.py
Lines: 29 - 945
Complexity: moderate

Purpose

Extract vendor email addresses from all organizational mailboxes

Source Code

class VendorEmailExtractor:
    """Extract vendor email addresses from all organizational mailboxes"""
    
    def __init__(
        self,
        tenant_id: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
        domain: str = "vicebio.com",
        output_dir: str = "./vendor_emails_output"
    ):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.domain = domain
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        # OpenAI client
        self.openai_client = OpenAI(api_key=openai_api_key)
        
        # Authentication
        self.access_token: Optional[str] = None
        self.authority = f"https://login.microsoftonline.com/{tenant_id}"
        
        # Progress tracking
        self.progress_file = self.output_dir / "extraction_progress.json"
        self.results_file = self.output_dir / f"vendor_emails_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        self.log_file = self.output_dir / f"extraction_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        
    def log(self, message: str):
        """Log message to console and file"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_msg = f"[{timestamp}] {message}"
        print(log_msg)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_msg + "\n")
    
    def authenticate(self) -> str:
        """Authenticate using client credentials (application permissions)"""
        self.log("Authenticating with Azure AD...")
        
        app = msal.ConfidentialClientApplication(
            client_id=self.client_id,
            client_credential=self.client_secret,
            authority=self.authority
        )
        
        # Request token with application permissions scope
        result = app.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )
        
        if "access_token" not in result:
            error = result.get("error_description", "Unknown error")
            raise RuntimeError(f"Authentication failed: {error}")
        
        self.access_token = result["access_token"]
        self.log("✓ Authentication successful")
        return self.access_token
    
    def get_mailboxes(self, max_mailboxes: Optional[int] = None) -> List[Dict]:
        """Get list of all mailboxes in the organization"""
        if not self.access_token:
            self.authenticate()
        
        self.log(f"Retrieving mailboxes for domain {self.domain}...")
        
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "ConsistencyLevel": "eventual"  # Required for advanced queries
        }
        
        # Test token and re-authenticate if expired
        test_response = requests.get(
            "https://graph.microsoft.com/v1.0/users",
            headers=headers,
            params={"$count": "true", "$top": "1"}
        )
        if test_response.status_code == 401:
            self.log("  Token expired, re-authenticating...")
            self.authenticate()
            headers["Authorization"] = f"Bearer {self.access_token}"
        
        # Get users with email addresses in the domain
        url = "https://graph.microsoft.com/v1.0/users"
        params = {
            "$filter": f"endswith(mail,'{self.domain}') or endswith(userPrincipalName,'{self.domain}')",
            "$select": "id,userPrincipalName,mail,displayName",
            "$top": "999",
            "$count": "true"  # Required with ConsistencyLevel header
        }
        
        all_users = []
        page = 1
        
        while url:
            response = requests.get(url, headers=headers, params=params if page == 1 else None)
            
            if response.status_code != 200:
                raise RuntimeError(f"Failed to get users: {response.status_code} - {response.text}")
            
            data = response.json()
            batch = data.get("value", [])
            all_users.extend(batch)
            
            self.log(f"  Retrieved page {page}: {len(batch)} users")
            
            url = data.get("@odata.nextLink")
            page += 1
            
            # Stop early if we have enough users for the mailbox limit
            if max_mailboxes and len(all_users) >= max_mailboxes * 2:  # Get extra to account for filtering
                break
        
        self.log(f"  Total users retrieved: {len(all_users)}")
        
        # Filter to only users with mailboxes
        mailboxes = [u for u in all_users if u.get("mail")]
        
        # Apply max_mailboxes limit after filtering
        if max_mailboxes and len(mailboxes) > max_mailboxes:
            mailboxes = mailboxes[:max_mailboxes]
        
        self.log(f"✓ Found {len(mailboxes)} mailboxes{' (limited for testing)' if max_mailboxes else ''}")
        
        return mailboxes
    
    def search_mailbox(
        self,
        user_email: str,
        vendor_keywords: List[str],
        max_emails: Optional[int] = None,
        days_back: Optional[int] = None
    ) -> List[Dict]:
        """
        Search a specific mailbox for emails containing vendor keywords
        Uses an adaptive search strategy for high-volume results:
        - Runs a one-item test query first to check the total match count
        - If > 300 matches, caps retrieval at 50 emails for this mailbox
          (individual-keyword fallback is disabled; see note in the body)
        
        Args:
            user_email: Email address of the mailbox to search
            vendor_keywords: List of vendor name keywords to search for
            max_emails: Maximum emails to retrieve per mailbox (None = no limit, retrieve all matching)
            days_back: Number of days to search back (None = no limit, search all emails)
        """
        if not self.access_token:
            raise RuntimeError("Not authenticated")
        
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "ConsistencyLevel": "eventual"  # Required for $search queries
        }
        
        # Calculate date filter if specified
        cutoff_date = datetime.now() - timedelta(days=days_back) if days_back else None
        
        # Build search query - using KQL (Keyword Query Language)
        # For multiple keywords, use OR to find any of them
        # Wrap individual keywords in quotes for phrase matching
        if len(vendor_keywords) == 1:
            keyword_query = f'"{vendor_keywords[0]}"'
        else:
            keyword_query = " OR ".join([f'"{kw}"' for kw in vendor_keywords])
        self.log(f"  Search query: {keyword_query}")
        
        # ADAPTIVE SEARCH STRATEGY:
        # 1. First, do a test search to check result count
        # 2. If > 300 results, limit to 50 emails per mailbox
        # Note: Individual keyword testing disabled due to Graph API limitation
        #       (single-keyword searches don't return @odata.count)
        
        test_url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages"
        test_params = {
            "$search": keyword_query,
            "$top": "1",
            "$count": "true"
        }
        
        try:
            test_response = requests.get(test_url, headers=headers, params=test_params)
            if test_response.status_code == 200:
                test_data = test_response.json()
                total_count = test_data.get('@odata.count', 0)
                
                if total_count > 300:
                    self.log(f"  ⚠️  High volume: {total_count} potential matches")
                    self.log(f"  ⚠️  Limiting to 50 emails per mailbox")
                    max_emails = 50
        except Exception as e:
            self.log(f"  Warning: Could not get result count: {str(e)}")
        
        # Search both received and sent emails
        # We'll search all messages (includes inbox, sent items, etc.)
        url = f"https://graph.microsoft.com/v1.0/users/{user_email}/messages"
        params = {
            "$search": keyword_query,  # Already includes quotes for each keyword
            "$select": "id,subject,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,bodyPreview,body,sender,isDraft",
            "$top": "50" if max_emails is None else str(min(max_emails, 50)),  # Graph API max is 50 per page
            "$count": "true"  # Required with ConsistencyLevel header
        }
        
        all_emails = []
        page = 1
        
        while url and (max_emails is None or len(all_emails) < max_emails):
            try:
                if page == 1:
                    response = requests.get(url, headers=headers, params=params)
                else:
                    response = requests.get(url, headers=headers)
                
                if response.status_code == 429:  # Rate limit
                    retry_after = int(response.headers.get("Retry-After", 60))
                    self.log(f"  Rate limited, waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue
                
                if response.status_code != 200:
                    self.log(f"  Warning: Failed to search {user_email}: {response.status_code}")
                    break
                
                data = response.json()
                batch = data.get("value", [])
                
                # Filter results
                for msg in batch:
                    # Skip drafts
                    if msg.get('isDraft', False):
                        continue
                    
                    # Apply date filter if specified
                    if cutoff_date:
                        date_str = msg.get('receivedDateTime') or msg.get('sentDateTime', '')
                        if date_str:
                            try:
                                msg_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
                                if msg_date.replace(tzinfo=None) < cutoff_date:
                                    continue
                            except (ValueError, TypeError):
                                pass  # Include the message if its date can't be parsed
                    
                    all_emails.append(msg)
                
                url = data.get("@odata.nextLink")
                page += 1
                
                # Respect rate limits
                time.sleep(0.1)
                
            except Exception as e:
                self.log(f"  Error searching {user_email}: {str(e)}")
                break
        
        return all_emails if max_emails is None else all_emails[:max_emails]
    
    def extract_emails_from_text(self, text: str) -> Set[str]:
        """Extract email addresses from text using regex"""
        if not text:
            return set()
        
        # Email regex pattern (note: [A-Za-z], not [A-Z|a-z] -- a '|' inside a character class is literal)
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = set(re.findall(email_pattern, text))
        
        # Filter out internal emails and common noreply patterns
        noreply_patterns = [
            'noreply', 'no-reply', 'no_reply', 'donotreply', 'do-not-reply',
            'mailer-daemon', 'postmaster', 'daemon', 'automated', 'notifications',
            'alerts', 'bounce'
        ]
        
        filtered_emails = set()
        for email in emails:
            email_lower = email.lower()
            
            # Skip internal emails
            if email_lower.endswith(f"@{self.domain.lower()}"):
                continue
            
            # Skip noreply-type addresses
            local_part = email_lower.split('@')[0]
            if any(pattern in local_part for pattern in noreply_patterns):
                continue
            
            filtered_emails.add(email)
        
        return filtered_emails
    
    def _is_valid_vendor_email(self, email: str) -> bool:
        """Check if email passes basic validation"""
        return '@' in email and not email.lower().endswith(f"@{self.domain.lower()}")
    
    def _vendor_name_in_email(self, email_address: str, vendor_name: str) -> float:
        """
        Check if vendor name appears in email DOMAIN with fuzzy matching
        Returns similarity score (0.0 to 1.0)
        Focus: Domain matching is primary indicator of vendor ownership
        """
        email_lower = email_address.lower()
        vendor_lower = vendor_name.lower()
        
        # Extract domain (primary focus for vendor matching)
        if '@' not in email_lower:
            return 0.0
        
        # Split the lowercased address so domain comparison is case-insensitive
        username, full_domain = email_lower.split('@', 1)
        domain_parts = full_domain.split('.')
        main_domain = domain_parts[0] if domain_parts else ""
        
        if not main_domain:
            return 0.0
        
        # Tokenize vendor name (split on spaces, hyphens, etc.)
        vendor_tokens = re.split(r'[\s\-_\.]+', vendor_lower)
        vendor_tokens = [t for t in vendor_tokens if len(t) > 2]  # Ignore short tokens like "AG", "Co"
        
        if not vendor_tokens:
            return 0.0
        
        # PRIORITY 1: Vendor token appears as a substring of the domain (full confidence)
        for token in vendor_tokens:
            if token in main_domain:
                return 1.0
        
        # PRIORITY 2: Fuzzy match on full vendor name to domain
        vendor_cleaned = vendor_lower.replace(' ', '').replace('-', '').replace('_', '')
        domain_cleaned = main_domain.replace('-', '').replace('_', '')
        
        full_similarity = SequenceMatcher(None, vendor_cleaned, domain_cleaned).ratio()
        
        # PRIORITY 3: Fuzzy match on individual vendor tokens to domain
        max_token_similarity = 0.0
        for token in vendor_tokens:
            token_similarity = SequenceMatcher(None, token, domain_cleaned).ratio()
            max_token_similarity = max(max_token_similarity, token_similarity)
        
        # Return the best match (prioritize full name match)
        return max(full_similarity, max_token_similarity)
    
    def analyze_email_with_llm(
        self,
        email_data: Dict,
        vendor_name: str
    ) -> Dict[str, Any]:
        """
        Use LLM to extract relevant vendor email addresses from email content
        
        Returns:
            {
                'vendor_emails': [list of extracted emails],
                'confidence': 'high'/'medium'/'low',
                'reasoning': 'explanation'
            }
        """
        # Extract basic email addresses from headers
        header_emails = set()
        
        # From address
        from_addr = email_data.get("from", {})
        if isinstance(from_addr, dict):
            email_addr = from_addr.get("emailAddress", {}).get("address")
            if email_addr:
                header_emails.add(email_addr)
        
        # To recipients
        for recipient in email_data.get("toRecipients", []):
            email_addr = recipient.get("emailAddress", {}).get("address")
            if email_addr and not email_addr.lower().endswith(f"@{self.domain.lower()}"):
                header_emails.add(email_addr)
        
        # CC recipients
        for recipient in email_data.get("ccRecipients", []):
            email_addr = recipient.get("emailAddress", {}).get("address")
            if email_addr and not email_addr.lower().endswith(f"@{self.domain.lower()}"):
                header_emails.add(email_addr)
        
        # Extract from body
        body = email_data.get("body", {})
        body_content = body.get("content", "") if isinstance(body, dict) else ""
        body_emails = self.extract_emails_from_text(body_content)
        
        # Combine all found emails
        all_emails = header_emails.union(body_emails)
        
        if not all_emails:
            return {
                'vendor_emails': [],
                'confidence': 'low',
                'reasoning': 'No external email addresses found'
            }
        
        # Use LLM to determine which emails are vendor-related
        subject = email_data.get("subject", "")
        body_preview = email_data.get("bodyPreview", "")
        
        prompt = f"""You are analyzing an email to identify vendor contact email addresses for: {vendor_name}

Email Subject: {subject}
Email Preview: {body_preview[:500]}

Found email addresses: {', '.join(all_emails)}

Task:
1. Identify which email addresses likely belong to {vendor_name} (the vendor/supplier)
2. EXCLUDE the following:
   - Any address containing: noreply, no-reply, no_reply, donotreply, do-not-reply
   - Automated systems: notifications@, alerts@, automated@
   - Internal addresses: @vicebio.com
   - Generic mailers: mailer@, daemon@
3. PRIORITIZE: general contact emails (info@, contact@, sales@, support@) over personal emails
4. INCLUDE: Actual business contact emails from the vendor's domain

Return a JSON object:
{{
    "vendor_emails": ["email1@domain.com", "email2@domain.com"],
    "confidence": "high/medium/low",
    "reasoning": "brief explanation"
}}

If no vendor emails found, return empty vendor_emails array."""

        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are an expert at analyzing business emails and extracting vendor contact information. Always respond with valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0,
                response_format={"type": "json_object"}
            )
            
            result = json.loads(response.choices[0].message.content)
            
            # Post-validation: Verify DOMAIN-based vendor name matching
            validated_emails = []
            email_scores = {}
            
            for email_addr in result.get('vendor_emails', []):
                similarity = self._vendor_name_in_email(email_addr, vendor_name)
                email_scores[email_addr] = similarity
                
                # Domain-based matching threshold: 70% fuzzy match required
                if similarity >= 0.70:
                    validated_emails.append(email_addr)
                    self.log(f"    ✓ Validated '{email_addr}' - domain match score: {similarity:.2f}")
                else:
                    self.log(f"    ⚠ Filtered out '{email_addr}' - weak domain match (score: {similarity:.2f}, required: 0.70)")
            
            result['vendor_emails'] = validated_emails
            
            # Adjust confidence based on domain matching strength
            if validated_emails and result.get('confidence') == 'high':
                # For HIGH confidence, require at least one email with 75%+ domain match
                max_similarity = max(email_scores.get(e, 0.0) for e in validated_emails)
                if max_similarity < 0.75:
                    result['confidence'] = 'medium'
                    result['reasoning'] += f" (Downgraded: max domain match {max_similarity:.2f} < 0.75 required for high confidence)"
            
            return result
            
        except Exception as e:
            self.log(f"  LLM analysis error: {str(e)}")
            # Fallback: return all non-internal emails
            return {
                'vendor_emails': list(all_emails),
                'confidence': 'low',
                'reasoning': f'LLM analysis failed: {str(e)}'
            }
    
    def _create_search_keywords(self, vendor_name: str) -> List[str]:
        """
        Create search-friendly keywords from vendor name
        - Remove special characters
        - Remove legal suffixes (B.V., Ltd, Inc, etc.)
        - Split multi-word names
        """
        import unicodedata  # local import; `re` is already available at module level
        
        # Fix already-corrupted UTF-8 encoding (mojibake) if present
        try:
            # Heuristic: 'Ã' or 'â' in the string usually indicates mojibake
            if 'Ã' in vendor_name or 'â' in vendor_name:
                # Fold common accented characters to their ASCII equivalents
                vendor_name = vendor_name.replace('ë', 'e').replace('ä', 'a').replace('ö', 'o')
                vendor_name = vendor_name.replace('é', 'e').replace('è', 'e').replace('à', 'a')
                vendor_name = vendor_name.replace('ñ', 'n').replace('ç', 'c')
        except Exception:
            pass
        
        # Normalize unicode characters properly (ë -> e, ñ -> n, etc.)
        # Use NFD (decompose) then filter out combining characters
        normalized = unicodedata.normalize('NFD', vendor_name)
        ascii_name = ''.join(char for char in normalized if unicodedata.category(char) != 'Mn')
        
        # Remove any remaining non-ASCII characters
        ascii_name = re.sub(r'[^\x00-\x7F]+', '', ascii_name)
        
        # Remove common legal suffixes
        legal_suffixes = [
            'B.V.', 'BV', 'B V', 'N.V.', 'NV', 'N V', 'Ltd', 'Ltd.', 'Limited', 
            'Inc', 'Inc.', 'Incorporated', 'Corp', 'Corp.', 'Corporation',
            'GmbH', 'AG', 'SA', 'S.A.', 'SAS', 'S.A.S.', 'LLC', 'L.L.C.',
            'PLC', 'LLP', 'BVBA', 'SRL', 'Srl', 'SpA', 'S.p.A.'
        ]
        
        clean_name = ascii_name
        for suffix in legal_suffixes:
            # Remove suffix with various separators (case insensitive)
            pattern = re.compile(re.escape(suffix), re.IGNORECASE)
            clean_name = pattern.sub('', clean_name)
        
        # Remove trailing/leading spaces and punctuation
        clean_name = re.sub(r'[,.\-;]+$', '', clean_name)  # Remove trailing punctuation
        clean_name = re.sub(r'^[,.\-;]+', '', clean_name)  # Remove leading punctuation
        clean_name = clean_name.strip()
        
        # For simplicity and speed: just use the main company name without country words
        # Skip country/region names
        country_words = ['belgie', 'belgium', 'belgique', 'france', 'germany', 'deutschland',
                        'nederland', 'netherlands', 'italia', 'italy', 'espana', 'spain', 
                        'schweiz', 'suisse', 'austria', 'osterreich', 'polska', 'poland']
        
        words = clean_name.split()
        # Keep only words that are not countries and are substantial (>3 chars)
        keywords = [word for word in words if len(word) > 3 and word.lower() not in country_words]
        
        # If we filtered everything out, use the cleaned name
        if not keywords:
            keywords = [clean_name] if len(clean_name) > 3 else [vendor_name]
        
        # Remove duplicates while preserving order
        seen = set()
        unique_keywords = []
        for kw in keywords:
            if kw.lower() not in seen:
                seen.add(kw.lower())
                unique_keywords.append(kw)
        
        return unique_keywords[:2]  # Limit to max 2 keywords for speed
    
    def extract_for_vendor(
        self,
        vendor_name: str,
        vendor_keywords: Optional[List[str]] = None,
        max_mailboxes: Optional[int] = None,
        max_emails_per_mailbox: Optional[int] = None,
        days_back: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Extract vendor emails from all mailboxes
        
        Args:
            vendor_name: Name of the vendor
            vendor_keywords: Keywords to search (defaults to vendor_name)
            max_mailboxes: Limit number of mailboxes to search (None = all mailboxes)
            max_emails_per_mailbox: Max emails to retrieve per mailbox (None = all matching emails)
            days_back: Days to search back (None = all emails)
        """
        if vendor_keywords is None:
            vendor_keywords = self._create_search_keywords(vendor_name)
        
        self.log(f"\n{'='*60}")
        self.log(f"Extracting emails for vendor: {vendor_name}")
        self.log(f"Search keywords: {', '.join(vendor_keywords)}")
        self.log(f"Search limits: max_mailboxes={max_mailboxes}, max_emails_per_mailbox={max_emails_per_mailbox}, days_back={days_back}")
        self.log(f"{'='*60}\n")
        
        # Authenticate
        if not self.access_token:
            self.authenticate()
        
        # Get mailboxes
        all_mailboxes = self.get_mailboxes(max_mailboxes=max_mailboxes)
        
        # Priority mailboxes for vendor-related emails (to reduce false positives)
        priority_emails = [
            'vendor_invoicing@vicebio.com',
            'daniel@vicebio.com',
            'emmanuel@vicebio.com',
            'sandra@vicebio.com',
            'elisabeth@vicebio.com',
            'wim@vicebio.com',
            'vincent@vicebio.com',
            'jean@vicebio.com',
            'koen@vicebio.com'
        ]
        
        # Split mailboxes into priority and others
        priority_mailboxes = []
        other_mailboxes = []
        
        for mailbox in all_mailboxes:
            email = mailbox.get('mail', '').lower()
            if email in priority_emails:
                priority_mailboxes.append(mailbox)
            else:
                other_mailboxes.append(mailbox)
        
        # Start with priority mailboxes
        self.log(f"Strategy: Search {len(priority_mailboxes)} priority mailboxes first")
        self.log(f"Priority mailboxes: {', '.join([m.get('mail') for m in priority_mailboxes])}")
        
        mailboxes = priority_mailboxes
        search_extended = False
        
        # Check if vendor name matches any employee names (to avoid false positives)
        vendor_name_lower = vendor_name.lower().strip()
        vendor_name_cleaned = vendor_name_lower.replace(' ', '').replace('-', '').replace('_', '').replace('.', '')
        
        # Check if vendor name fuzzy matches any employee PERSON name (not functional
        # mailboxes); SequenceMatcher is already imported at module level
        
        for mailbox in mailboxes:
            display_name = mailbox.get('displayName', '').lower().strip()
            email = mailbox.get('mail', '').lower().strip()
            
            if not display_name or len(display_name) < 3:
                continue
            
            # Skip checking functional/shared mailboxes (they're not person names)
            # BUT keep vendor-related mailboxes like vendor_invoicing, info, etc.
            # Look for patterns like: "room", "labo", "meeting" etc. but NOT vendor/info/invoicing
            functional_patterns = ['room', 'labo', 'lab', 'meeting', 'filecloud', 'shared', 'admin']
            if any(pattern in display_name for pattern in functional_patterns):
                continue
            
            # Only check if display_name looks like a person name (contains space = first + last name)
            if ' ' not in display_name:
                continue
            
            # Clean name for comparison
            employee_cleaned = display_name.replace(' ', '').replace('-', '').replace('_', '').replace('.', '')
            
            # Calculate similarity
            similarity = SequenceMatcher(None, vendor_name_cleaned, employee_cleaned).ratio()
            
            # Also check if vendor name is mostly contained in employee name
            # (e.g., "Morgan" in "Morgan Bodson")
            if len(vendor_name_cleaned) > 4:  # Only for names longer than 4 chars
                if vendor_name_cleaned in employee_cleaned:
                    overlap = len(vendor_name_cleaned) / len(employee_cleaned)
                    if overlap > 0.6:  # Vendor name is 60%+ of employee name
                        similarity = max(similarity, 0.85)  # Boost similarity
            
            # Skip if similarity is very high (85%+) - stricter threshold
            if similarity >= 0.85:
                self.log(f"⚠️  WARNING: Vendor name '{vendor_name}' fuzzy matches employee '{display_name}' (similarity: {similarity:.2f})")
                self.log(f"⚠️  Skipping this vendor to avoid false positives from internal emails")
                self.log(f"{'='*60}\n")
                
                # Return empty result
                return pd.DataFrame([{
                    'Vendor': vendor_name,
                    'Retained Emails': f'[SKIPPED - matches employee: {display_name}]',
                    'Source Mailboxes': ''
                }])
        
        # Results storage
        results = []
        vendor_emails_found = {}  # Dict: email -> set of source mailboxes
        
        # Process each mailbox
        total_mailboxes = len(mailboxes)
        total_emails_found = 0
        high_confidence_count = 0
        
        for idx, mailbox in enumerate(mailboxes, 1):
            user_email = mailbox.get("mail")
            display_name = mailbox.get("displayName", "Unknown")
            
            self.log(f"\n[{idx}/{total_mailboxes}] Searching: {display_name} ({user_email})")
            
            # Search mailbox
            emails = self.search_mailbox(
                user_email,
                vendor_keywords,
                max_emails=max_emails_per_mailbox,
                days_back=days_back
            )
            
            total_emails_found += len(emails)
            self.log(f"  Found {len(emails)} matching emails (Total so far: {total_emails_found} emails)")
            
            # Analyze each email with LLM
            emails_analyzed = 0
            emails_with_results = 0
            
            for email_idx, email in enumerate(emails, 1):
                emails_analyzed += 1
                
                # Show progress for large batches
                if len(emails) > 5 and email_idx % 5 == 0:
                    self.log(f"    Analyzing emails: {email_idx}/{len(emails)}...")
                
                analysis = self.analyze_email_with_llm(email, vendor_name)
                
                if analysis['vendor_emails']:
                    emails_with_results += 1
                    
                    # Only store HIGH confidence results
                    if analysis['confidence'] == 'high':
                        high_confidence_count += 1
                        for vendor_email in analysis['vendor_emails']:
                            if vendor_email not in vendor_emails_found:
                                vendor_emails_found[vendor_email] = set()
                            vendor_emails_found[vendor_email].add(user_email)
                        
                        self.log(f"    ✓ Email {email_idx}: Extracted {len(analysis['vendor_emails'])} HIGH confidence address(es) - {', '.join(analysis['vendor_emails'])}")
                    else:
                        self.log(f"    • Email {email_idx}: Skipped {len(analysis['vendor_emails'])} address(es) - confidence too low ({analysis['confidence']})")
            
            # Summary for this mailbox
            if emails_with_results > 0:
                self.log(f"  ✓ Mailbox summary: {emails_with_results} emails analyzed")
            else:
                self.log(f"  • No high-confidence vendor emails found in this mailbox")
            
            # Rate limiting between mailboxes
            if idx < total_mailboxes:
                time.sleep(1)
        
        # Check if we should extend search to all mailboxes
        if not search_extended and len(vendor_emails_found) == 0 and len(other_mailboxes) > 0:
            self.log(f"\n{'='*60}")
            self.log(f"⚠️  No HIGH confidence emails found in priority mailboxes")
            self.log(f"Extending search to all {len(other_mailboxes)} remaining mailboxes...")
            self.log(f"{'='*60}\n")
            
            search_extended = True
            mailboxes = other_mailboxes
            
            # Process remaining mailboxes
            for idx, mailbox in enumerate(mailboxes, len(priority_mailboxes) + 1):
                user_email = mailbox.get("mail")
                display_name = mailbox.get("displayName", "Unknown")
                
                self.log(f"\n[{idx}/{len(priority_mailboxes) + len(other_mailboxes)}] Searching: {display_name} ({user_email})")
                
                # Search mailbox
                emails = self.search_mailbox(
                    user_email,
                    vendor_keywords,
                    max_emails=max_emails_per_mailbox,
                    days_back=days_back
                )
                
                total_emails_found += len(emails)
                self.log(f"  Found {len(emails)} matching emails (Total so far: {total_emails_found} emails)")
                
                # Analyze each email with LLM
                emails_analyzed = 0
                emails_with_results = 0
                
                for email_idx, email in enumerate(emails, 1):
                    emails_analyzed += 1
                    
                    # Show progress for large batches
                    if len(emails) > 5 and email_idx % 5 == 0:
                        self.log(f"    Analyzing emails: {email_idx}/{len(emails)}...")
                    
                    analysis = self.analyze_email_with_llm(email, vendor_name)
                    
                    if analysis['vendor_emails']:
                        emails_with_results += 1
                        
                        # Only store HIGH confidence results
                        if analysis['confidence'] == 'high':
                            high_confidence_count += 1
                            for vendor_email in analysis['vendor_emails']:
                                if vendor_email not in vendor_emails_found:
                                    vendor_emails_found[vendor_email] = set()
                                vendor_emails_found[vendor_email].add(user_email)
                            
                            self.log(f"    ✓ Email {email_idx}: Extracted {len(analysis['vendor_emails'])} HIGH confidence address(es) - {', '.join(analysis['vendor_emails'])}")
                        else:
                            self.log(f"    • Email {email_idx}: Skipped {len(analysis['vendor_emails'])} address(es) - confidence too low ({analysis['confidence']})")
                
                # Summary for this mailbox
                if emails_with_results > 0:
                    self.log(f"  ✓ Mailbox summary: {emails_with_results} emails analyzed")
                else:
                    self.log(f"  • No high-confidence vendor emails found in this mailbox")
                
                # Rate limiting between mailboxes
                if idx < len(priority_mailboxes) + len(other_mailboxes):
                    time.sleep(1)
                
                # Stop extended search if we found high-confidence emails
                if len(vendor_emails_found) > 0:
                    self.log(f"\n✓ Found HIGH confidence emails, stopping extended search")
                    break
        
        # Create DataFrame with 3-column format
        if vendor_emails_found:
            # Emails are already deduplicated in the dict keys
            # Prepare data: one row per vendor
            unique_emails = sorted(vendor_emails_found.keys())  # Already deduplicated
            vendor_emails_list = ', '.join(unique_emails)
            
            # Collect all source mailboxes (deduplicated); use a distinct loop
            # variable to avoid shadowing `mailboxes` from the search loop above
            source_mailboxes_set = set()
            for sources in vendor_emails_found.values():
                source_mailboxes_set.update(sources)
            source_mailboxes_list = ', '.join(sorted(source_mailboxes_set))
            
            df = pd.DataFrame([{
                'Vendor': vendor_name,
                'Retained Emails': vendor_emails_list,
                'Source Mailboxes': source_mailboxes_list
            }])
        else:
            # No results: return empty row with vendor name
            df = pd.DataFrame([{
                'Vendor': vendor_name,
                'Retained Emails': '',
                'Source Mailboxes': ''
            }])
        
        self.log(f"\n{'='*60}")
        self.log(f"Extraction Complete for {vendor_name}")
        self.log(f"{'='*60}")
        self.log(f"Total mailboxes searched: {total_mailboxes}")
        self.log(f"Total emails analyzed: {total_emails_found}")
        self.log(f"High-confidence emails found: {high_confidence_count}")
        self.log(f"Unique vendor email addresses (HIGH confidence only): {len(vendor_emails_found)}")
        if vendor_emails_found:
            self.log(f"Vendor emails:")
            for email, sources in sorted(vendor_emails_found.items()):
                self.log(f"  • {email} (found in {len(sources)} mailbox(es): {', '.join(sorted(sources))})")
        self.log(f"{'='*60}\n")
        
        return df
    
    def extract_for_vendor_list(
        self,
        vendor_list: List[str],
        max_mailboxes: Optional[int] = None,
        max_emails_per_mailbox: Optional[int] = None,
        days_back: Optional[int] = None,
        resume: bool = True
    ) -> pd.DataFrame:
        """
        Extract emails for multiple vendors
        
        Args:
            vendor_list: List of vendor names
            resume: If True, skip vendors already processed
        """
        self.log(f"\nStarting batch extraction for {len(vendor_list)} vendors")
        
        all_results = []
        processed_vendors = set()
        
        # Load progress if resuming
        if resume and self.progress_file.exists():
            with open(self.progress_file, 'r') as f:
                progress = json.load(f)
                processed_vendors = set(progress.get('processed_vendors', []))
                self.log(f"Resuming: {len(processed_vendors)} vendors already processed")
        
        for idx, vendor in enumerate(vendor_list, 1):
            if vendor in processed_vendors:
                self.log(f"[{idx}/{len(vendor_list)}] Skipping {vendor} (already processed)")
                continue
            
            try:
                self.log(f"\n[{idx}/{len(vendor_list)}] Processing: {vendor}")
                
                df = self.extract_for_vendor(
                    vendor,
                    max_mailboxes=max_mailboxes,
                    max_emails_per_mailbox=max_emails_per_mailbox,
                    days_back=days_back
                )
                
                if not df.empty:
                    all_results.append(df)
                
                # Save progress
                processed_vendors.add(vendor)
                with open(self.progress_file, 'w') as f:
                    json.dump({'processed_vendors': list(processed_vendors)}, f)
                
                # Save intermediate results
                if all_results:
                    combined_df = pd.concat(all_results, ignore_index=True)
                    combined_df.to_excel(self.results_file, index=False)
                    self.log(f"✓ Intermediate results saved: {self.results_file}")
                
            except Exception as e:
                self.log(f"ERROR processing {vendor}: {str(e)}")
                continue
        
        # Final results (already in 3-column format)
        if all_results:
            final_df = pd.concat(all_results, ignore_index=True)
            final_df.to_excel(self.results_file, index=False)
            
            self.log(f"\n{'='*60}")
            self.log(f"BATCH EXTRACTION COMPLETE")
            self.log(f"Results saved: {self.results_file}")
            self.log(f"Total vendors processed: {len(final_df)}")
            self.log(f"Vendors with emails: {len(final_df[final_df['Retained Emails'].str.len() > 0])}")
            self.log(f"{'='*60}\n")
            
            return final_df
        else:
            self.log("No results found")
            return pd.DataFrame()


Class Interface

Methods

__init__(self, tenant_id, client_id, client_secret, openai_api_key, domain, output_dir)

Purpose: Initialize the extractor with Azure AD app credentials, an OpenAI API key, the organization's domain, and an output directory for logs and results

Parameters:

  • tenant_id: Type: str
  • client_id: Type: str
  • client_secret: Type: str
  • openai_api_key: Type: str
  • domain: Type: str
  • output_dir: Type: str

Returns: None

log(self, message)

Purpose: Log message to console and file

Parameters:

  • message: Type: str

Returns: None

authenticate(self) -> str

Purpose: Authenticate using client credentials (application permissions)

Returns: Returns str

get_mailboxes(self, max_mailboxes) -> List[Dict]

Purpose: Get list of all mailboxes in the organization

Parameters:

  • max_mailboxes: Type: Optional[int]

Returns: Returns List[Dict]

search_mailbox(self, user_email, vendor_keywords, max_emails, days_back) -> List[Dict]

Purpose: Search a specific mailbox for emails containing vendor keywords. Uses an adaptive strategy: a one-item test query checks the total match count first, and if more than 300 messages match, retrieval is capped at 50 emails for that mailbox. max_emails=None retrieves all matching emails; days_back=None searches all mail regardless of age.

Parameters:

  • user_email: Type: str
  • vendor_keywords: Type: List[str]
  • max_emails: Type: Optional[int]
  • days_back: Type: Optional[int]

Returns: Returns List[Dict]
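
A minimal sketch of a direct call, assuming `extractor` is an authenticated VendorEmailExtractor instance (the mailbox address and limits are illustrative):

extractor.authenticate()
messages = extractor.search_mailbox(
    "someone@vicebio.com",
    vendor_keywords=["Acme"],
    max_emails=25,    # cap retrieval for a quick pass
    days_back=180,    # only messages from roughly the last 6 months
)
print(f"Retrieved {len(messages)} matching messages")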

extract_emails_from_text(self, text) -> Set[str]

Purpose: Extract email addresses from text using regex

Parameters:

  • text: Type: str

Returns: Returns Set[str]
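
Illustrative behavior, assuming an extractor constructed with the default domain; internal and noreply-style addresses are filtered out:

text = ("Contact sales@acme-corp.com or noreply@acme-corp.com; "
        "internal copy to finance@vicebio.com.")
emails = extractor.extract_emails_from_text(text)
# emails == {'sales@acme-corp.com'}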

_is_valid_vendor_email(self, email) -> bool

Purpose: Check if email passes basic validation

Parameters:

  • email: Type: str

Returns: Returns bool

_vendor_name_in_email(self, email_address, vendor_name) -> float

Purpose: Check whether the vendor name appears in the email address's domain, using fuzzy matching. Returns a similarity score between 0.0 and 1.0; domain matching is treated as the primary indicator of vendor ownership.

Parameters:

  • email_address: Type: str
  • vendor_name: Type: str

Returns: Returns float
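
Illustrative scores (the addresses are made up; exact fuzzy values depend on difflib.SequenceMatcher):

extractor._vendor_name_in_email("info@acme.com", "Acme B.V.")
# -> 1.0: the vendor token "acme" appears in the domain
extractor._vendor_name_in_email("info@akme.com", "Acme B.V.")
# -> ~0.75: fuzzy token match ("acme" vs "akme"), above the 0.70 threshold
extractor._vendor_name_in_email("info@unrelated.com", "Acme B.V.")
# -> low score, filtered out by the 0.70 validation threshold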

analyze_email_with_llm(self, email_data, vendor_name) -> Dict[str, Any]

Purpose: Use an LLM to extract relevant vendor email addresses from email content. Returns a dict with 'vendor_emails' (list of extracted addresses), 'confidence' ('high', 'medium', or 'low'), and 'reasoning' (a brief explanation).

Parameters:

  • email_data: Type: Dict
  • vendor_name: Type: str

Returns: Returns Dict[str, Any]
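
A sketch of the expected input shape, using the fields selected in search_mailbox (all values are illustrative):

email_data = {
    "subject": "Invoice 2024-117 from Acme",
    "bodyPreview": "Please find attached our latest invoice...",
    "from": {"emailAddress": {"address": "billing@acme.com"}},
    "toRecipients": [{"emailAddress": {"address": "finance@vicebio.com"}}],
    "ccRecipients": [],
    "body": {"contentType": "html", "content": "<p>Questions? sales@acme.com</p>"},
}
result = extractor.analyze_email_with_llm(email_data, "Acme B.V.")
# result -> {'vendor_emails': [...], 'confidence': 'high'|'medium'|'low', 'reasoning': '...'}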

_create_search_keywords(self, vendor_name) -> List[str]

Purpose: Create search-friendly keywords from a vendor name: removes special characters, strips legal suffixes (B.V., Ltd, Inc, etc.), and splits multi-word names.

Parameters:

  • vendor_name: Type: str

Returns: Returns List[str]
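
Illustrative keyword generation: accents are folded to ASCII, the legal suffix and country word are dropped, and the result is capped at two keywords:

extractor._create_search_keywords("Société Générale Belgique S.A.")
# -> ['Societe', 'Generale']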

extract_for_vendor(self, vendor_name, vendor_keywords, max_mailboxes, max_emails_per_mailbox, days_back) -> pd.DataFrame

Purpose: Extract vendor emails from all mailboxes. vendor_keywords defaults to keywords derived from vendor_name via _create_search_keywords; max_mailboxes, max_emails_per_mailbox, and days_back are optional limits (None means unlimited).

Parameters:

  • vendor_name: Type: str
  • vendor_keywords: Type: Optional[List[str]]
  • max_mailboxes: Type: Optional[int]
  • max_emails_per_mailbox: Type: Optional[int]
  • days_back: Type: Optional[int]

Returns: Returns pd.DataFrame
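
A single-vendor run, assuming an extractor configured as shown under __init__ (the limits are illustrative):

df = extractor.extract_for_vendor(
    "Acme B.V.",
    max_mailboxes=20,
    max_emails_per_mailbox=25,
    days_back=365,
)
# df holds one row: Vendor | Retained Emails | Source Mailboxes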

extract_for_vendor_list(self, vendor_list, max_mailboxes, max_emails_per_mailbox, days_back, resume) -> pd.DataFrame

Purpose: Extract emails for multiple vendors. When resume is True, vendors recorded in the progress file are skipped; intermediate results are saved to Excel after each vendor.

Parameters:

  • vendor_list: Type: List[str]
  • max_mailboxes: Type: Optional[int]
  • max_emails_per_mailbox: Type: Optional[int]
  • days_back: Type: Optional[int]
  • resume: Type: bool

Returns: Returns pd.DataFrame

Required Imports

import json
import os
import re
import time
from datetime import datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import msal
import pandas as pd
import requests
from openai import OpenAI

Usage Example

# Example usage (credential values are placeholders):
# extractor = VendorEmailExtractor(
#     tenant_id="<tenant-id>",
#     client_id="<client-id>",
#     client_secret="<client-secret>",
#     openai_api_key="<openai-api-key>",
# )
# df = extractor.extract_for_vendor("Acme B.V.")
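
A slightly fuller sketch for batch extraction, assuming credentials are supplied via environment variables (the variable names are illustrative, not part of the class):

import os

extractor = VendorEmailExtractor(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
    openai_api_key=os.environ["OPENAI_API_KEY"],
)

# resume=True skips vendors already recorded in extraction_progress.json
vendors = ["Acme B.V.", "Globex GmbH"]
df = extractor.extract_for_vendor_list(
    vendors,
    max_emails_per_mailbox=25,
    days_back=365,
    resume=True,
)
print(df)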

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function main_v26 70.6% similar

    Demonstrates example usage of the VendorEmailExtractor class by searching for vendor emails across Office 365 mailboxes and displaying results.

    From: /tf/active/vicechatdev/find_email/vendor_email_extractor.py
  • function extract_batch 62.2% similar

    Batch processes a list of vendors from an Excel file to extract their email addresses by searching through Microsoft 365 mailboxes using AI-powered email analysis.

    From: /tf/active/vicechatdev/find_email/extract_vendor_batch.py
  • function test_email_search 61.0% similar

    Tests the email search functionality of a VendorEmailExtractor instance by searching for emails containing common business terms in the first available mailbox.

    From: /tf/active/vicechatdev/find_email/test_vendor_extractor.py
  • function main_v27 59.7% similar

    Command-line entry point that parses arguments and orchestrates the extraction of vendor emails from all vicebio.com mailboxes using Microsoft Graph API.

    From: /tf/active/vicechatdev/find_email/extract_vendor_batch.py
  • class VendorEnricher 54.1% similar

    A class that enriches vendor information by finding official email addresses and VAT numbers using RAG (Retrieval-Augmented Generation) with ChromaDB document search and web search capabilities.

    From: /tf/active/vicechatdev/find_email/vendor_enrichment.py