""" Authentication utilities for FastHTML application. This module provides: - JWT token validation and refresh logic - Beforeware for protecting routes - Helper functions for API communication """ from datetime import datetime, timezone from typing import Optional, Dict, Any, List import httpx from fasthtml.common import RedirectResponse, Beforeware from jose import jwt, JWTError # Configuration API_BASE_URL = "http://localhost:5001" # Base URL for FastAPI backend JWT_SECRET = "jwt-secret-to-change" # Must match FastAPI secret JWT_ALGORITHM = "HS256" TOKEN_REFRESH_THRESHOLD_MINUTES = 5 # Refresh token if expires in less than 5 minutes # Default patterns to skip authentication DEFAULT_SKIP_PATTERNS = [ r'/favicon\.ico', r'/static/.*', r'.*\.css', r'.*\.js', r'/myfasthtml/.*\.css', r'/myfasthtml/.*\.js', '/login', '/register', '/logout', ] http_client = httpx def create_auth_beforeware(additional_patterns: Optional[List[str]] = None) -> Beforeware: """ Create a Beforeware instance for route protection. Args: additional_patterns: Optional list of additional URL patterns to skip authentication Returns: Beforeware instance configured for authentication """ patterns = DEFAULT_SKIP_PATTERNS.copy() if additional_patterns: patterns.extend(additional_patterns) return Beforeware(auth_before, skip=patterns) def auth_before(request, session): """ Beforeware function that runs before each protected route. Checks authentication status and automatically refreshes tokens if needed. Redirects to login if authentication fails. Args: request: Starlette request object session: FastHTML session object Returns: RedirectResponse to login page if authentication fails, None otherwise """ # Get tokens from session access_token = session.get('access_token') refresh_token = session.get('refresh_token') print(f"path={request.scope['path']}, {session=}, {access_token=}, {refresh_token=}") # If no access token, redirect to login if not access_token: return RedirectResponse('/login', status_code=303) # Validate access token and check expiration try: payload = decode_jwt(access_token) exp_timestamp = payload.get('exp') if exp_timestamp: exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc) now = datetime.now(timezone.utc) time_until_expiry = (exp_datetime - now).total_seconds() / 60 # If token expires in less than 5 minutes, try to refresh if time_until_expiry < TOKEN_REFRESH_THRESHOLD_MINUTES: if refresh_token: new_tokens = refresh_access_token(refresh_token) if new_tokens: # Update session with new tokens session['access_token'] = new_tokens['access_token'] session['refresh_token'] = new_tokens['refresh_token'] # Store auth info in request scope request.scope['auth'] = session.get('user_info') return None else: # Refresh failed, redirect to login return RedirectResponse('/login', status_code=303) else: # No refresh token available, redirect to login return RedirectResponse('/login', status_code=303) # Token is valid, store auth info in request scope request.scope['auth'] = session.get('user_info') return None except JWTError: # Token is invalid or expired, try to refresh if refresh_token: new_tokens = refresh_access_token(refresh_token) if new_tokens: # Update session with new tokens session['access_token'] = new_tokens['access_token'] session['refresh_token'] = new_tokens['refresh_token'] request.scope['auth'] = session.get('user_info') return None # Could not refresh, redirect to login return RedirectResponse('/login', status_code=303) def decode_jwt(token: str) -> Dict[str, Any]: """ Decode and validate a JWT token. Args: token: JWT token string Returns: Dictionary containing the token payload Raises: JWTError: If token is invalid or expired """ return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) def check_token_expiry(token: str) -> Optional[float]: """ Check how many minutes until token expires. Args: token: JWT token string Returns: Minutes until expiration, or None if token is invalid """ try: payload = decode_jwt(token) exp_timestamp = payload.get('exp') if exp_timestamp: exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc) now = datetime.now(timezone.utc) time_until_expiry = (exp_datetime - now).total_seconds() / 60 return time_until_expiry return None except JWTError: return None def login_user(email: str, password: str) -> Optional[Dict[str, Any]]: """ Authenticate user with email and password. Args: email: User email address password: User password Returns: Dictionary containing access_token, refresh_token, and user_info if successful, None if authentication fails """ try: response = http_client.post( f"{API_BASE_URL}/auth/login", data={"username": email, "password": password}, headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=10.0 ) if response.status_code == 200: data = response.json() return { 'access_token': data.get('access_token'), 'refresh_token': data.get('refresh_token'), 'token_type': data.get('token_type'), } return None except httpx.HTTPError: return None def register_user(email: str, username: str, password: str) -> Optional[Dict[str, Any]]: """ Register a new user. Args: email: User email address username: User name password: User password Returns: Dictionary containing success message if registration succeeds, None if registration fails """ try: response = http_client.post( f"{API_BASE_URL}/auth/register", json={"email": email, "username": username, "password": password}, timeout=10.0 ) if response.status_code in (200, 201): return response.json() return None except httpx.HTTPError as ex: return None def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]: """ Refresh the access token using a refresh token. Args: refresh_token: Valid refresh token Returns: Dictionary containing new access_token and refresh_token if successful, None if refresh fails """ try: response = http_client.post( f"{API_BASE_URL}/auth/refresh", json={"refresh_token": refresh_token}, timeout=10.0 ) if response.status_code == 200: data = response.json() return { 'access_token': data.get('access_token'), 'refresh_token': data.get('refresh_token'), } return None except httpx.HTTPError: return None def get_user_info(access_token: str) -> Optional[Dict[str, Any]]: """ Get current user information using access token. Args: access_token: Valid access token Returns: Dictionary containing user information if successful, None if request fails """ try: response = http_client.get( f"{API_BASE_URL}/auth/me", headers={"Authorization": f"Bearer {access_token}"}, timeout=10.0 ) if response.status_code == 200: return response.json() return None except httpx.HTTPError: return None def logout_user(refresh_token: str) -> bool: """ Logout user by revoking the refresh token. Args: refresh_token: Refresh token to revoke Returns: True if logout successful, False otherwise """ try: response = http_client.post( f"{API_BASE_URL}/auth/logout", json={"refresh_token": refresh_token}, timeout=10.0 ) return response.status_code == 200 except httpx.HTTPError: return False