Files
MyAuth/tests/api/test_api_routes.py

874 lines
27 KiB
Python

"""
Unit tests for FastAPI authentication routes.
This module tests all authentication API endpoints using FastAPI's TestClient
and mocked dependencies to ensure proper behavior and error handling.
"""
from datetime import datetime
from unittest.mock import Mock
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from myauth.api import create_auth_app
from myauth.core.auth import AuthService
from myauth.exceptions import (
UserAlreadyExistsError,
InvalidCredentialsError,
UserNotFoundError,
InvalidTokenError,
ExpiredTokenError,
RevokedTokenError,
AccountDisabledError
)
from myauth.models.token import AccessTokenResponse
from myauth.models.user import UserInDB
@pytest.fixture
def mock_auth_service():
    """
    Provide a stand-in for the authentication service.

    Returns:
        A ``Mock`` whose attribute surface is restricted to ``AuthService``
        through ``spec``.
    """
    return Mock(spec=AuthService)
@pytest.fixture
def test_app(mock_auth_service):
    """
    Build the FastAPI application exercised by the tests.

    Args:
        mock_auth_service: Mocked authentication service passed to the
            auth application factory.

    Returns:
        A FastAPI application with the auth sub-application mounted
        under ``/auth``.
    """
    application = FastAPI()
    application.mount("/auth", create_auth_app(mock_auth_service))
    return application
@pytest.fixture
def client(test_app):
    """
    Wrap the test application in an HTTP test client.

    Args:
        test_app: FastAPI application under test.

    Returns:
        TestClient bound to ``test_app``.
    """
    http_client = TestClient(test_app)
    return http_client
@pytest.fixture
def sample_user():
    """
    Provide a representative stored user.

    Returns:
        UserInDB instance filled with fixed sample values; both timestamps
        are read from the clock when the fixture runs.
    """
    fields = {
        "id": "user123",
        "email": "test@example.com",
        "username": "testuser",
        "hashed_password": "hashed_password_here",
        "roles": ["user"],
        "user_settings": {},
        "is_verified": False,
        "is_active": True,
        "created_at": datetime.utcnow(),
        "updated_at": datetime.utcnow(),
    }
    return UserInDB(**fields)
@pytest.fixture
def sample_tokens():
    """
    Provide a fixed access/refresh token pair.

    Returns:
        AccessTokenResponse carrying sample token strings and the
        ``bearer`` token type.
    """
    token_fields = {
        "access_token": "sample.access.token",
        "refresh_token": "sample_refresh_token",
        "token_type": "bearer",
    }
    return AccessTokenResponse(**token_fields)
def test_i_can_register_user(client, mock_auth_service, sample_user):
    """
    Registration succeeds with a valid payload.

    POSTs to /auth/register and expects a 201 response exposing the created
    user's id, email and username but not the hashed password, with the
    service's ``register`` invoked exactly once.
    """
    mock_auth_service.register.return_value = sample_user
    registration = {
        "email": "test@example.com",
        "username": "testuser",
        "password": "SecurePass123!",
        "roles": ["user"],
        "user_settings": {},
    }

    resp = client.post("/auth/register", json=registration)

    assert resp.status_code == 201
    body = resp.json()
    assert body["email"] == "test@example.com"
    assert body["username"] == "testuser"
    assert body["id"] == "user123"
    assert "hashed_password" not in body
    mock_auth_service.register.assert_called_once()
