Added authentication forms and routes
This commit is contained in:
302
src/myfasthtml/auth/utils.py
Normal file
302
src/myfasthtml/auth/utils.py
Normal file
@@ -0,0 +1,302 @@
|
||||
"""
|
||||
Authentication utilities for FastHTML application.
|
||||
|
||||
This module provides:
|
||||
- JWT token validation and refresh logic
|
||||
- Beforeware for protecting routes
|
||||
- Helper functions for API communication
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
import httpx
|
||||
from fasthtml.common import RedirectResponse, Beforeware
|
||||
from jose import jwt, JWTError
|
||||
|
||||
# Configuration
|
||||
API_BASE_URL = "http://localhost:5001" # Base URL for FastAPI backend
|
||||
JWT_SECRET = "jwt-secret-to-change" # Must match FastAPI secret
|
||||
JWT_ALGORITHM = "HS256"
|
||||
TOKEN_REFRESH_THRESHOLD_MINUTES = 5 # Refresh token if expires in less than 5 minutes
|
||||
|
||||
# Default patterns to skip authentication
|
||||
DEFAULT_SKIP_PATTERNS = [
|
||||
r'/favicon\.ico',
|
||||
r'/static/.*',
|
||||
r'.*\.css',
|
||||
r'.*\.js',
|
||||
'/login',
|
||||
'/login-p',
|
||||
'/register',
|
||||
'/register-p',
|
||||
'/logout',
|
||||
]
|
||||
|
||||
http_client = httpx
|
||||
|
||||
|
||||
def create_auth_beforeware(additional_patterns: Optional[List[str]] = None) -> Beforeware:
|
||||
"""
|
||||
Create a Beforeware instance for route protection.
|
||||
|
||||
Args:
|
||||
additional_patterns: Optional list of additional URL patterns to skip authentication
|
||||
|
||||
Returns:
|
||||
Beforeware instance configured for authentication
|
||||
"""
|
||||
patterns = DEFAULT_SKIP_PATTERNS.copy()
|
||||
if additional_patterns:
|
||||
patterns.extend(additional_patterns)
|
||||
|
||||
return Beforeware(auth_before, skip=patterns)
|
||||
|
||||
|
||||
def auth_before(request, session):
|
||||
"""
|
||||
Beforeware function that runs before each protected route.
|
||||
|
||||
Checks authentication status and automatically refreshes tokens if needed.
|
||||
Redirects to login if authentication fails.
|
||||
|
||||
Args:
|
||||
request: Starlette request object
|
||||
session: FastHTML session object
|
||||
|
||||
Returns:
|
||||
RedirectResponse to login page if authentication fails, None otherwise
|
||||
"""
|
||||
# Get tokens from session
|
||||
access_token = session.get('access_token')
|
||||
refresh_token = session.get('refresh_token')
|
||||
print(f"path={request.scope['path']}, {session=}, {access_token=}, {refresh_token=}")
|
||||
# If no access token, redirect to login
|
||||
if not access_token:
|
||||
return RedirectResponse('/login', status_code=303)
|
||||
|
||||
# Validate access token and check expiration
|
||||
try:
|
||||
payload = decode_jwt(access_token)
|
||||
exp_timestamp = payload.get('exp')
|
||||
|
||||
if exp_timestamp:
|
||||
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
|
||||
now = datetime.now(timezone.utc)
|
||||
time_until_expiry = (exp_datetime - now).total_seconds() / 60
|
||||
|
||||
# If token expires in less than 5 minutes, try to refresh
|
||||
if time_until_expiry < TOKEN_REFRESH_THRESHOLD_MINUTES:
|
||||
if refresh_token:
|
||||
new_tokens = refresh_access_token(refresh_token)
|
||||
if new_tokens:
|
||||
# Update session with new tokens
|
||||
session['access_token'] = new_tokens['access_token']
|
||||
session['refresh_token'] = new_tokens['refresh_token']
|
||||
# Store auth info in request scope
|
||||
request.scope['auth'] = session.get('user_info')
|
||||
return None
|
||||
else:
|
||||
# Refresh failed, redirect to login
|
||||
return RedirectResponse('/login', status_code=303)
|
||||
else:
|
||||
# No refresh token available, redirect to login
|
||||
return RedirectResponse('/login', status_code=303)
|
||||
|
||||
# Token is valid, store auth info in request scope
|
||||
request.scope['auth'] = session.get('user_info')
|
||||
return None
|
||||
|
||||
except JWTError:
|
||||
# Token is invalid or expired, try to refresh
|
||||
if refresh_token:
|
||||
new_tokens = refresh_access_token(refresh_token)
|
||||
if new_tokens:
|
||||
# Update session with new tokens
|
||||
session['access_token'] = new_tokens['access_token']
|
||||
session['refresh_token'] = new_tokens['refresh_token']
|
||||
request.scope['auth'] = session.get('user_info')
|
||||
return None
|
||||
|
||||
# Could not refresh, redirect to login
|
||||
return RedirectResponse('/login', status_code=303)
|
||||
|
||||
|
||||
def decode_jwt(token: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Decode and validate a JWT token.
|
||||
|
||||
Args:
|
||||
token: JWT token string
|
||||
|
||||
Returns:
|
||||
Dictionary containing the token payload
|
||||
|
||||
Raises:
|
||||
JWTError: If token is invalid or expired
|
||||
"""
|
||||
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
||||
|
||||
|
||||
def check_token_expiry(token: str) -> Optional[float]:
|
||||
"""
|
||||
Check how many minutes until token expires.
|
||||
|
||||
Args:
|
||||
token: JWT token string
|
||||
|
||||
Returns:
|
||||
Minutes until expiration, or None if token is invalid
|
||||
"""
|
||||
try:
|
||||
payload = decode_jwt(token)
|
||||
exp_timestamp = payload.get('exp')
|
||||
|
||||
if exp_timestamp:
|
||||
exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
|
||||
now = datetime.now(timezone.utc)
|
||||
time_until_expiry = (exp_datetime - now).total_seconds() / 60
|
||||
return time_until_expiry
|
||||
|
||||
return None
|
||||
except JWTError:
|
||||
return None
|
||||
|
||||
|
||||
def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Authenticate user with email and password.
|
||||
|
||||
Args:
|
||||
email: User email address
|
||||
password: User password
|
||||
|
||||
Returns:
|
||||
Dictionary containing access_token, refresh_token, and user_info if successful,
|
||||
None if authentication fails
|
||||
"""
|
||||
try:
|
||||
response = http_client.post(
|
||||
f"{API_BASE_URL}/auth/login",
|
||||
data={"username": email, "password": password},
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return {
|
||||
'access_token': data.get('access_token'),
|
||||
'refresh_token': data.get('refresh_token'),
|
||||
'token_type': data.get('token_type'),
|
||||
}
|
||||
|
||||
return None
|
||||
except httpx.HTTPError:
|
||||
return None
|
||||
|
||||
|
||||
def register_user(email: str, username: str, password: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Register a new user.
|
||||
|
||||
Args:
|
||||
email: User email address
|
||||
username: User name
|
||||
password: User password
|
||||
|
||||
Returns:
|
||||
Dictionary containing success message if registration succeeds,
|
||||
None if registration fails
|
||||
"""
|
||||
try:
|
||||
response = http_client.post(
|
||||
f"{API_BASE_URL}/auth/register",
|
||||
json={"email": email, "username": username, "password": password},
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
if response.status_code in (200, 201):
|
||||
return response.json()
|
||||
|
||||
return None
|
||||
except httpx.HTTPError as ex:
|
||||
return None
|
||||
|
||||
|
||||
def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Refresh the access token using a refresh token.
|
||||
|
||||
Args:
|
||||
refresh_token: Valid refresh token
|
||||
|
||||
Returns:
|
||||
Dictionary containing new access_token and refresh_token if successful,
|
||||
None if refresh fails
|
||||
"""
|
||||
try:
|
||||
response = http_client.post(
|
||||
f"{API_BASE_URL}/auth/refresh",
|
||||
json={"refresh_token": refresh_token},
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return {
|
||||
'access_token': data.get('access_token'),
|
||||
'refresh_token': data.get('refresh_token'),
|
||||
}
|
||||
|
||||
return None
|
||||
except httpx.HTTPError:
|
||||
return None
|
||||
|
||||
|
||||
def get_user_info(access_token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get current user information using access token.
|
||||
|
||||
Args:
|
||||
access_token: Valid access token
|
||||
|
||||
Returns:
|
||||
Dictionary containing user information if successful,
|
||||
None if request fails
|
||||
"""
|
||||
try:
|
||||
response = http_client.get(
|
||||
f"{API_BASE_URL}/auth/me",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
|
||||
return None
|
||||
except httpx.HTTPError:
|
||||
return None
|
||||
|
||||
|
||||
def logout_user(refresh_token: str) -> bool:
|
||||
"""
|
||||
Logout user by revoking the refresh token.
|
||||
|
||||
Args:
|
||||
refresh_token: Refresh token to revoke
|
||||
|
||||
Returns:
|
||||
True if logout successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
response = http_client.post(
|
||||
f"{API_BASE_URL}/auth/logout",
|
||||
json={"refresh_token": refresh_token},
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
return response.status_code == 200
|
||||
except httpx.HTTPError:
|
||||
return False
|
||||
Reference in New Issue
Block a user