300 lines
8.1 KiB
Python
300 lines
8.1 KiB
Python
"""
|
|
Authentication utilities for FastHTML application.
|
|
|
|
This module provides:
|
|
- JWT token validation and refresh logic
|
|
- Beforeware for protecting routes
|
|
- Helper functions for API communication
|
|
"""
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
import httpx
|
|
from fasthtml.common import RedirectResponse, Beforeware
|
|
from jose import jwt, JWTError
|
|
|
|
# Configuration
|
|
API_BASE_URL = "http://localhost:5001" # Base URL for FastAPI backend
|
|
JWT_SECRET = "jwt-secret-to-change" # Must match FastAPI secret
|
|
JWT_ALGORITHM = "HS256"
|
|
TOKEN_REFRESH_THRESHOLD_MINUTES = 5 # Refresh token if expires in less than 5 minutes
|
|
|
|
# Default patterns to skip authentication
|
|
DEFAULT_SKIP_PATTERNS = [
|
|
r'/favicon\.ico',
|
|
r'/static/.*',
|
|
r'.*\.css',
|
|
r'.*\.js',
|
|
'/login',
|
|
'/login-p',
|
|
'/register',
|
|
'/register-p',
|
|
'/logout',
|
|
]
|
|
|
|
|
|
def create_auth_beforeware(additional_patterns: Optional[List[str]] = None) -> Beforeware:
|
|
"""
|
|
Create a Beforeware instance for route protection.
|
|
|
|
Args:
|
|
additional_patterns: Optional list of additional URL patterns to skip authentication
|
|
|
|
Returns:
|
|
Beforeware instance configured for authentication
|
|
"""
|
|
patterns = DEFAULT_SKIP_PATTERNS.copy()
|
|
if additional_patterns:
|
|
patterns.extend(additional_patterns)
|
|
|
|
return Beforeware(auth_before, skip=patterns)
|
|
|
|
|
|
def auth_before(request, session):
|
|
"""
|
|
Beforeware function that runs before each protected route.
|
|
|
|
Checks authentication status and automatically refreshes tokens if needed.
|
|
Redirects to login if authentication fails.
|
|
|
|
Args:
|
|
request: Starlette request object
|
|
session: FastHTML session object
|
|
|
|
Returns:
|
|
RedirectResponse to login page if authentication fails, None otherwise
|
|
"""
|
|
# Get tokens from session
|
|
access_token = session.get('access_token')
|
|
refresh_token = session.get('refresh_token')
|
|
print(f"path={request.scope['path']}, {session=}, {access_token=}, {refresh_token=}")
|
|
# If no access token, redirect to login
|
|
if not access_token:
|
|
return RedirectResponse('/login', status_code=303)
|
|
|
|
# Validate access token and check expiration
|
|
try:
|
|
payload = decode_jwt(access_token)
|
|
exp_timestamp = payload.get('exp')
|
|
|
|
if exp_timestamp:
|
|
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
|
|
now = datetime.now(timezone.utc)
|
|
time_until_expiry = (exp_datetime - now).total_seconds() / 60
|
|
|
|
# If token expires in less than 5 minutes, try to refresh
|
|
if time_until_expiry < TOKEN_REFRESH_THRESHOLD_MINUTES:
|
|
if refresh_token:
|
|
new_tokens = refresh_access_token(refresh_token)
|
|
if new_tokens:
|
|
# Update session with new tokens
|
|
session['access_token'] = new_tokens['access_token']
|
|
session['refresh_token'] = new_tokens['refresh_token']
|
|
# Store auth info in request scope
|
|
request.scope['auth'] = session.get('user_info')
|
|
return None
|
|
else:
|
|
# Refresh failed, redirect to login
|
|
return RedirectResponse('/login', status_code=303)
|
|
else:
|
|
# No refresh token available, redirect to login
|
|
return RedirectResponse('/login', status_code=303)
|
|
|
|
# Token is valid, store auth info in request scope
|
|
request.scope['auth'] = session.get('user_info')
|
|
return None
|
|
|
|
except JWTError:
|
|
# Token is invalid or expired, try to refresh
|
|
if refresh_token:
|
|
new_tokens = refresh_access_token(refresh_token)
|
|
if new_tokens:
|
|
# Update session with new tokens
|
|
session['access_token'] = new_tokens['access_token']
|
|
session['refresh_token'] = new_tokens['refresh_token']
|
|
request.scope['auth'] = session.get('user_info')
|
|
return None
|
|
|
|
# Could not refresh, redirect to login
|
|
return RedirectResponse('/login', status_code=303)
|
|
|
|
|
|
def decode_jwt(token: str) -> Dict[str, Any]:
|
|
"""
|
|
Decode and validate a JWT token.
|
|
|
|
Args:
|
|
token: JWT token string
|
|
|
|
Returns:
|
|
Dictionary containing the token payload
|
|
|
|
Raises:
|
|
JWTError: If token is invalid or expired
|
|
"""
|
|
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
|
|
|
|
|
def check_token_expiry(token: str) -> Optional[float]:
|
|
"""
|
|
Check how many minutes until token expires.
|
|
|
|
Args:
|
|
token: JWT token string
|
|
|
|
Returns:
|
|
Minutes until expiration, or None if token is invalid
|
|
"""
|
|
try:
|
|
payload = decode_jwt(token)
|
|
exp_timestamp = payload.get('exp')
|
|
|
|
if exp_timestamp:
|
|
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
|
|
now = datetime.now(timezone.utc)
|
|
time_until_expiry = (exp_datetime - now).total_seconds() / 60
|
|
return time_until_expiry
|
|
|
|
return None
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Authenticate user with email and password.
|
|
|
|
Args:
|
|
email: User email address
|
|
password: User password
|
|
|
|
Returns:
|
|
Dictionary containing access_token, refresh_token, and user_info if successful,
|
|
None if authentication fails
|
|
"""
|
|
try:
|
|
response = httpx.post(
|
|
f"{API_BASE_URL}/auth/login",
|
|
data={"username": email, "password": password},
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return {
|
|
'access_token': data.get('access_token'),
|
|
'refresh_token': data.get('refresh_token'),
|
|
'token_type': data.get('token_type'),
|
|
}
|
|
|
|
return None
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
|
|
def register_user(email: str, username: str, password: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Register a new user.
|
|
|
|
Args:
|
|
email: User email address
|
|
password: User password
|
|
|
|
Returns:
|
|
Dictionary containing success message if registration succeeds,
|
|
None if registration fails
|
|
"""
|
|
try:
|
|
response = httpx.post(
|
|
f"{API_BASE_URL}/auth/register",
|
|
json={"email": email, "username": username, "password": password},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
|
|
return None
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
|
|
def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Refresh the access token using a refresh token.
|
|
|
|
Args:
|
|
refresh_token: Valid refresh token
|
|
|
|
Returns:
|
|
Dictionary containing new access_token and refresh_token if successful,
|
|
None if refresh fails
|
|
"""
|
|
try:
|
|
response = httpx.post(
|
|
f"{API_BASE_URL}/auth/refresh",
|
|
json={"refresh_token": refresh_token},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return {
|
|
'access_token': data.get('access_token'),
|
|
'refresh_token': data.get('refresh_token'),
|
|
}
|
|
|
|
return None
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
|
|
def get_user_info(access_token: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get current user information using access token.
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
Dictionary containing user information if successful,
|
|
None if request fails
|
|
"""
|
|
try:
|
|
response = httpx.get(
|
|
f"{API_BASE_URL}/auth/me",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
timeout=10.0
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
|
|
return None
|
|
except httpx.HTTPError:
|
|
return None
|
|
|
|
|
|
def logout_user(refresh_token: str) -> bool:
|
|
"""
|
|
Logout user by revoking the refresh token.
|
|
|
|
Args:
|
|
refresh_token: Refresh token to revoke
|
|
|
|
Returns:
|
|
True if logout successful, False otherwise
|
|
"""
|
|
try:
|
|
response = httpx.post(
|
|
f"{API_BASE_URL}/auth/logout",
|
|
json={"refresh_token": refresh_token},
|
|
timeout=10.0
|
|
)
|
|
|
|
return response.status_code == 200
|
|
except httpx.HTTPError:
|
|
return False
|