Files
MyFastHtml/src/myfasthtml/auth/utils.py

303 lines
8.2 KiB
Python

"""
Authentication utilities for FastHTML application.
This module provides:
- JWT token validation and refresh logic
- Beforeware for protecting routes
- Helper functions for API communication
"""
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List

import httpx
from fasthtml.common import RedirectResponse, Beforeware
from jose import jwt, JWTError
# Configuration
# NOTE(review): the backend URL and the JWT secret are hard-coded in source;
# consider loading them from deployment configuration instead.
API_BASE_URL = "http://localhost:5001"  # Base URL for FastAPI backend
JWT_SECRET = "jwt-secret-to-change"  # Must match FastAPI secret
JWT_ALGORITHM = "HS256"
TOKEN_REFRESH_THRESHOLD_MINUTES = 5  # Refresh token if it expires in less than 5 minutes

# Default URL patterns for which the authentication beforeware is skipped
DEFAULT_SKIP_PATTERNS = [
    # Static assets
    r'/favicon\.ico',
    r'/static/.*',
    r'.*\.css',
    r'.*\.js',
    r'/myfasthtml/.*\.css',
    r'/myfasthtml/.*\.js',
    # Authentication pages
    r'/login',
    r'/register',
    r'/logout',
]

# HTTP client used by the API helper functions in this module
# (presumably kept as a module attribute so it can be swapped out, e.g. in tests - confirm)
http_client = httpx
def create_auth_beforeware(additional_patterns: Optional[List[str]] = None) -> Beforeware:
    """
    Build the Beforeware that guards routes with ``auth_before``.

    Args:
        additional_patterns: Optional extra URL patterns, appended to
            DEFAULT_SKIP_PATTERNS, for which authentication is skipped

    Returns:
        Beforeware instance wrapping ``auth_before`` with the combined skip list
    """
    extra_patterns = additional_patterns if additional_patterns else []
    # Build a fresh list so the module-level defaults are never mutated
    skip_patterns = [*DEFAULT_SKIP_PATTERNS, *extra_patterns]
    return Beforeware(auth_before, skip=skip_patterns)
def auth_before(request, session):
    """
    Beforeware function that runs before each protected route.

    Validates the access token stored in the session. When the token is
    invalid/expired, or expires in less than TOKEN_REFRESH_THRESHOLD_MINUTES,
    a refresh is attempted with the session's refresh token. On success the
    session tokens are updated and ``session['user_info']`` is exposed as
    ``request.scope['auth']``.

    Args:
        request: Starlette request object
        session: FastHTML session object

    Returns:
        RedirectResponse to login page if authentication fails, None otherwise
    """
    access_token = session.get('access_token')
    refresh_token = session.get('refresh_token')

    # Trace the request without writing token values or session content to the output
    logging.getLogger(__name__).debug(
        "auth_before: path=%s, has_access_token=%s, has_refresh_token=%s",
        request.scope['path'], bool(access_token), bool(refresh_token),
    )

    # If no access token, redirect to login
    if not access_token:
        return RedirectResponse('/login', status_code=303)

    try:
        payload = decode_jwt(access_token)
    except JWTError:
        # Token is invalid or expired: only a successful refresh can recover
        needs_refresh = True
    else:
        needs_refresh = _expires_soon(payload.get('exp'))

    if needs_refresh and not _refresh_session_tokens(session, refresh_token):
        # No refresh token available or refresh failed
        return RedirectResponse('/login', status_code=303)

    # Authenticated: store auth info in request scope
    request.scope['auth'] = session.get('user_info')
    return None


def _expires_soon(exp_timestamp) -> bool:
    """Return True if the ``exp`` claim is closer than the refresh threshold."""
    # A token without an ``exp`` claim is never considered close to expiry
    if not exp_timestamp:
        return False
    expires_at = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
    minutes_left = (expires_at - datetime.now(timezone.utc)).total_seconds() / 60
    return minutes_left < TOKEN_REFRESH_THRESHOLD_MINUTES


def _refresh_session_tokens(session, refresh_token) -> bool:
    """Renew the tokens stored in ``session``; return True on success."""
    if not refresh_token:
        return False
    new_tokens = refresh_access_token(refresh_token)
    # A refresh response without an access token is treated as a failure, so
    # that None is never stored in the session as if it were a valid token
    if not new_tokens or not new_tokens.get('access_token'):
        return False
    session['access_token'] = new_tokens['access_token']
    # Keep the current refresh token when the response does not carry a new one
    session['refresh_token'] = new_tokens.get('refresh_token') or refresh_token
    return True
