class EmailHandler
EmailHandler is a comprehensive email processing class that parses incoming email data, extracts content and attachments, enforces rate limits, and forwards emails via Office 365 using the O365Client.
/tf/active/vicechatdev/email-forwarder/src/forwarder/email_handler.py
26 - 354
complex
Purpose
This class serves as the core email processing engine for an SMTP forwarding service. It handles the complete lifecycle of email processing: parsing raw email bytes, decoding headers, extracting body content and attachments, validating email addresses, enforcing rate limits (per-minute and per-hour), and forwarding emails through Office 365. It maintains statistics on processed and failed emails, manages attachment size and type restrictions, and provides comprehensive logging throughout the process. The class is designed to be instantiated once and used to process multiple emails while maintaining state for rate limiting and statistics.
Source Code
class EmailHandler:
    """Parse inbound emails and forward them through Office 365.

    One instance is meant to be reused for many messages: it keeps
    success/failure counters and the timestamps of recently forwarded
    emails, which drive the per-minute and per-hour rate limits.
    """

    # Same basic address shape check as before, compiled once.
    _EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    def __init__(self):
        """Create the O365 client and read limits from ``settings``."""
        self.o365_client = O365Client()
        self.processed_count = 0
        self.failed_count = 0
        self.start_time = time.time()

        # Rate limiting
        self.email_timestamps = []
        self.max_per_minute = getattr(settings, 'MAX_EMAILS_PER_MINUTE', 30)
        self.max_per_hour = getattr(settings, 'MAX_EMAILS_PER_HOUR', 300)

        # Attachment settings
        self.max_attachment_size = getattr(settings, 'MAX_ATTACHMENT_SIZE', 25 * 1024 * 1024)  # 25MB default
        self.allowed_attachment_types = getattr(settings, 'ALLOWED_ATTACHMENT_TYPES', None)

    @staticmethod
    def _decode_lenient(raw: bytes, charset: Optional[str], errors: str = 'ignore') -> str:
        """Decode *raw* with *charset*; fall back to UTF-8 if the label is unknown."""
        if charset:
            try:
                return raw.decode(charset, errors=errors)
            except LookupError:
                # Unknown/unsupported charset label: do not lose the text
                # just because the label is unusable.
                pass
        return raw.decode('utf-8', errors=errors)

    def decode_header_value(self, value: str) -> str:
        """Decode an email header value that may contain encoded-words.

        Args:
            value: Raw header value (may be empty or ``None``).

        Returns:
            The decoded, stripped text; ``""`` for an empty value; the
            original value as a string if decoding fails entirely.
        """
        if not value:
            return ""

        try:
            pieces = []
            for part, encoding in decode_header(value):
                if not isinstance(part, bytes):
                    pieces.append(str(part))
                elif encoding:
                    # A bad charset label or invalid bytes in one part must
                    # not discard the whole header, so decode leniently.
                    pieces.append(self._decode_lenient(part, encoding, errors='replace'))
                else:
                    pieces.append(part.decode('utf-8', errors='ignore'))
            return "".join(pieces).strip()
        except Exception as e:
            logger.warning(f"Failed to decode header value '{value}': {str(e)}")
            return str(value)

    def extract_email_address(self, email_field: str) -> Optional[str]:
        """Extract a clean email address from a header field.

        Only the address returned by ``parseaddr`` is considered, so a
        field listing several recipients yields at most one address.

        Args:
            email_field: Raw header value such as ``"Name <a@b.com>"``.

        Returns:
            The address if it passes the basic format check, else ``None``.
        """
        if not email_field:
            return None

        try:
            decoded_field = self.decode_header_value(email_field)
            _name, addr = parseaddr(decoded_field)

            if addr and '@' in addr:
                addr = addr.strip('<>').strip()
                if self._EMAIL_RE.match(addr):
                    return addr
        except Exception as e:
            logger.warning(f"Failed to extract email from '{email_field}': {str(e)}")

        return None

    def extract_attachments(self, message) -> List[Dict[str, Any]]:
        """Collect attachments from a parsed message.

        A part counts as an attachment when its Content-Disposition
        contains ``attachment`` or it carries a filename. Parts larger than
        ``max_attachment_size`` or with an extension outside
        ``allowed_attachment_types`` (when set) are skipped with a warning.

        Args:
            message: Parsed email message supporting ``walk()``.

        Returns:
            A list of dicts with keys ``filename``, ``content_type``,
            ``size`` and ``content`` (base64 text). If extraction raises,
            the attachments gathered so far are returned.
        """
        attachments = []

        try:
            for part in message.walk():
                content_disposition = str(part.get("Content-Disposition", ""))
                content_type = part.get_content_type()

                if not ("attachment" in content_disposition or part.get_filename()):
                    continue

                filename = part.get_filename()
                if filename:
                    # Filenames may be RFC 2047 encoded like other headers.
                    filename = self.decode_header_value(filename)
                else:
                    # Generate filename if not provided
                    ext = mimetypes.guess_extension(content_type) or '.bin'
                    filename = f"attachment{ext}"

                payload = part.get_payload(decode=True)
                if not payload:
                    continue

                if len(payload) > self.max_attachment_size:
                    logger.warning(f"Attachment '{filename}' too large ({len(payload)} bytes), skipping")
                    continue

                if self.allowed_attachment_types:
                    file_ext = filename.split('.')[-1].lower() if '.' in filename else ''
                    if file_ext not in self.allowed_attachment_types:
                        logger.warning(f"Attachment '{filename}' type not allowed, skipping")
                        continue

                # Encode content as base64 for MS Graph API
                attachments.append({
                    'filename': filename,
                    'content_type': content_type,
                    'size': len(payload),
                    'content': base64.b64encode(payload).decode('utf-8'),
                })
                logger.info(f"Extracted attachment: {filename} ({len(payload)} bytes)")
        except Exception as e:
            logger.error(f"Failed to extract attachments: {str(e)}")

        return attachments

    def parse_email(self, email_data: bytes) -> Dict[str, Any]:
        """Parse raw email bytes into the fields needed for forwarding.

        Args:
            email_data: Raw message bytes.

        Returns:
            A dict with keys ``subject``, ``from_email``, ``to_email``,
            ``reply_to``, ``body``, ``attachments``, ``raw_from``,
            ``raw_to`` and ``raw_reply_to``; an empty dict on failure.
        """
        try:
            message = email.message_from_bytes(email_data)

            subject = self.decode_header_value(message.get('Subject', ''))
            from_header = message.get('From', '')
            to_header = message.get('To', '')
            reply_to_header = message.get('Reply-To', '')

            from_email = self.extract_email_address(from_header)
            to_email = self.extract_email_address(to_header)
            reply_to_email = self.extract_email_address(reply_to_header)

            # If we don't have a reply-to, use the from address
            if not reply_to_email and from_email:
                reply_to_email = from_email

            body = self.extract_body(message)
            attachments = self.extract_attachments(message)

            logger.info(f"Parsed email - From: {from_email}, To: {to_email}, Subject: {subject}")
            logger.debug(f"Reply-To: {reply_to_email}")
            if attachments:
                logger.info(f"Found {len(attachments)} attachments")
                for att in attachments:
                    logger.debug(f"  - {att['filename']} ({att['size']} bytes)")

            return {
                'subject': subject,
                'from_email': from_email,
                'to_email': to_email,
                'reply_to': reply_to_email,
                'body': body,
                'attachments': attachments,
                'raw_from': from_header,
                'raw_to': to_header,
                'raw_reply_to': reply_to_header,
            }
        except Exception as e:
            logger.error(f"Failed to parse email: {str(e)}")
            return {}

    def extract_body(self, message) -> str:
        """Extract the body text of a message.

        For multipart messages the first non-empty ``text/html`` part is
        returned on its own; if there is none, all ``text/plain`` parts are
        concatenated. Attachment parts are ignored. An unknown charset
        label falls back to UTF-8 instead of failing the extraction.

        Args:
            message: Parsed email message.

        Returns:
            The stripped body text, or ``"Failed to extract email content"``
            if extraction raises.
        """
        try:
            if not message.is_multipart():
                body_bytes = message.get_payload(decode=True)
                if body_bytes:
                    body = self._decode_lenient(body_bytes, message.get_content_charset())
                else:
                    body = str(message.get_payload())
                return body.strip()

            plain_chunks = []
            for part in message.walk():
                content_disposition = str(part.get("Content-Disposition", ""))
                # Skip attachments
                if "attachment" in content_disposition or part.get_filename():
                    continue

                content_type = part.get_content_type()
                if content_type not in ("text/plain", "text/html"):
                    continue

                payload = part.get_payload(decode=True)
                if not payload:
                    continue

                text = self._decode_lenient(payload, part.get_content_charset())
                if content_type == "text/html":
                    # Prefer HTML over plain text: return it alone rather
                    # than appending it to plain text already collected.
                    return text.strip()
                plain_chunks.append(text)

            return "".join(plain_chunks).strip()
        except Exception as e:
            logger.error(f"Failed to extract email body: {str(e)}")
            return "Failed to extract email content"

    def check_rate_limit(self) -> bool:
        """Report whether another email may be forwarded right now.

        Also drops recorded timestamps older than one hour.

        Returns:
            ``False`` if the hourly or per-minute limit is reached,
            otherwise ``True``.
        """
        current_time = time.time()

        # Remove timestamps older than 1 hour
        self.email_timestamps = [ts for ts in self.email_timestamps if current_time - ts < 3600]

        if len(self.email_timestamps) >= self.max_per_hour:
            logger.warning(f"Hourly rate limit reached ({self.max_per_hour} emails/hour)")
            return False

        recent_count = sum(1 for ts in self.email_timestamps if current_time - ts < 60)
        if recent_count >= self.max_per_minute:
            logger.warning(f"Per-minute rate limit reached ({self.max_per_minute} emails/minute)")
            return False

        return True

    def process_email(self, peer_address: str, email_data: bytes) -> bool:
        """Rate-check, parse, validate and forward one email.

        Parse failures, a missing recipient, forward failures and
        exceptions all increment ``failed_count``. Rate-limit rejections
        are not counted as failures (unchanged behaviour).

        Args:
            peer_address: Identifier of the sending peer, used in logs.
            email_data: Raw message bytes.

        Returns:
            ``True`` if the email was forwarded, otherwise ``False``.
        """
        try:
            if not self.check_rate_limit():
                logger.error("Rate limit exceeded, email rejected")
                return False

            parsed_email = self.parse_email(email_data)
            if not parsed_email:
                logger.error("Failed to parse email")
                # Count it, so get_stats() does not over-report success.
                self.failed_count += 1
                return False

            if not parsed_email.get('to_email'):
                logger.error("No valid recipient email address found")
                self.failed_count += 1
                return False

            success = self.forward_email(
                to_email=parsed_email['to_email'],
                subject=parsed_email['subject'],
                body=parsed_email['body'],
                from_email=parsed_email['from_email'],
                reply_to=parsed_email['reply_to'],
                attachments=parsed_email.get('attachments', []),
            )

            if success:
                self.processed_count += 1
                # Only forwarded emails count towards the rate limits.
                self.email_timestamps.append(time.time())
                logger.info(f"Successfully processed email from {peer_address}")
            else:
                self.failed_count += 1
                logger.error(f"Failed to process email from {peer_address}")

            return success
        except Exception as e:
            logger.error(f"Exception processing email from {peer_address}: {str(e)}")
            self.failed_count += 1
            return False

    def forward_email(self, to_email: str, subject: str, body: str,
                      from_email: Optional[str] = None,
                      reply_to: Optional[str] = None,
                      attachments: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Send an email through ``self.o365_client.send_email``.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: Body content.
            from_email: Original sender address, if known.
            reply_to: Reply-To address, if any.
            attachments: Attachment dicts; ``None`` is sent as ``[]``.

        Returns:
            The client's result, or ``False`` if the call raises.
        """
        try:
            logger.info(f"Forwarding email to {to_email}")
            logger.debug(f"Subject: {subject}")
            logger.debug(f"From: {from_email}")
            logger.debug(f"Reply-To: {reply_to}")
            if attachments:
                logger.info(f"Including {len(attachments)} attachments")

            success = self.o365_client.send_email(
                to_email=to_email,
                subject=subject,
                body=body,
                from_email=from_email,
                reply_to=reply_to,
                attachments=attachments or [],
            )

            if success:
                logger.info(f"Successfully forwarded email: {subject}")
            else:
                logger.error(f"Failed to forward email: {subject}")

            return success
        except Exception as e:
            logger.error(f"Exception forwarding email: {str(e)}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """Return counters, uptime and success rate.

        Returns:
            A dict with ``processed_count``, ``failed_count``,
            ``uptime_seconds``, ``uptime_hours`` and ``success_rate``
            (a percentage; ``0`` when nothing has been counted yet).
        """
        uptime = time.time() - self.start_time
        total = self.processed_count + self.failed_count
        return {
            'processed_count': self.processed_count,
            'failed_count': self.failed_count,
            'uptime_seconds': uptime,
            'uptime_hours': uptime / 3600,
            'success_rate': (self.processed_count / total * 100) if total > 0 else 0,
        }

    def reset_stats(self):
        """Zero the counters, restart the uptime clock and clear rate-limit history."""
        self.processed_count = 0
        self.failed_count = 0
        self.start_time = time.time()
        self.email_timestamps = []
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
__init__: The constructor takes no parameters. It initializes the O365Client for email forwarding, sets up counters for processed and failed emails, records the start time for uptime tracking, configures rate limiting parameters from settings (MAX_EMAILS_PER_MINUTE defaults to 30, MAX_EMAILS_PER_HOUR defaults to 300), and sets attachment restrictions (MAX_ATTACHMENT_SIZE defaults to 25MB, ALLOWED_ATTACHMENT_TYPES can restrict file extensions).
Return Value
Instantiation returns an EmailHandler object ready to process emails. Key method returns: process_email() returns bool indicating success/failure; parse_email() returns Dict with keys 'subject', 'from_email', 'to_email', 'reply_to', 'body', 'attachments', 'raw_from', 'raw_to', 'raw_reply_to'; extract_attachments() returns List[Dict] with attachment metadata including 'filename', 'content_type', 'size', 'content' (base64); get_stats() returns Dict with 'processed_count', 'failed_count', 'uptime_seconds', 'uptime_hours', 'success_rate'; check_rate_limit() returns bool indicating if within limits; forward_email() returns bool indicating success.
Class Interface
Methods
__init__(self)
Purpose: Initialize the EmailHandler with O365Client, counters, rate limiting, and attachment settings
Returns: None - initializes instance attributes
decode_header_value(self, value: str) -> str
Purpose: Decode email header values that might be encoded (e.g., RFC 2047 encoded-words)
Parameters:
value: The encoded header value string to decode
Returns: Decoded string with proper character encoding, or original value if decoding fails
extract_email_address(self, email_field: str) -> Optional[str]
Purpose: Extract and validate a clean email address from an email header field
Parameters:
email_field: Raw email header field that may contain name and email address
Returns: Validated email address string or None if extraction/validation fails
extract_attachments(self, message) -> List[Dict[str, Any]]
Purpose: Extract all attachments from an email message with size and type validation
Parameters:
message: email.message.Message object to extract attachments from
Returns: List of dicts with keys 'filename', 'content_type', 'size', 'content' (base64-encoded)
parse_email(self, email_data: bytes) -> Dict[str, Any]
Purpose: Parse raw email bytes and extract all relevant information including headers, body, and attachments
Parameters:
email_data: Raw email data as bytes (RFC 822 format)
Returns: Dict with keys 'subject', 'from_email', 'to_email', 'reply_to', 'body', 'attachments', 'raw_from', 'raw_to', 'raw_reply_to', or empty dict on failure
extract_body(self, message) -> str
Purpose: Extract the email body content, preferring HTML over plain text for multipart messages
Parameters:
message: email.message.Message object to extract body from
Returns: Email body as string, or error message if extraction fails
check_rate_limit(self) -> bool
Purpose: Check if current email processing is within configured rate limits (per-minute and per-hour)
Returns: True if within limits, False if rate limit exceeded
process_email(self, peer_address: str, email_data: bytes) -> bool
Purpose: Main method to process an incoming email: check rate limits, parse, validate, and forward
Parameters:
peer_address: IP address or identifier of the email sender/peer; email_data: Raw email data as bytes
Returns: True if email was successfully processed and forwarded, False otherwise
forward_email(self, to_email: str, subject: str, body: str, from_email: Optional[str] = None, reply_to: Optional[str] = None, attachments: Optional[List[Dict[str, Any]]] = None) -> bool
Purpose: Forward an email using the O365Client with optional attachments and reply-to address
Parameters:
to_email: Recipient email address; subject: Email subject line; body: Email body content; from_email: Original sender email address (optional); reply_to: Reply-to email address (optional); attachments: List of attachment dicts with 'filename', 'content_type', 'size', 'content' keys (optional)
Returns: True if email was successfully sent via O365, False otherwise
get_stats(self) -> Dict[str, Any]
Purpose: Get current statistics about email processing performance
Returns: Dict with keys 'processed_count', 'failed_count', 'uptime_seconds', 'uptime_hours', 'success_rate'
reset_stats(self)
Purpose: Reset all statistics counters and timestamps to start fresh tracking
Returns: None - modifies instance state
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| o365_client | O365Client | Instance of O365Client used for forwarding emails via Office 365 | instance |
| processed_count | int | Counter for successfully processed emails | instance |
| failed_count | int | Counter for failed email processing attempts | instance |
| start_time | float | Timestamp when the handler was initialized, used for uptime calculation | instance |
| email_timestamps | List[float] | List of timestamps for processed emails, used for rate limiting | instance |
| max_per_minute | int | Maximum number of emails allowed per minute (from settings, default 30) | instance |
| max_per_hour | int | Maximum number of emails allowed per hour (from settings, default 300) | instance |
| max_attachment_size | int | Maximum allowed attachment size in bytes (from settings, default 25MB) | instance |
| allowed_attachment_types | Optional[List[str]] | List of allowed file extensions for attachments, None means all types allowed (from settings) | instance |
Dependencies
email, logging, time, base64, mimetypes, re, typing
Required Imports
import email
import logging
import time
import base64
import mimetypes
from email.header import decode_header
from email.utils import parseaddr
from typing import Dict, Any, Optional, List
import re
from o365_client import O365Client
from config import settings
Usage Example
# Initialize the handler (creates the O365Client, so O365 must be configured)
handler = EmailHandler()

# Process an incoming email
email_data = b'From: sender@example.com\r\nTo: recipient@example.com\r\nSubject: Test\r\n\r\nEmail body'
peer_address = '192.168.1.100'
success = handler.process_email(peer_address, email_data)
if success:
    print('Email processed successfully')

# Check statistics
stats = handler.get_stats()
print(f"Processed: {stats['processed_count']}, Failed: {stats['failed_count']}")
print(f"Success rate: {stats['success_rate']:.2f}%")

# Parse email manually for inspection
parsed = handler.parse_email(email_data)
if parsed:  # parse_email() returns an empty dict on failure
    print(f"Subject: {parsed['subject']}")
    print(f"From: {parsed['from_email']}")
    print(f"Attachments: {len(parsed['attachments'])}")
else:
    print('Email could not be parsed')

# Forward email directly
handler.forward_email(
    to_email='recipient@example.com',
    subject='Test Email',
    body='This is a test',
    from_email='sender@example.com',
    reply_to='reply@example.com',
    attachments=[]
)

# Reset statistics
handler.reset_stats()
Best Practices
- Instantiate EmailHandler once and reuse it for processing multiple emails to maintain rate limiting state and statistics
- The handler maintains internal state (processed_count, failed_count, email_timestamps) that persists across multiple email processing calls
- Rate limits are enforced automatically in process_email() - check_rate_limit() is called internally
- Always check the return value of process_email() to determine if the email was successfully processed
- The handler logs extensively - ensure logging is properly configured before use
- Attachment size and type restrictions are enforced during extraction - configure settings.MAX_ATTACHMENT_SIZE and settings.ALLOWED_ATTACHMENT_TYPES as needed
- The parse_email() method returns an empty dict on failure - always check if the result is truthy before accessing keys
- Email timestamps are automatically cleaned up (older than 1 hour removed) during rate limit checks
- The handler depends on O365Client being properly configured - ensure O365 authentication is set up before instantiating EmailHandler
- Use get_stats() periodically to monitor email processing performance and success rates
- Call reset_stats() if you need to clear counters and start fresh tracking
- The extract_email_address() method performs basic validation - invalid email formats return None
- Attachments are base64-encoded for MS Graph API compatibility
- HTML email bodies are preferred over plain text when both are available
- Reply-To header defaults to From address if not explicitly set in the email
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class EmailForwardingHandler 75.8% similar
-
class TestEmailHandler 71.5% similar
-
class O365Client 59.6% similar
-
function test_email_handler 59.0% similar
-
class FileCloudEmailProcessor 59.0% similar