class SSOCallbackHandler
A Tornado RequestHandler that processes OAuth 2.0 callbacks from Azure AD, exchanges authorization codes for access tokens, validates user identity, and sets authentication cookies for SSO integration.
/tf/active/vicechatdev/CDocs/sso_plugin.py
16 - 161
complex
Purpose
This class serves as the OAuth callback endpoint in an Azure AD Single Sign-On (SSO) flow. It receives the authorization code from Azure AD after user authentication, exchanges it for an access token, validates the token to extract user information, and stores the authenticated user's data in a secure cookie. The handler includes CSRF protection via state validation (currently commented out), fallback token exchange mechanisms, and comprehensive error handling with user-friendly error pages.
Source Code
class SSOCallbackHandler(RequestHandler):
"""Handles the OAuth callback from Azure AD."""
def initialize(self):
"""Initialize the handler, setting up AzureSSO instance."""
self.azure_sso_instance = setup_azure_sso()
if not self.azure_sso_instance:
logger.error("SSOCallbackHandler: Failed to initialize AzureSSO instance. SSO will not work.")
async def get(self):
"""Handles the GET request from Azure AD after user authentication."""
logger.info("SSOPlugin: SSOCallbackHandler received GET request.")
# Log the full request URL for debugging
request_uri = self.request.uri
logger.info(f"SSOPlugin: Full request URI: {request_uri}")
if not self.azure_sso_instance:
self._send_error_page("SSO service is not configured correctly on the server.", 500)
return
try:
auth_code = self.get_argument("code", None)
returned_state = self.get_argument("state", None)
# --- State Validation (CSRF protection) ---
stored_state_cookie = self.get_cookie("sso_auth_state")
# Secure cookies are better, but require cookie_secret to be readable here.
# Using regular cookie for state for simplicity in this example.
# self.clear_cookie("sso_auth_state") # Clear state cookie once used
# if not stored_state_cookie or stored_state_cookie != returned_state:
# logger.warning("SSOPlugin: Invalid OAuth state parameter.")
# raise HTTPError(400, "Invalid state. Authentication request may have been tampered with.")
# logger.info("SSOPlugin: OAuth state validated.")
# --- End State Validation ---
if not auth_code:
logger.warning("SSOPlugin: Authorization code missing in callback.")
raise HTTPError(400, "Authorization code missing.")
# Log sanitized code for debugging
sanitized_code = auth_code[:5] + "..." + auth_code[-5:] if len(auth_code) > 10 else "***"
logger.info(f"SSOPlugin: Received auth code (sanitized): {sanitized_code}, length: {len(auth_code)}")
# Log the redirect URI being used for token exchange
logger.info(f"SSOPlugin: Using redirect URI for token exchange: {self.azure_sso_instance.redirect_uri}")
logger.info(f"SSOPlugin: Exchanging auth code for token.")
token_response = self.azure_sso_instance.get_token_from_code(auth_code)
# Log token response keys (not values for security)
if token_response:
logger.info(f"SSOPlugin: Token response received with keys: {token_response.keys()}")
else:
logger.error("SSOPlugin: Empty token response received from get_token_from_code")
if not token_response or "access_token" not in token_response:
logger.error(f"SSOPlugin: Failed to get token from code. Response: {token_response}")
# Try using a manual approach as fallback
try:
logger.info("SSOPlugin: Attempting fallback method for token exchange...")
fallback_token = self._fallback_token_exchange(auth_code)
if fallback_token and "access_token" in fallback_token:
logger.info("SSOPlugin: Fallback token exchange successful")
token_response = fallback_token
else:
raise HTTPError(500, "Failed to obtain token from identity provider.")
except Exception as fallback_error:
logger.error(f"SSOPlugin: Fallback token exchange also failed: {fallback_error}")
raise HTTPError(500, "Failed to obtain token from identity provider.")
logger.info("SSOPlugin: Token obtained. Validating token and extracting user info.")
user_info = validate_azure_token(token_response)
if not user_info or not user_info.get("email"):
logger.error(f"SSOPlugin: Failed to validate token or extract user info. User Info: {user_info}")
raise HTTPError(500, "Failed to validate user identity or extract required information.")
logger.info(f"SSOPlugin: User info extracted for {user_info.get('email')}. Setting auth cookie.")
# Store user_info in a cookie. The main Panel app will read this.
# Convert user_info to JSON string
user_info_json = json.dumps(user_info)
# Base64 encode the JSON string to make it safe for cookies
encoded_user_info = base64.b64encode(user_info_json.encode('utf-8')).decode('utf-8')
# Set the cookie with the encoded value
self.set_cookie(
"sso_pending_login_info",
encoded_user_info,
path="/",
httponly=True,
samesite="Lax",
expires_days=1/48 # 30 minutes expiry
)
logger.info("SSOPlugin: Auth cookie set successfully. Redirecting to main application.")
self.redirect("/") # Redirect to the root of the Panel app
except HTTPError as e:
logger.error(f"SSOPlugin: HTTPError in SSO callback: {e.status_code} - {e.log_message}", exc_info=True)
self._send_error_page(e.log_message, e.status_code)
except Exception as e:
logger.error(f"SSOPlugin: Unexpected error in SSO callback: {e}", exc_info=True)
self._send_error_page("An unexpected error occurred during the login process.")
def _fallback_token_exchange(self, auth_code):
"""Manual fallback for token exchange if MSAL fails."""
try:
import requests
token_url = f"https://login.microsoftonline.com/{self.azure_sso_instance.tenant_id}/oauth2/v2.0/token"
data = {
'client_id': self.azure_sso_instance.client_id,
'scope': self.azure_sso_instance.scope,
'code': auth_code,
'redirect_uri': self.azure_sso_instance.redirect_uri,
'grant_type': 'authorization_code',
'client_secret': self.azure_sso_instance.client_secret
}
logger.info(f"SSOPlugin: Fallback - Sending token request to {token_url}")
response = requests.post(token_url, data=data)
if response.status_code == 200:
return response.json()
else:
logger.error(f"SSOPlugin: Fallback token exchange failed with status {response.status_code}: {response.text}")
return {}
except Exception as e:
logger.error(f"SSOPlugin: Error in fallback token exchange: {e}")
return {}
def _send_error_page(self, message, status_code=500):
self.set_status(status_code)
self.set_header('Content-Type', 'text/html')
self.write(f"""
<html><head><title>Login Error</title>
<style>body {{font-family: sans-serif; padding: 20px;}} h1 {{color: #c00;}}</style></head>
<body><h1>Authentication Error</h1><p>Details: {message}</p>
<p><a href="/">Return to application</a></p>
</body></html>""")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
RequestHandler | - |
Parameter Details
initialize: No parameters. Called automatically by Tornado to set up the handler instance before processing requests.
get: No parameters. Handles the GET request from Azure AD containing the authorization code and state parameters in the query string.
_fallback_token_exchange.auth_code: The authorization code received from Azure AD that needs to be exchanged for an access token. This is used when the primary MSAL-based token exchange fails.
_send_error_page.message: The error message to display to the user on the error page. Should be user-friendly and not expose sensitive system details.
_send_error_page.status_code: HTTP status code to return with the error page. Defaults to 500 (Internal Server Error).
Return Value
The class itself returns a RequestHandler instance when instantiated. The 'get' method returns None but sends HTTP responses (redirects or HTML pages) to the client. The '_fallback_token_exchange' method returns a dictionary containing token response data (with 'access_token' key) or an empty dictionary on failure. The '_send_error_page' method returns None but writes an HTML error page to the response.
Class Interface
Methods
initialize() -> None
Purpose: Initializes the handler by setting up the AzureSSO instance required for OAuth operations
Returns: None. Sets self.azure_sso_instance attribute or logs error if setup fails
async get() -> None
Purpose: Handles the OAuth callback GET request from Azure AD, exchanges authorization code for token, validates user, and sets authentication cookie
Returns: None. Sends HTTP redirect to '/' on success or error page on failure
_fallback_token_exchange(auth_code: str) -> dict
Purpose: Provides a manual fallback mechanism to exchange authorization code for access token using direct HTTP requests when MSAL fails
Parameters:
auth_code: The authorization code received from Azure AD to exchange for an access token
Returns: Dictionary containing token response with 'access_token' key on success, or empty dictionary on failure
_send_error_page(message: str, status_code: int = 500) -> None
Purpose: Sends a user-friendly HTML error page with the specified message and HTTP status code
Parameters:
message: Error message to display to the userstatus_code: HTTP status code to return (default: 500)
Returns: None. Writes HTML error page to the response
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
azure_sso_instance |
AzureSSO or None | Instance of AzureSSO class used for OAuth operations including token exchange and user validation. Set during initialize() method | instance |
Dependencies
tornadologgingjsonbase64uuidrequests
Required Imports
import logging
import json
import base64
from tornado.web import RequestHandler
from tornado.web import HTTPError
import uuid
from CDocs.config import settings
from CDocs.auth.azure_auth import AzureSSO
from CDocs.auth.azure_auth import validate_azure_token
from CDocs.auth.azure_auth import setup_azure_sso
Conditional/Optional Imports
These imports are only needed under specific conditions:
import requests
Condition: only when fallback token exchange is triggered (when MSAL token exchange fails)
Required (conditional)Usage Example
# In a Tornado application setup
from tornado.web import Application
from your_module import SSOCallbackHandler
# Define the application with the callback handler
app = Application([
(r'/auth/callback', SSOCallbackHandler),
], cookie_secret='your-secret-key')
# The handler is automatically invoked when Azure AD redirects to /auth/callback
# After successful authentication, user is redirected to '/' with sso_pending_login_info cookie set
# In your main application, read the cookie:
# user_info_encoded = self.get_cookie('sso_pending_login_info')
# if user_info_encoded:
# user_info_json = base64.b64decode(user_info_encoded).decode('utf-8')
# user_info = json.loads(user_info_json)
# email = user_info.get('email')
Best Practices
- The handler must be registered at the exact redirect URI configured in Azure AD application settings
- State validation (CSRF protection) is currently commented out but should be enabled in production by uncommenting the state validation code
- The sso_pending_login_info cookie should be cleared after being read by the main application to prevent replay attacks
- Cookie expiry is set to 30 minutes (1/48 days) - adjust based on security requirements
- The handler includes a fallback token exchange mechanism that bypasses MSAL if the primary method fails
- Error messages sent to users should not expose sensitive system information or stack traces
- The initialize() method must successfully create an AzureSSO instance or all requests will fail
- Authorization codes are logged in sanitized form (first 5 and last 5 characters) for debugging without exposing full codes
- The handler expects 'code' and 'state' query parameters from Azure AD in the callback URL
- User info is base64-encoded before storing in cookies to handle special characters safely
- The handler uses httponly and samesite cookie attributes for security
- Comprehensive logging is included at info and error levels for debugging SSO flows
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function auth_callback 79.7% similar
-
function auth_callback_v2 76.6% similar
-
class AuthCodeHandler 76.4% similar
-
function azure_callback 76.3% similar
-
function auth_callback_v1 76.1% similar