Files
MyAuth/src/myauth/api/routes.py

380 lines
12 KiB
Python

"""
FastAPI routes for authentication module.
This module provides ready-to-use FastAPI routes for all authentication
operations. Routes are organized in an APIRouter with /auth prefix.
"""
from typing import Annotated
from fastapi import Depends, HTTPException, status, FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from ..core.auth import AuthService
from ..exceptions import AuthError
from ..models.email_verification import (
EmailVerificationRequest,
PasswordResetRequest,
PasswordResetConfirm
)
from ..models.token import AccessTokenResponse, RefreshTokenRequest
from ..models.user import UserCreate, UserResponse
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def create_auth_app(auth_service: AuthService) -> FastAPI:
"""
Create and configure the authentication router.
This factory function creates an APIRouter with all authentication
endpoints configured. The router includes automatic exception handling
for authentication errors.
Args:
auth_service: Configured authentication service instance.
Returns:
Configured APIRouter ready to be included in a FastAPI app.
Example:
>>> from fastapi import FastAPI
>>> from myauth.api.routes import create_auth_app
>>>
>>> app = FastAPI()
>>> auth_api = create_auth_app(auth_service)
>>> app.mount(auth_api)
"""
auth_app = FastAPI(prefix="/auth", tags=["authentication"])
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserResponse:
"""
Dependency to extract and validate the current user from access token.
This dependency can be used in any route that requires authentication.
It extracts the Bearer token from the Authorization header and validates it.
Args:
token: JWT access token from Authorization header.
Returns:
The current authenticated user (as UserResponse).
Raises:
HTTPException: 401 if token is invalid or expired.
Example:
>>> @auth_app.get("/protected")
>>> def protected_route(user: UserResponse = Depends(get_current_user)):
>>> return {"user_id": user.id}
"""
try:
user = auth_service.get_current_user(token)
return UserResponse(
id=user.id,
email=user.email,
username=user.username,
roles=user.roles,
user_settings=user.user_settings,
created_at=user.created_at,
updated_at=user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Register a new user",
description="Create a new user account with email, username, and password."
)
def register(user_data: UserCreate) -> UserResponse:
"""
Register a new user.
Creates a new user account with the provided credentials. The password
is automatically hashed before storage. Email verification is optional
and the account is created with is_verified=False.
Args:
user_data: User registration data including email, username, and password.
Returns:
The created user information (without password).
Raises:
HTTPException: 409 if email already exists.
HTTPException: 422 if validation fails (password strength, etc.).
"""
try:
user = auth_service.register(user_data)
return UserResponse(
id=user.id,
email=user.email,
username=user.username,
roles=user.roles,
user_settings=user.user_settings,
created_at=user.created_at,
updated_at=user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/login",
response_model=AccessTokenResponse,
summary="Login with email and password",
description="Authenticate a user and receive access and refresh tokens."
)
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> AccessTokenResponse:
"""
Authenticate a user and generate tokens.
This endpoint accepts form data with username (which should contain the email)
and password. It returns both access and refresh tokens upon successful authentication.
Note: OAuth2PasswordRequestForm uses 'username' field, but we treat it as email.
Args:
form_data: OAuth2 form with username (email) and password fields.
Returns:
Access token (JWT) and refresh token with token_type="bearer".
Raises:
HTTPException: 401 if credentials are invalid.
HTTPException: 403 if account is disabled.
"""
try:
# OAuth2PasswordRequestForm uses 'username' but we treat it as email
user, tokens = auth_service.login(form_data.username, form_data.password)
return tokens
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/logout",
status_code=status.HTTP_204_NO_CONTENT,
summary="Logout user",
description="Revoke the refresh token to logout the user."
)
def logout(request: RefreshTokenRequest) -> None:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new access tokens.
The current access token remains valid until it expires naturally (30 minutes).
Args:
request: Request body containing the refresh token to revoke.
Returns:
204 No Content on success.
"""
auth_service.logout(request.refresh_token)
return None
@auth_app.post(
"/refresh",
response_model=AccessTokenResponse,
summary="Refresh access token",
description="Exchange a refresh token for new access and refresh tokens."
)
def refresh_token(request: RefreshTokenRequest) -> AccessTokenResponse:
"""
Obtain a new access token using a refresh token.
This endpoint allows clients to obtain a new access token without
requiring the user to re-enter their password. The old refresh token
is revoked and a new one is issued.
Args:
request: Request body containing the refresh token.
Returns:
New access token and refresh token.
Raises:
HTTPException: 401 if refresh token is invalid, expired, or revoked.
"""
try:
tokens = auth_service.refresh_access_token(request.refresh_token)
return tokens
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/password-reset-request",
status_code=status.HTTP_200_OK,
summary="Request password reset",
description="Generate a password reset token and return it (to be sent via email)."
)
def request_password_reset(request: PasswordResetRequest) -> dict:
"""
Request a password reset token.
This endpoint generates a secure token that can be used to reset the password.
In production, this token should be sent via email. For this module, the token
is returned in the response so the consuming application can handle email delivery.
Args:
request: Request body containing the email address.
Returns:
Dictionary with the reset token and a message.
Raises:
HTTPException: 404 if email is not registered.
"""
try:
token = auth_service.request_password_reset(request.email)
return {
"message": "Password reset token generated",
"token": token
}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/password-reset",
status_code=status.HTTP_200_OK,
summary="Reset password with token",
description="Reset user password using a valid reset token."
)
def reset_password(request: PasswordResetConfirm) -> dict:
"""
Reset a user's password using a reset token.
This endpoint validates the reset token and updates the user's password.
All existing refresh tokens are revoked for security.
Args:
request: Request body containing the reset token and new password.
Returns:
Success message.
Raises:
HTTPException: 401 if token is invalid, expired, or already used.
HTTPException: 422 if new password doesn't meet requirements.
"""
try:
auth_service.reset_password(request.token, request.new_password)
return {"message": "Password reset successfully"}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.post(
"/verify-email-request",
status_code=status.HTTP_200_OK,
summary="Request email verification",
description="Generate an email verification token and return it (to be sent via email)."
)
def request_email_verification(request: EmailVerificationRequest) -> dict:
"""
Request an email verification token.
This endpoint generates a JWT token for email verification. In production,
this token should be sent via email with a verification link. For this module,
the token is returned in the response so the consuming application can handle
email delivery.
Args:
request: Request body containing the email address.
Returns:
Dictionary with the verification token and a message.
Raises:
HTTPException: 404 if email is not registered.
"""
try:
token = auth_service.request_email_verification(request.email)
return {
"message": "Email verification token generated",
"token": token
}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.get(
"/verify-email",
status_code=status.HTTP_200_OK,
summary="Verify email with token",
description="Verify user email using a verification token from query parameter."
)
def verify_email(token: str) -> dict:
"""
Verify a user's email address.
This endpoint validates the verification token and marks the user's
email as verified. It uses a query parameter so it can be easily
accessed via a link in an email.
Args:
token: Email verification token (JWT) from query parameter.
Returns:
Success message.
Raises:
HTTPException: 401 if token is invalid or expired.
"""
try:
auth_service.verify_email(token)
return {"message": "Email verified successfully"}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.get(
"/me",
response_model=UserResponse,
summary="Get current user",
description="Get information about the currently authenticated user."
)
def get_me(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
"""
Get current authenticated user information.
This is a protected route that requires a valid access token in the
Authorization header (Bearer token).
Args:
current_user: The authenticated user (injected by dependency).
Returns:
Current user information.
Raises:
HTTPException: 401 if token is invalid or expired.
"""
return current_user
return auth_app