def test_i_cannot_register_with_existing_email(client, mock_auth_service):
    """
    Registration is rejected when the user already exists.

    The service raises ``UserAlreadyExistsError``; the endpoint is expected
    to answer 409 Conflict with a detail mentioning "already exists".
    """
    duplicate_error = UserAlreadyExistsError("User with this email already exists")
    mock_auth_service.register.side_effect = duplicate_error
    registration = {
        "email": "existing@example.com",
        "username": "testuser",
        "password": "SecurePass123!",
        "roles": [],
        "user_settings": {},
    }

    resp = client.post("/auth/register", json=registration)

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert "already exists" in detail.lower()
def test_i_can_login(client, mock_auth_service, sample_user, sample_tokens):
    """
    Login succeeds with valid credentials.

    Submits form data to /auth/login and expects 200 with the access token,
    refresh token and token type returned by the service, which must be
    called once with the submitted email and password.
    """
    mock_auth_service.login.return_value = (sample_user, sample_tokens)
    # The OAuth2 password form carries the email in its 'username' field.
    credentials = {
        "username": "test@example.com",
        "password": "SecurePass123!",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 200
    body = resp.json()
    assert body["access_token"] == "sample.access.token"
    assert body["refresh_token"] == "sample_refresh_token"
    assert body["token_type"] == "bearer"
    mock_auth_service.login.assert_called_once_with("test@example.com", "SecurePass123!")
def test_i_cannot_login_with_invalid_credentials(client, mock_auth_service):
    """
    Login is rejected for wrong credentials.

    The service raises ``InvalidCredentialsError``; the endpoint is expected
    to answer 401 Unauthorized with a detail mentioning "invalid".
    """
    mock_auth_service.login.side_effect = InvalidCredentialsError()
    credentials = {
        "username": "test@example.com",
        "password": "WrongPassword",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "invalid" in detail.lower()
def test_i_cannot_login_with_disabled_account(client, mock_auth_service):
    """
    Login is rejected for a disabled account.

    The service raises ``AccountDisabledError``; the endpoint is expected
    to answer 403 Forbidden with a detail mentioning "disabled".
    """
    mock_auth_service.login.side_effect = AccountDisabledError()
    credentials = {
        "username": "test@example.com",
        "password": "SecurePass123!",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 403
    detail = resp.json()["detail"]
    assert "disabled" in detail.lower()
def test_i_can_refresh_token(client, mock_auth_service, sample_tokens):
    """
    Token refresh succeeds with a valid refresh token.

    POSTs the refresh token to /auth/refresh and expects 200 with the new
    token pair produced by the service, which must be called once with the
    submitted refresh token.
    """
    rotated = AccessTokenResponse(
        access_token="new.access.token",
        refresh_token="new_refresh_token",
        token_type="bearer",
    )
    mock_auth_service.refresh_access_token.return_value = rotated
    request_body = {"refresh_token": "sample_refresh_token"}

    resp = client.post("/auth/refresh", json=request_body)

    assert resp.status_code == 200
    body = resp.json()
    assert body["access_token"] == "new.access.token"
    assert body["refresh_token"] == "new_refresh_token"
    mock_auth_service.refresh_access_token.assert_called_once_with("sample_refresh_token")
def test_i_cannot_refresh_with_invalid_token(client, mock_auth_service):
    """
    Token refresh is rejected for an invalid refresh token.

    The service raises ``InvalidTokenError``; the endpoint is expected to
    answer 401 Unauthorized with a detail mentioning "invalid".
    """
    token_error = InvalidTokenError("Invalid refresh token")
    mock_auth_service.refresh_access_token.side_effect = token_error
    request_body = {"refresh_token": "invalid_token"}

    resp = client.post("/auth/refresh", json=request_body)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "invalid" in detail.lower()
def test_i_cannot_refresh_with_expired_token(client, mock_auth_service):
    """
    Token refresh is rejected for an expired refresh token.

    The service raises ``ExpiredTokenError``; the endpoint is expected to
    answer 401 Unauthorized with a detail mentioning "expired".
    """
    mock_auth_service.refresh_access_token.side_effect = ExpiredTokenError()
    request_body = {"refresh_token": "expired_token"}

    resp = client.post("/auth/refresh", json=request_body)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "expired" in detail.lower()
def test_i_cannot_refresh_with_revoked_token(client, mock_auth_service):
    """
    Token refresh is rejected for a revoked refresh token.

    The service raises ``RevokedTokenError``; the endpoint is expected to
    answer 401 Unauthorized with a detail mentioning "revoked".
    """
    mock_auth_service.refresh_access_token.side_effect = RevokedTokenError()
    request_body = {"refresh_token": "revoked_token"}

    resp = client.post("/auth/refresh", json=request_body)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "revoked" in detail.lower()
def test_i_can_logout(client, mock_auth_service):
    """
    Logout succeeds.

    POSTs the refresh token to /auth/logout and expects a 204 response,
    with the service's ``logout`` called once with that refresh token.
    """
    mock_auth_service.logout.return_value = True
    request_body = {"refresh_token": "sample_refresh_token"}

    resp = client.post("/auth/logout", json=request_body)

    assert resp.status_code == 204
    mock_auth_service.logout.assert_called_once_with("sample_refresh_token")
def test_i_can_request_password_reset(client, mock_auth_service):
    """
    A password reset can be requested for an email address.

    POSTs the email to /auth/password-reset-request and expects 200 with
    the reset token returned by the service, which must be called once
    with that email.
    """
    mock_auth_service.request_password_reset.return_value = "reset_token_123"
    request_body = {"email": "test@example.com"}

    resp = client.post("/auth/password-reset-request", json=request_body)

    assert resp.status_code == 200
    body = resp.json()
    assert "token" in body
    assert body["token"] == "reset_token_123"
    mock_auth_service.request_password_reset.assert_called_once_with("test@example.com")
def test_i_cannot_request_password_reset_for_unknown_email(client, mock_auth_service):
    """
    A password reset request fails for an unknown email.

    The service raises ``UserNotFoundError``; the endpoint is expected to
    answer 404 Not Found.
    """
    missing_user = UserNotFoundError("No user found with email")
    mock_auth_service.request_password_reset.side_effect = missing_user
    request_body = {"email": "unknown@example.com"}

    resp = client.post("/auth/password-reset-request", json=request_body)

    assert resp.status_code == 404
def test_i_can_reset_password(client, mock_auth_service):
    """
    A password can be reset with a valid token.

    POSTs the token and new password to /auth/password-reset and expects
    200 with a message mentioning "success"; the service must be called
    once with the token and the new password.
    """
    mock_auth_service.reset_password.return_value = True
    request_body = {
        "token": "reset_token_123",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=request_body)

    assert resp.status_code == 200
    message = resp.json()["message"]
    assert "success" in message.lower()
    mock_auth_service.reset_password.assert_called_once_with(
        "reset_token_123", "NewSecurePass123!"
    )
def test_i_cannot_reset_password_with_invalid_token(client, mock_auth_service):
    """
    Password reset is rejected for an invalid token.

    The service raises ``InvalidTokenError``; the endpoint is expected to
    answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid password reset token")
    mock_auth_service.reset_password.side_effect = token_error
    request_body = {
        "token": "invalid_token",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=request_body)

    assert resp.status_code == 401
def test_i_cannot_reset_password_with_expired_token(client, mock_auth_service):
    """
    Password reset is rejected for an expired token.

    The service raises ``ExpiredTokenError``; the endpoint is expected to
    answer 401 Unauthorized.
    """
    token_error = ExpiredTokenError("Password reset token has expired")
    mock_auth_service.reset_password.side_effect = token_error
    request_body = {
        "token": "expired_token",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=request_body)

    assert resp.status_code == 401
def test_i_can_request_email_verification(client, mock_auth_service):
    """
    Email verification can be requested for an email address.

    POSTs the email to /auth/verify-email-request and expects 200 with the
    verification token returned by the service, which must be called once
    with that email.
    """
    mock_auth_service.request_email_verification.return_value = "verify_token_jwt"
    request_body = {"email": "test@example.com"}

    resp = client.post("/auth/verify-email-request", json=request_body)

    assert resp.status_code == 200
    body = resp.json()
    assert "token" in body
    assert body["token"] == "verify_token_jwt"
    mock_auth_service.request_email_verification.assert_called_once_with("test@example.com")
def test_i_can_verify_email(client, mock_auth_service):
    """
    An email can be verified with a valid token.

    GETs /auth/verify-email with the token as a query parameter and expects
    200 with a message mentioning "success"; the service must be called
    once with that token.
    """
    mock_auth_service.verify_email.return_value = True

    resp = client.get("/auth/verify-email?token=verify_token_jwt")

    assert resp.status_code == 200
    message = resp.json()["message"]
    assert "success" in message.lower()
    mock_auth_service.verify_email.assert_called_once_with("verify_token_jwt")
def test_i_cannot_verify_email_with_invalid_token(client, mock_auth_service):
    """
    Email verification is rejected for an invalid token.

    The service raises ``InvalidTokenError``; the endpoint is expected to
    answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid email verification token")
    mock_auth_service.verify_email.side_effect = token_error

    resp = client.get("/auth/verify-email?token=invalid_token")

    assert resp.status_code == 401
def test_i_can_get_current_user(client, mock_auth_service, sample_user):
    """
    The authenticated user can be retrieved.

    GETs /auth/me with a Bearer token and expects 200 with the user's id,
    email and username but not the hashed password; the service must be
    called once with the raw access token.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.get("/auth/me", headers=auth_headers)

    assert resp.status_code == 200
    body = resp.json()
    assert body["email"] == "test@example.com"
    assert body["username"] == "testuser"
    assert body["id"] == "user123"
    assert "hashed_password" not in body
    mock_auth_service.get_current_user.assert_called_once_with("sample.access.token")
def test_i_cannot_access_protected_route_without_token(client):
    """
    A protected route rejects unauthenticated requests.

    GETs /auth/me with no Authorization header and expects
    401 Unauthorized.
    """
    resp = client.get("/auth/me")

    assert resp.status_code == 401
def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_service):
    """
    A protected route rejects an invalid access token.

    The service raises ``InvalidTokenError`` while resolving the current
    user; GET /auth/me is expected to answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid access token")
    mock_auth_service.get_current_user.side_effect = token_error
    auth_headers = {"Authorization": "Bearer invalid.token"}

    resp = client.get("/auth/me", headers=auth_headers)

    assert resp.status_code == 401
def test_i_can_update_my_email(client, mock_auth_service, sample_user):
    """
    A user can change their own email.

    PATCHes /auth/me with a new email and expects 200 with the updated
    email in the response and a single ``update_user`` call.
    """
    user_after_update = sample_user.model_copy(update={"email": "newemail@example.com"})
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {"email": "newemail@example.com"}

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    body = resp.json()
    assert body["email"] == "newemail@example.com"
    mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_username(client, mock_auth_service, sample_user):
    """
    A user can change their own username.

    PATCHes /auth/me with a new username and expects 200 with the updated
    username in the response and a single ``update_user`` call.
    """
    user_after_update = sample_user.model_copy(update={"username": "newusername"})
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {"username": "newusername"}

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    body = resp.json()
    assert body["username"] == "newusername"
    mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_password(client, mock_auth_service, sample_user):
    """
    A user can change their own password.

    PATCHes /auth/me with a new password and expects 200, a single
    ``update_user`` call, and the plain new password present on the
    ``updates`` keyword argument passed to the service.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {"password": "NewSecurePass123!"}

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    mock_auth_service.update_user.assert_called_once()
    # call_args is an (args, kwargs) pair; the password travels in kwargs["updates"].
    _positional, keyword_args = mock_auth_service.update_user.call_args
    assert keyword_args["updates"].password == "NewSecurePass123!"
def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user):
    """
    A password change can carry the current refresh token.

    PATCHes /auth/me with a new password plus a ``refresh_token`` and
    expects 200, a single ``update_user`` call, and that refresh token
    forwarded to the service as the ``refresh_token`` keyword argument.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {
        "password": "NewSecurePass123!",
        "refresh_token": "current_refresh_token",
    }

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    mock_auth_service.update_user.assert_called_once()
    # call_args is an (args, kwargs) pair; the session token travels in kwargs.
    _positional, keyword_args = mock_auth_service.update_user.call_args
    assert keyword_args["refresh_token"] == "current_refresh_token"
def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user):
    """
    A user can change their own settings.

    PATCHes /auth/me with new ``user_settings`` and expects 200 with those
    settings echoed in the response and a single ``update_user`` call.
    """
    new_settings = {"theme": "dark", "language": "fr", "notifications": True}
    user_after_update = sample_user.model_copy(update={"user_settings": new_settings})
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.patch(
        "/auth/me",
        headers=auth_headers,
        json={"user_settings": new_settings},
    )

    assert resp.status_code == 200
    body = resp.json()
    assert body["user_settings"] == new_settings
    mock_auth_service.update_user.assert_called_once()
def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user):
    """
    A user can change several profile fields in one request.

    PATCHes /auth/me with email, username and user_settings together and
    expects 200 with all three reflected in the response and a single
    ``update_user`` call.
    """
    user_after_update = sample_user.model_copy(
        update={
            "email": "multiemail@example.com",
            "username": "multiuser",
            "user_settings": {"theme": "light"},
        }
    )
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {
        "email": "multiemail@example.com",
        "username": "multiuser",
        "user_settings": {"theme": "light"},
    }

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    body = resp.json()
    assert body["email"] == "multiemail@example.com"
    assert body["username"] == "multiuser"
    assert body["user_settings"] == {"theme": "light"}
    mock_auth_service.update_user.assert_called_once()
def test_i_cannot_update_my_profile_without_authentication(client):
    """
    Profile updates reject unauthenticated requests.

    PATCHes /auth/me with no Authorization header and expects
    401 Unauthorized.
    """
    changes = {"username": "shouldfail"}

    resp = client.patch("/auth/me", json=changes)

    assert resp.status_code == 401
def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user):
    """
    An email change is rejected when the address is taken.

    The service raises ``UserAlreadyExistsError`` on update; PATCH /auth/me
    is expected to answer 409 Conflict with a detail mentioning "already".
    """
    mock_auth_service.get_current_user.return_value = sample_user
    duplicate_error = UserAlreadyExistsError("Email already in use")
    mock_auth_service.update_user.side_effect = duplicate_error
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    changes = {"email": "existing@example.com"}

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert "already" in detail.lower()
def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service):
    """
    Profile updates reject an invalid access token.

    The service raises ``InvalidTokenError`` while resolving the current
    user; PATCH /auth/me is expected to answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid access token")
    mock_auth_service.get_current_user.side_effect = token_error
    auth_headers = {"Authorization": "Bearer invalid.token"}
    changes = {"username": "shouldfail"}

    resp = client.patch("/auth/me", headers=auth_headers, json=changes)

    assert resp.status_code == 401
def test_admin_can_update_any_user(client, mock_auth_service, sample_user):
    """
    An admin can update another user.

    With the current user holding the ``admin`` role, PATCH
    /auth/users/{user_id} is expected to answer 200 with the updated
    username and to call ``update_user`` once.
    """
    admin = sample_user.model_copy(update={"roles": ["admin"]})
    target = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"})
    renamed_target = target.model_copy(update={"username": "updatedusername"})
    mock_auth_service.get_current_user.return_value = admin
    mock_auth_service.update_user.return_value = renamed_target
    auth_headers = {"Authorization": "Bearer admin.access.token"}
    changes = {"username": "updatedusername"}

    resp = client.patch("/auth/users/target_user_id", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    body = resp.json()
    assert body["username"] == "updatedusername"
    mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_roles(client, mock_auth_service, sample_user):
    """
    An admin can change a user's roles.

    With the current user holding the ``admin`` role, PATCH
    /auth/users/{user_id} with new roles is expected to answer 200 with
    those roles in the response and to call ``update_user`` once.
    """
    admin = sample_user.model_copy(update={"roles": ["admin"]})
    target = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]})
    promoted_target = target.model_copy(update={"roles": ["admin", "moderator"]})
    mock_auth_service.get_current_user.return_value = admin
    mock_auth_service.update_user.return_value = promoted_target
    auth_headers = {"Authorization": "Bearer admin.access.token"}
    changes = {"roles": ["admin", "moderator"]}

    resp = client.patch("/auth/users/target_user_id", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    body = resp.json()
    assert body["roles"] == ["admin", "moderator"]
    mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user):
    """
    An admin can change a user's active status.

    With the current user holding the ``admin`` role, PATCH
    /auth/users/{user_id} with ``is_active`` is expected to answer 200,
    call ``update_user`` once, and pass the new flag on the ``updates``
    keyword argument.
    """
    admin = sample_user.model_copy(update={"roles": ["admin"]})
    target = sample_user.model_copy(update={"id": "target_user_id", "is_active": True})
    deactivated_target = target.model_copy(update={"is_active": False})
    mock_auth_service.get_current_user.return_value = admin
    mock_auth_service.update_user.return_value = deactivated_target
    auth_headers = {"Authorization": "Bearer admin.access.token"}
    changes = {"is_active": False}

    resp = client.patch("/auth/users/target_user_id", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    # NOTE: is_active is not checked on the response body (the original note
    # says UserResponse omits it), so the service call is inspected instead.
    mock_auth_service.update_user.assert_called_once()
    _positional, keyword_args = mock_auth_service.update_user.call_args
    assert keyword_args["updates"].is_active is False
def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user):
    """
    An admin can change a user's verification status.

    With the current user holding the ``admin`` role, PATCH
    /auth/users/{user_id} with ``is_verified`` is expected to answer 200,
    call ``update_user`` once, and pass the new flag on the ``updates``
    keyword argument.
    """
    admin = sample_user.model_copy(update={"roles": ["admin"]})
    target = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False})
    verified_target = target.model_copy(update={"is_verified": True})
    mock_auth_service.get_current_user.return_value = admin
    mock_auth_service.update_user.return_value = verified_target
    auth_headers = {"Authorization": "Bearer admin.access.token"}
    changes = {"is_verified": True}

    resp = client.patch("/auth/users/target_user_id", headers=auth_headers, json=changes)

    assert resp.status_code == 200
    # NOTE: is_verified is not checked on the response body (the original note
    # says UserResponse omits it), so the service call is inspected instead.
    mock_auth_service.update_user.assert_called_once()
    _positional, keyword_args = mock_auth_service.update_user.call_args
    assert keyword_args["updates"].is_verified is True
def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user):
    """
    A non-admin cannot update other users.

    With the current user holding only the ``user`` role, PATCH
    /auth/users/{user_id} is expected to answer 403 Forbidden with a
    detail mentioning "admin".
    """
    non_admin = sample_user.model_copy(update={"roles": ["user"]})
    mock_auth_service.get_current_user.return_value = non_admin
    auth_headers = {"Authorization": "Bearer user.access.token"}
    changes = {"username": "shouldfail"}

    resp = client.patch("/auth/users/other_user_id", headers=auth_headers, json=changes)

    assert resp.status_code == 403
    detail = resp.json()["detail"]
    assert "admin" in detail.lower()
def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user):
    """
    Updating a missing user fails for an admin.

    The service raises ``UserNotFoundError`` on update; PATCH
    /auth/users/{user_id} is expected to answer 404 Not Found with a
    detail mentioning "not found".
    """
    admin = sample_user.model_copy(update={"roles": ["admin"]})
    mock_auth_service.get_current_user.return_value = admin
    missing_user = UserNotFoundError("User not found")
    mock_auth_service.update_user.side_effect = missing_user
    auth_headers = {"Authorization": "Bearer admin.access.token"}
    changes = {"username": "shouldfail"}

    resp = client.patch("/auth/users/non_existent_id", headers=auth_headers, json=changes)

    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert "not found" in detail.lower()