def decode_jwt(token: str) -> Dict[str, Any]:
    """
    Verify a JWT and return its claims.

    The token is decoded with the module-level JWT_SECRET and JWT_ALGORITHM
    is the only algorithm passed as accepted.

    Args:
        token: JWT token string

    Returns:
        Dictionary containing the token payload

    Raises:
        JWTError: If token is invalid or expired
    """
    accepted_algorithms = [JWT_ALGORITHM]
    claims = jwt.decode(token, JWT_SECRET, algorithms=accepted_algorithms)
    return claims
def check_token_expiry(token: str) -> Optional[float]:
    """
    Compute the remaining lifetime of a token, in minutes.

    Args:
        token: JWT token string

    Returns:
        Minutes until the ``exp`` claim is reached, or None if the token
        cannot be decoded or has no ``exp`` claim
    """
    try:
        claims = decode_jwt(token)
    except JWTError:
        return None

    expiry = claims.get('exp')
    if not expiry:
        return None

    expires_at = datetime.fromtimestamp(expiry, tz=timezone.utc)
    remaining = expires_at - datetime.now(timezone.utc)
    return remaining.total_seconds() / 60
def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
    """
    Authenticate user with email and password.

    The credentials are posted form-encoded to the backend's ``/auth/login``
    endpoint; the email is sent in the ``username`` form field.

    Args:
        email: User email address
        password: User password

    Returns:
        Dictionary containing access_token, refresh_token and token_type if
        the backend answers with HTTP 200, None if authentication fails, the
        request raises an HTTP error, or the response body is not valid JSON
    """
    try:
        response = http_client.post(
            f"{API_BASE_URL}/auth/login",
            data={"username": email, "password": password},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=10.0,
        )
        if response.status_code != 200:
            return None
        data = response.json()
        return {
            'access_token': data.get('access_token'),
            'refresh_token': data.get('refresh_token'),
            'token_type': data.get('token_type'),
        }
    except (httpx.HTTPError, ValueError):
        # ValueError: a 200 response whose body cannot be parsed as JSON
        return None
def register_user(email: str, username: str, password: str) -> Optional[Dict[str, Any]]:
    """
    Register a new user.

    The data is posted as JSON to the backend's ``/auth/register`` endpoint.

    Args:
        email: User email address
        username: User name
        password: User password

    Returns:
        The backend's parsed JSON response if registration succeeds
        (HTTP 200 or 201), None if registration fails, the request raises an
        HTTP error, or the response body is not valid JSON
    """
    try:
        response = http_client.post(
            f"{API_BASE_URL}/auth/register",
            json={"email": email, "username": username, "password": password},
            timeout=10.0,
        )
        if response.status_code in (200, 201):
            return response.json()
        return None
    except (httpx.HTTPError, ValueError):
        # ValueError: a success response whose body cannot be parsed as JSON
        return None
def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]:
    """
    Refresh the access token using a refresh token.

    The refresh token is posted as JSON to the backend's ``/auth/refresh``
    endpoint.

    Args:
        refresh_token: Valid refresh token

    Returns:
        Dictionary containing the access_token and refresh_token values of the
        response (None for a key the response does not contain) if the backend
        answers with HTTP 200, None if the refresh fails, the request raises
        an HTTP error, or the response body is not valid JSON
    """
    try:
        response = http_client.post(
            f"{API_BASE_URL}/auth/refresh",
            json={"refresh_token": refresh_token},
            timeout=10.0,
        )
        if response.status_code != 200:
            return None
        data = response.json()
        return {
            'access_token': data.get('access_token'),
            'refresh_token': data.get('refresh_token'),
        }
    except (httpx.HTTPError, ValueError):
        # ValueError: a 200 response whose body cannot be parsed as JSON
        return None
def get_user_info(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Get current user information using access token.

    Calls the backend's ``/auth/me`` endpoint with the token sent as a
    ``Bearer`` Authorization header.

    Args:
        access_token: Valid access token

    Returns:
        The backend's parsed JSON response if it answers with HTTP 200,
        None if the request fails, raises an HTTP error, or the response
        body is not valid JSON
    """
    try:
        response = http_client.get(
            f"{API_BASE_URL}/auth/me",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10.0,
        )
        if response.status_code == 200:
            return response.json()
        return None
    except (httpx.HTTPError, ValueError):
        # ValueError: a 200 response whose body cannot be parsed as JSON
        return None
def logout_user(refresh_token: str) -> bool:
    """
    Ask the backend to log the user out.

    The refresh token is posted as JSON to the backend's ``/auth/logout``
    endpoint.

    Args:
        refresh_token: Refresh token to revoke

    Returns:
        True if the backend answers with HTTP 200, False otherwise
        (including when the request raises an HTTP error)
    """
    logout_url = f"{API_BASE_URL}/auth/logout"
    body = {"refresh_token": refresh_token}
    try:
        reply = http_client.post(logout_url, json=body, timeout=10.0)
    except httpx.HTTPError:
        return False
    return reply.status_code == 200