🔍 Code Extractor

class EmailHandler

Maturity: 45

EmailHandler is a comprehensive email processing class that parses incoming email data, extracts content and attachments, enforces rate limits, and forwards emails via Office 365 using the O365Client.

File:
/tf/active/vicechatdev/email-forwarder/src/forwarder/email_handler.py
Lines:
26 - 354
Complexity:
complex

Purpose

This class serves as the core email processing engine for an SMTP forwarding service. It handles the complete lifecycle of email processing: parsing raw email bytes, decoding headers, extracting body content and attachments, validating email addresses, enforcing rate limits (per-minute and per-hour), and forwarding emails through Office 365. It maintains statistics on processed and failed emails, manages attachment size and type restrictions, and provides comprehensive logging throughout the process. The class is designed to be instantiated once and used to process multiple emails while maintaining state for rate limiting and statistics.

Source Code

class EmailHandler:
    def __init__(self):
        self.o365_client = O365Client()
        self.processed_count = 0
        self.failed_count = 0
        self.start_time = time.time()
        
        # Rate limiting
        self.email_timestamps = []
        self.max_per_minute = getattr(settings, 'MAX_EMAILS_PER_MINUTE', 30)
        self.max_per_hour = getattr(settings, 'MAX_EMAILS_PER_HOUR', 300)
        
        # Attachment settings
        self.max_attachment_size = getattr(settings, 'MAX_ATTACHMENT_SIZE', 25 * 1024 * 1024)  # 25MB default
        self.allowed_attachment_types = getattr(settings, 'ALLOWED_ATTACHMENT_TYPES', None)
        
    def decode_header_value(self, value: str) -> str:
        """Decode email header values that might be encoded"""
        if not value:
            return ""
            
        try:
            decoded_parts = decode_header(value)
            decoded_value = ""
            
            for part, encoding in decoded_parts:
                if isinstance(part, bytes):
                    if encoding:
                        decoded_value += part.decode(encoding)
                    else:
                        decoded_value += part.decode('utf-8', errors='ignore')
                else:
                    decoded_value += str(part)
                    
            return decoded_value.strip()
            
        except Exception as e:
            logger.warning(f"Failed to decode header value '{value}': {str(e)}")
            return str(value)
    
    def extract_email_address(self, email_field: str) -> Optional[str]:
        """Extract clean email address from email field"""
        if not email_field:
            return None
            
        try:
            # Decode the header first
            decoded_field = self.decode_header_value(email_field)
            
            # Parse the email address
            name, addr = parseaddr(decoded_field)
            
            # Validate the address
            if addr and '@' in addr:
                # Clean up the address
                addr = addr.strip('<>').strip()
                
                # Basic email validation
                if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', addr):
                    return addr
                    
        except Exception as e:
            logger.warning(f"Failed to extract email from '{email_field}': {str(e)}")
            
        return None
    
    def extract_attachments(self, message) -> List[Dict[str, Any]]:
        """Extract attachments from email message"""
        attachments = []
        
        try:
            for part in message.walk():
                content_disposition = str(part.get("Content-Disposition", ""))
                content_type = part.get_content_type()
                
                # Check if this part is an attachment
                if "attachment" in content_disposition or part.get_filename():
                    filename = part.get_filename()
                    
                    # Decode filename if encoded
                    if filename:
                        filename = self.decode_header_value(filename)
                    else:
                        # Generate filename if not provided
                        ext = mimetypes.guess_extension(content_type) or '.bin'
                        filename = f"attachment{ext}"
                    
                    # Get attachment content
                    payload = part.get_payload(decode=True)
                    
                    if payload:
                        # Check file size
                        if len(payload) > self.max_attachment_size:
                            logger.warning(f"Attachment '{filename}' too large ({len(payload)} bytes), skipping")
                            continue
                        
                        # Check file type if restrictions are in place
                        if self.allowed_attachment_types:
                            file_ext = filename.split('.')[-1].lower() if '.' in filename else ''
                            if file_ext not in self.allowed_attachment_types:
                                logger.warning(f"Attachment '{filename}' type not allowed, skipping")
                                continue
                        
                        # Encode content as base64 for MS Graph API
                        content_base64 = base64.b64encode(payload).decode('utf-8')
                        
                        attachment_info = {
                            'filename': filename,
                            'content_type': content_type,
                            'size': len(payload),
                            'content': content_base64
                        }
                        
                        attachments.append(attachment_info)
                        logger.info(f"Extracted attachment: {filename} ({len(payload)} bytes)")
                    
        except Exception as e:
            logger.error(f"Failed to extract attachments: {str(e)}")
        
        return attachments
    
    def parse_email(self, email_data: bytes) -> Dict[str, Any]:
        """Parse email data and extract relevant information"""
        try:
            # Parse the email
            message = email.message_from_bytes(email_data)
            
            # Extract and decode headers
            subject = self.decode_header_value(message.get('Subject', ''))
            from_header = message.get('From', '')
            to_header = message.get('To', '')
            reply_to_header = message.get('Reply-To', '')
            
            # Extract email addresses
            from_email = self.extract_email_address(from_header)
            to_email = self.extract_email_address(to_header)
            reply_to_email = self.extract_email_address(reply_to_header)
            
            # If we don't have a reply-to, use the from address
            if not reply_to_email and from_email:
                reply_to_email = from_email
            
            # Extract body
            body = self.extract_body(message)
            
            # Extract attachments
            attachments = self.extract_attachments(message)
            
            # Log the extracted information
            logger.info(f"Parsed email - From: {from_email}, To: {to_email}, Subject: {subject}")
            logger.debug(f"Reply-To: {reply_to_email}")
            if attachments:
                logger.info(f"Found {len(attachments)} attachments")
                for att in attachments:
                    logger.debug(f"  - {att['filename']} ({att['size']} bytes)")
            
            return {
                'subject': subject,
                'from_email': from_email,
                'to_email': to_email,
                'reply_to': reply_to_email,
                'body': body,
                'attachments': attachments,
                'raw_from': from_header,
                'raw_to': to_header,
                'raw_reply_to': reply_to_header
            }
            
        except Exception as e:
            logger.error(f"Failed to parse email: {str(e)}")
            return {}
    
    def extract_body(self, message) -> str:
        """Extract the email body content"""
        try:
            body = ""
            
            if message.is_multipart():
                for part in message.walk():
                    content_type = part.get_content_type()
                    content_disposition = str(part.get("Content-Disposition", ""))
                    
                    # Skip attachments
                    if "attachment" in content_disposition or part.get_filename():
                        continue
                    
                    if content_type == "text/plain":
                        charset = part.get_content_charset() or 'utf-8'
                        part_body = part.get_payload(decode=True)
                        if part_body:
                            body += part_body.decode(charset, errors='ignore')
                    elif content_type == "text/html":
                        charset = part.get_content_charset() or 'utf-8'
                        part_body = part.get_payload(decode=True)
                        if part_body:
                            body += part_body.decode(charset, errors='ignore')
                            break  # Prefer HTML over plain text
            else:
                # Single part message
                charset = message.get_content_charset() or 'utf-8'
                body_bytes = message.get_payload(decode=True)
                if body_bytes:
                    body = body_bytes.decode(charset, errors='ignore')
                else:
                    body = str(message.get_payload())
            
            return body.strip()
            
        except Exception as e:
            logger.error(f"Failed to extract email body: {str(e)}")
            return "Failed to extract email content"
    
    def check_rate_limit(self) -> bool:
        """Check if we're within rate limits"""
        current_time = time.time()
        
        # Remove timestamps older than 1 hour
        self.email_timestamps = [ts for ts in self.email_timestamps if current_time - ts < 3600]
        
        # Check hourly limit
        if len(self.email_timestamps) >= self.max_per_hour:
            logger.warning(f"Hourly rate limit reached ({self.max_per_hour} emails/hour)")
            return False
        
        # Check per-minute limit
        recent_emails = [ts for ts in self.email_timestamps if current_time - ts < 60]
        if len(recent_emails) >= self.max_per_minute:
            logger.warning(f"Per-minute rate limit reached ({self.max_per_minute} emails/minute)")
            return False
        
        return True
    
    def process_email(self, peer_address: str, email_data: bytes) -> bool:
        """Process and forward an email"""
        try:
            # Check rate limits
            if not self.check_rate_limit():
                logger.error("Rate limit exceeded, email rejected")
                return False
            
            # Parse the email
            parsed_email = self.parse_email(email_data)
            
            if not parsed_email:
                logger.error("Failed to parse email")
                return False
            
            # Validate required fields
            if not parsed_email.get('to_email'):
                logger.error("No valid recipient email address found")
                return False
            
            # Forward the email
            success = self.forward_email(
                to_email=parsed_email['to_email'],
                subject=parsed_email['subject'],
                body=parsed_email['body'],
                from_email=parsed_email['from_email'],
                reply_to=parsed_email['reply_to'],
                attachments=parsed_email.get('attachments', [])
            )
            
            if success:
                self.processed_count += 1
                self.email_timestamps.append(time.time())
                logger.info(f"Successfully processed email from {peer_address}")
            else:
                self.failed_count += 1
                logger.error(f"Failed to process email from {peer_address}")
            
            return success
            
        except Exception as e:
            logger.error(f"Exception processing email from {peer_address}: {str(e)}")
            self.failed_count += 1
            return False
    
    def forward_email(self, to_email: str, subject: str, body: str, 
                     from_email: Optional[str] = None, 
                     reply_to: Optional[str] = None,
                     attachments: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Forward email using O365"""
        try:
            # Log the forwarding attempt
            logger.info(f"Forwarding email to {to_email}")
            logger.debug(f"Subject: {subject}")
            logger.debug(f"From: {from_email}")
            logger.debug(f"Reply-To: {reply_to}")
            if attachments:
                logger.info(f"Including {len(attachments)} attachments")
            
            # Use O365 client to send
            success = self.o365_client.send_email(
                to_email=to_email,
                subject=subject,
                body=body,
                from_email=from_email,
                reply_to=reply_to,
                attachments=attachments or []
            )
            
            if success:
                logger.info(f"Successfully forwarded email: {subject}")
            else:
                logger.error(f"Failed to forward email: {subject}")
            
            return success
            
        except Exception as e:
            logger.error(f"Exception forwarding email: {str(e)}")
            return False
    
    def get_stats(self) -> Dict[str, Any]:
        """Get handler statistics"""
        uptime = time.time() - self.start_time
        return {
            'processed_count': self.processed_count,
            'failed_count': self.failed_count,
            'uptime_seconds': uptime,
            'uptime_hours': uptime / 3600,
            'success_rate': (self.processed_count / (self.processed_count + self.failed_count) * 100) if (self.processed_count + self.failed_count) > 0 else 0
        }
    
    def reset_stats(self):
        """Reset statistics"""
        self.processed_count = 0
        self.failed_count = 0
        self.start_time = time.time()
        self.email_timestamps = []

Parameters

Name Type Default Kind
bases - -

Parameter Details

__init__: The constructor takes no parameters. It initializes the O365Client for email forwarding, sets up counters for processed and failed emails, records the start time for uptime tracking, configures rate limiting parameters from settings (MAX_EMAILS_PER_MINUTE defaults to 30, MAX_EMAILS_PER_HOUR defaults to 300), and sets attachment restrictions (MAX_ATTACHMENT_SIZE defaults to 25MB, ALLOWED_ATTACHMENT_TYPES can restrict file extensions).

Return Value

Instantiation returns an EmailHandler object ready to process emails. Key method returns: process_email() returns bool indicating success/failure; parse_email() returns Dict with keys 'subject', 'from_email', 'to_email', 'reply_to', 'body', 'attachments', 'raw_from', 'raw_to', 'raw_reply_to'; extract_attachments() returns List[Dict] with attachment metadata including 'filename', 'content_type', 'size', 'content' (base64); get_stats() returns Dict with 'processed_count', 'failed_count', 'uptime_seconds', 'uptime_hours', 'success_rate'; check_rate_limit() returns bool indicating if within limits; forward_email() returns bool indicating success.

Class Interface

Methods

__init__(self)

Purpose: Initialize the EmailHandler with O365Client, counters, rate limiting, and attachment settings

Returns: None - initializes instance attributes

decode_header_value(self, value: str) -> str

Purpose: Decode email header values that might be encoded (e.g., RFC 2047 encoded-words)

Parameters:

  • value: The encoded header value string to decode

Returns: Decoded string with proper character encoding, or original value if decoding fails

extract_email_address(self, email_field: str) -> Optional[str]

Purpose: Extract and validate a clean email address from an email header field

Parameters:

  • email_field: Raw email header field that may contain name and email address

Returns: Validated email address string or None if extraction/validation fails

extract_attachments(self, message) -> List[Dict[str, Any]]

Purpose: Extract all attachments from an email message with size and type validation

Parameters:

  • message: email.message.Message object to extract attachments from

Returns: List of dicts with keys 'filename', 'content_type', 'size', 'content' (base64-encoded)

parse_email(self, email_data: bytes) -> Dict[str, Any]

Purpose: Parse raw email bytes and extract all relevant information including headers, body, and attachments

Parameters:

  • email_data: Raw email data as bytes (RFC 822 format)

Returns: Dict with keys 'subject', 'from_email', 'to_email', 'reply_to', 'body', 'attachments', 'raw_from', 'raw_to', 'raw_reply_to', or empty dict on failure

extract_body(self, message) -> str

Purpose: Extract the email body content, preferring HTML over plain text for multipart messages

Parameters:

  • message: email.message.Message object to extract body from

Returns: Email body as string, or error message if extraction fails

check_rate_limit(self) -> bool

Purpose: Check if current email processing is within configured rate limits (per-minute and per-hour)

Returns: True if within limits, False if rate limit exceeded

process_email(self, peer_address: str, email_data: bytes) -> bool

Purpose: Main method to process an incoming email: check rate limits, parse, validate, and forward

Parameters:

  • peer_address: IP address or identifier of the email sender/peer
  • email_data: Raw email data as bytes

Returns: True if email was successfully processed and forwarded, False otherwise

forward_email(self, to_email: str, subject: str, body: str, from_email: Optional[str] = None, reply_to: Optional[str] = None, attachments: Optional[List[Dict[str, Any]]] = None) -> bool

Purpose: Forward an email using the O365Client with optional attachments and reply-to address

Parameters:

  • to_email: Recipient email address
  • subject: Email subject line
  • body: Email body content
  • from_email: Original sender email address (optional)
  • reply_to: Reply-to email address (optional)
  • attachments: List of attachment dicts with 'filename', 'content_type', 'size', 'content' keys (optional)

Returns: True if email was successfully sent via O365, False otherwise

get_stats(self) -> Dict[str, Any]

Purpose: Get current statistics about email processing performance

Returns: Dict with keys 'processed_count', 'failed_count', 'uptime_seconds', 'uptime_hours', 'success_rate'

reset_stats(self)

Purpose: Reset all statistics counters and timestamps to start fresh tracking

Returns: None - modifies instance state

Attributes

Name Type Description Scope
o365_client O365Client Instance of O365Client used for forwarding emails via Office 365 instance
processed_count int Counter for successfully processed emails instance
failed_count int Counter for failed email processing attempts instance
start_time float Timestamp when the handler was initialized, used for uptime calculation instance
email_timestamps List[float] List of timestamps for processed emails, used for rate limiting instance
max_per_minute int Maximum number of emails allowed per minute (from settings, default 30) instance
max_per_hour int Maximum number of emails allowed per hour (from settings, default 300) instance
max_attachment_size int Maximum allowed attachment size in bytes (from settings, default 25MB) instance
allowed_attachment_types Optional[List[str]] List of allowed file extensions for attachments, None means all types allowed (from settings) instance

Dependencies

  • email
  • logging
  • time
  • base64
  • mimetypes
  • re
  • typing

Required Imports

import email
import logging
import time
import base64
import mimetypes
from email.header import decode_header
from email.utils import parseaddr
from typing import Dict, Any, Optional, List
import re
from o365_client import O365Client
from config import settings

Usage Example

# Initialize the handler
handler = EmailHandler()

# Process an incoming email
email_data = b'From: sender@example.com\r\nTo: recipient@example.com\r\nSubject: Test\r\n\r\nEmail body'
peer_address = '192.168.1.100'
success = handler.process_email(peer_address, email_data)

if success:
    print('Email processed successfully')

# Check statistics
stats = handler.get_stats()
print(f"Processed: {stats['processed_count']}, Failed: {stats['failed_count']}")
print(f"Success rate: {stats['success_rate']:.2f}%")

# Parse email manually for inspection
parsed = handler.parse_email(email_data)
print(f"Subject: {parsed['subject']}")
print(f"From: {parsed['from_email']}")
print(f"Attachments: {len(parsed['attachments'])}")

# Forward email directly
handler.forward_email(
    to_email='recipient@example.com',
    subject='Test Email',
    body='This is a test',
    from_email='sender@example.com',
    reply_to='reply@example.com',
    attachments=[]
)

# Reset statistics
handler.reset_stats()

Best Practices

  • Instantiate EmailHandler once and reuse it for processing multiple emails to maintain rate limiting state and statistics
  • The handler maintains internal state (processed_count, failed_count, email_timestamps) that persists across multiple email processing calls
  • Rate limits are enforced automatically in process_email() - check_rate_limit() is called internally
  • Always check the return value of process_email() to determine if the email was successfully processed
  • The handler logs extensively - ensure logging is properly configured before use
  • Attachment size and type restrictions are enforced during extraction - configure settings.MAX_ATTACHMENT_SIZE and settings.ALLOWED_ATTACHMENT_TYPES as needed
  • The parse_email() method returns an empty dict on failure - always check if the result is truthy before accessing keys
  • Email timestamps are automatically cleaned up (older than 1 hour removed) during rate limit checks
  • The handler depends on O365Client being properly configured - ensure O365 authentication is set up before instantiating EmailHandler
  • Use get_stats() periodically to monitor email processing performance and success rates
  • Call reset_stats() if you need to clear counters and start fresh tracking
  • The extract_email_address() method performs basic validation - invalid email formats return None
  • Attachments are base64-encoded for MS Graph API compatibility
  • HTML email bodies are preferred over plain text when both are available
  • Reply-To header defaults to From address if not explicitly set in the email

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class EmailForwardingHandler 75.8% similar

    SMTP message handler class that processes incoming email messages and forwards them using an EmailHandler instance while tracking server statistics.

    From: /tf/active/vicechatdev/email-forwarder/src/forwarder/smtp_server.py
  • class TestEmailHandler 71.5% similar

    A unit test class that validates the functionality of the EmailHandler class, specifically testing email forwarding success and failure scenarios using mocked O365Client dependencies.

    From: /tf/active/vicechatdev/email-forwarder/tests/test_email_handler.py
  • class O365Client 59.6% similar

    A client class for interacting with Microsoft 365 Graph API to send emails with authentication, validation, and attachment support.

    From: /tf/active/vicechatdev/email-forwarder/src/forwarder/o365_client.py
  • function test_email_handler 59.0% similar

    A test function that initializes an EmailHandler instance and verifies it can retrieve statistics, printing success or failure messages.

    From: /tf/active/vicechatdev/email-forwarder/test_service.py
  • class FileCloudEmailProcessor 59.0% similar

    A class that processes email files (.msg format) stored in FileCloud by finding, downloading, converting them to EML and PDF formats, and organizing them into mail_archive folders.

    From: /tf/active/vicechatdev/msg_to_eml.py
← Back to Browse