""" Unit tests for FastAPI authentication routes. This module tests all authentication API endpoints using FastAPI's TestClient and mocked dependencies to ensure proper behavior and error handling. """ from datetime import datetime from unittest.mock import Mock import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from myauth.api import create_auth_app from myauth.core.auth import AuthService from myauth.exceptions import ( UserAlreadyExistsError, InvalidCredentialsError, UserNotFoundError, InvalidTokenError, ExpiredTokenError, RevokedTokenError, AccountDisabledError ) from myauth.models.token import AccessTokenResponse from myauth.models.user import UserInDB @pytest.fixture def mock_auth_service(): """ Create a mock AuthService for testing. Returns: Mock AuthService instance with all methods mocked. """ service = Mock(spec=AuthService) return service @pytest.fixture def test_app(mock_auth_service): """ Create a FastAPI test application with auth router. Args: mock_auth_service: Mocked authentication service. Returns: FastAPI application configured for testing. """ app = FastAPI() auth_app = create_auth_app(mock_auth_service) app.mount("/auth", auth_app) return app @pytest.fixture def client(test_app): """ Create a test client for the FastAPI application. Args: test_app: FastAPI test application. Returns: TestClient instance. """ return TestClient(test_app) @pytest.fixture def sample_user(): """ Create a sample user for testing. Returns: UserInDB instance with sample data. """ return UserInDB( id="user123", email="test@example.com", username="testuser", hashed_password="hashed_password_here", roles=["user"], user_settings={}, is_verified=False, is_active=True, created_at=datetime.utcnow(), updated_at=datetime.utcnow() ) @pytest.fixture def sample_tokens(): """ Create sample access and refresh tokens. Returns: AccessTokenResponse with sample tokens. """ return AccessTokenResponse( access_token="sample.access.token", refresh_token="sample_refresh_token", token_type="bearer" ) def test_i_can_register_user(client, mock_auth_service, sample_user): """ Test successful user registration. Verifies that a POST request to /auth/register with valid data returns 201 status and the created user information. """ mock_auth_service.register.return_value = sample_user response = client.post("/auth/register", json={ "email": "test@example.com", "username": "testuser", "password": "SecurePass123!", "roles": ["user"], "user_settings": {} }) assert response.status_code == 201 data = response.json() assert data["email"] == "test@example.com" assert data["username"] == "testuser" assert data["id"] == "user123" assert "hashed_password" not in data mock_auth_service.register.assert_called_once() def test_i_cannot_register_with_existing_email(client, mock_auth_service): """ Test registration fails with existing email. Verifies that attempting to register with an already registered email returns 409 Conflict status. """ mock_auth_service.register.side_effect = UserAlreadyExistsError( "User with this email already exists" ) response = client.post("/auth/register", json={ "email": "existing@example.com", "username": "testuser", "password": "SecurePass123!", "roles": [], "user_settings": {} }) assert response.status_code == 409 assert "already exists" in response.json()["detail"].lower() def test_i_can_login(client, mock_auth_service, sample_user, sample_tokens): """ Test successful login. Verifies that a POST request to /auth/login with valid credentials returns access and refresh tokens. """ mock_auth_service.login.return_value = (sample_user, sample_tokens) response = client.post("/auth/login", data={ "username": "test@example.com", # OAuth2 uses 'username' field "password": "SecurePass123!" }) assert response.status_code == 200 data = response.json() assert data["access_token"] == "sample.access.token" assert data["refresh_token"] == "sample_refresh_token" assert data["token_type"] == "bearer" mock_auth_service.login.assert_called_once_with("test@example.com", "SecurePass123!") def test_i_cannot_login_with_invalid_credentials(client, mock_auth_service): """ Test login fails with invalid credentials. Verifies that attempting to login with incorrect email or password returns 401 Unauthorized status. """ mock_auth_service.login.side_effect = InvalidCredentialsError() response = client.post("/auth/login", data={ "username": "test@example.com", "password": "WrongPassword" }) assert response.status_code == 401 assert "invalid" in response.json()["detail"].lower() def test_i_cannot_login_with_disabled_account(client, mock_auth_service): """ Test login fails with disabled account. Verifies that attempting to login to a disabled account returns 403 Forbidden status. """ mock_auth_service.login.side_effect = AccountDisabledError() response = client.post("/auth/login", data={ "username": "test@example.com", "password": "SecurePass123!" }) assert response.status_code == 403 assert "disabled" in response.json()["detail"].lower() def test_i_can_refresh_token(client, mock_auth_service, sample_tokens): """ Test successful token refresh. Verifies that a POST request to /auth/refresh with a valid refresh token returns new access and refresh tokens. """ new_tokens = AccessTokenResponse( access_token="new.access.token", refresh_token="new_refresh_token", token_type="bearer" ) mock_auth_service.refresh_access_token.return_value = new_tokens response = client.post("/auth/refresh", json={ "refresh_token": "sample_refresh_token" }) assert response.status_code == 200 data = response.json() assert data["access_token"] == "new.access.token" assert data["refresh_token"] == "new_refresh_token" mock_auth_service.refresh_access_token.assert_called_once_with("sample_refresh_token") def test_i_cannot_refresh_with_invalid_token(client, mock_auth_service): """ Test token refresh fails with invalid token. Verifies that attempting to refresh with an invalid token returns 401 Unauthorized status. """ mock_auth_service.refresh_access_token.side_effect = InvalidTokenError( "Invalid refresh token" ) response = client.post("/auth/refresh", json={ "refresh_token": "invalid_token" }) assert response.status_code == 401 assert "invalid" in response.json()["detail"].lower() def test_i_cannot_refresh_with_expired_token(client, mock_auth_service): """ Test token refresh fails with expired token. Verifies that attempting to refresh with an expired token returns 401 Unauthorized status. """ mock_auth_service.refresh_access_token.side_effect = ExpiredTokenError() response = client.post("/auth/refresh", json={ "refresh_token": "expired_token" }) assert response.status_code == 401 assert "expired" in response.json()["detail"].lower() def test_i_cannot_refresh_with_revoked_token(client, mock_auth_service): """ Test token refresh fails with revoked token. Verifies that attempting to refresh with a revoked token returns 401 Unauthorized status. """ mock_auth_service.refresh_access_token.side_effect = RevokedTokenError() response = client.post("/auth/refresh", json={ "refresh_token": "revoked_token" }) assert response.status_code == 401 assert "revoked" in response.json()["detail"].lower() def test_i_can_logout(client, mock_auth_service): """ Test successful logout. Verifies that a POST request to /auth/logout successfully revokes the refresh token and returns 204 status. """ mock_auth_service.logout.return_value = True response = client.post("/auth/logout", json={ "refresh_token": "sample_refresh_token" }) assert response.status_code == 204 mock_auth_service.logout.assert_called_once_with("sample_refresh_token") def test_i_can_request_password_reset(client, mock_auth_service): """ Test password reset request. Verifies that a POST request to /auth/password-reset-request generates a reset token for the given email. """ mock_auth_service.request_password_reset.return_value = "reset_token_123" response = client.post("/auth/password-reset-request", json={ "email": "test@example.com" }) assert response.status_code == 200 data = response.json() assert "token" in data assert data["token"] == "reset_token_123" mock_auth_service.request_password_reset.assert_called_once_with("test@example.com") def test_i_cannot_request_password_reset_for_unknown_email(client, mock_auth_service): """ Test password reset request fails for unknown email. Verifies that requesting a password reset for a non-existent email returns 404 Not Found status. """ mock_auth_service.request_password_reset.side_effect = UserNotFoundError( "No user found with email" ) response = client.post("/auth/password-reset-request", json={ "email": "unknown@example.com" }) assert response.status_code == 404 def test_i_can_reset_password(client, mock_auth_service): """ Test successful password reset. Verifies that a POST request to /auth/password-reset with valid token and new password successfully resets the password. """ mock_auth_service.reset_password.return_value = True response = client.post("/auth/password-reset", json={ "token": "reset_token_123", "new_password": "NewSecurePass123!" }) assert response.status_code == 200 data = response.json() assert "success" in data["message"].lower() mock_auth_service.reset_password.assert_called_once_with( "reset_token_123", "NewSecurePass123!" ) def test_i_cannot_reset_password_with_invalid_token(client, mock_auth_service): """ Test password reset fails with invalid token. Verifies that attempting to reset password with an invalid token returns 401 Unauthorized status. """ mock_auth_service.reset_password.side_effect = InvalidTokenError( "Invalid password reset token" ) response = client.post("/auth/password-reset", json={ "token": "invalid_token", "new_password": "NewSecurePass123!" }) assert response.status_code == 401 def test_i_cannot_reset_password_with_expired_token(client, mock_auth_service): """ Test password reset fails with expired token. Verifies that attempting to reset password with an expired token returns 401 Unauthorized status. """ mock_auth_service.reset_password.side_effect = ExpiredTokenError( "Password reset token has expired" ) response = client.post("/auth/password-reset", json={ "token": "expired_token", "new_password": "NewSecurePass123!" }) assert response.status_code == 401 def test_i_can_request_email_verification(client, mock_auth_service): """ Test email verification request. Verifies that a POST request to /auth/verify-email-request generates a verification token for the given email. """ mock_auth_service.request_email_verification.return_value = "verify_token_jwt" response = client.post("/auth/verify-email-request", json={ "email": "test@example.com" }) assert response.status_code == 200 data = response.json() assert "token" in data assert data["token"] == "verify_token_jwt" mock_auth_service.request_email_verification.assert_called_once_with("test@example.com") def test_i_can_verify_email(client, mock_auth_service): """ Test successful email verification. Verifies that a GET request to /auth/verify-email with a valid token successfully verifies the email address. """ mock_auth_service.verify_email.return_value = True response = client.get("/auth/verify-email?token=verify_token_jwt") assert response.status_code == 200 data = response.json() assert "success" in data["message"].lower() mock_auth_service.verify_email.assert_called_once_with("verify_token_jwt") def test_i_cannot_verify_email_with_invalid_token(client, mock_auth_service): """ Test email verification fails with invalid token. Verifies that attempting to verify email with an invalid token returns 401 Unauthorized status. """ mock_auth_service.verify_email.side_effect = InvalidTokenError( "Invalid email verification token" ) response = client.get("/auth/verify-email?token=invalid_token") assert response.status_code == 401 def test_i_can_get_current_user(client, mock_auth_service, sample_user): """ Test retrieving current authenticated user. Verifies that a GET request to /auth/me with a valid Bearer token returns the current user's information. """ mock_auth_service.get_current_user.return_value = sample_user response = client.get( "/auth/me", headers={"Authorization": "Bearer sample.access.token"} ) assert response.status_code == 200 data = response.json() assert data["email"] == "test@example.com" assert data["username"] == "testuser" assert data["id"] == "user123" assert "hashed_password" not in data mock_auth_service.get_current_user.assert_called_once_with("sample.access.token") def test_i_cannot_access_protected_route_without_token(client): """ Test protected route fails without authentication token. Verifies that attempting to access /auth/me without a Bearer token returns 401 Unauthorized status. """ response = client.get("/auth/me") assert response.status_code == 401 def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_service): """ Test protected route fails with invalid token. Verifies that attempting to access /auth/me with an invalid token returns 401 Unauthorized status. """ mock_auth_service.get_current_user.side_effect = InvalidTokenError( "Invalid access token" ) response = client.get( "/auth/me", headers={"Authorization": "Bearer invalid.token"} ) assert response.status_code == 401 def test_i_can_update_my_email(client, mock_auth_service, sample_user): """ Test user can update their own email. Verifies that a PATCH request to /auth/me with a new email successfully updates the user's email address. """ updated_user = sample_user.model_copy(update={"email": "newemail@example.com"}) mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={"email": "newemail@example.com"} ) assert response.status_code == 200 data = response.json() assert data["email"] == "newemail@example.com" mock_auth_service.update_user.assert_called_once() def test_i_can_update_my_username(client, mock_auth_service, sample_user): """ Test user can update their own username. Verifies that a PATCH request to /auth/me with a new username successfully updates the user's username. """ updated_user = sample_user.model_copy(update={"username": "newusername"}) mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={"username": "newusername"} ) assert response.status_code == 200 data = response.json() assert data["username"] == "newusername" mock_auth_service.update_user.assert_called_once() def test_i_can_update_my_password(client, mock_auth_service, sample_user): """ Test user can update their own password. Verifies that a PATCH request to /auth/me with a new password successfully updates the password (which will be hashed by the service). """ mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = sample_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={"password": "NewSecurePass123!"} ) assert response.status_code == 200 mock_auth_service.update_user.assert_called_once() # Verify password was included in the update call_args = mock_auth_service.update_user.call_args assert call_args[1]["updates"].password == "NewSecurePass123!" def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user): """ Test user can update password while preserving current session. Verifies that when a refresh_token is provided in the request body, it is passed to the service to preserve the current session. """ mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = sample_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={ "password": "NewSecurePass123!", "refresh_token": "current_refresh_token" } ) assert response.status_code == 200 mock_auth_service.update_user.assert_called_once() # Verify refresh_token was passed to preserve session call_args = mock_auth_service.update_user.call_args assert call_args[1]["refresh_token"] == "current_refresh_token" def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user): """ Test user can update their own settings. Verifies that a PATCH request to /auth/me with new user_settings successfully updates the user's custom settings. """ new_settings = {"theme": "dark", "language": "fr", "notifications": True} updated_user = sample_user.model_copy(update={"user_settings": new_settings}) mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={"user_settings": new_settings} ) assert response.status_code == 200 data = response.json() assert data["user_settings"] == new_settings mock_auth_service.update_user.assert_called_once() def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user): """ Test user can update multiple fields simultaneously. Verifies that a PATCH request to /auth/me can update multiple fields (email, username, user_settings) at once. """ updated_user = sample_user.model_copy(update={ "email": "multiemail@example.com", "username": "multiuser", "user_settings": {"theme": "light"} }) mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={ "email": "multiemail@example.com", "username": "multiuser", "user_settings": {"theme": "light"} } ) assert response.status_code == 200 data = response.json() assert data["email"] == "multiemail@example.com" assert data["username"] == "multiuser" assert data["user_settings"] == {"theme": "light"} mock_auth_service.update_user.assert_called_once() def test_i_cannot_update_my_profile_without_authentication(client): """ Test updating profile fails without authentication. Verifies that a PATCH request to /auth/me without a Bearer token returns 401 Unauthorized status. """ response = client.patch( "/auth/me", json={"username": "shouldfail"} ) assert response.status_code == 401 def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user): """ Test updating email fails if already exists. Verifies that attempting to update email to an already registered email returns 409 Conflict status. """ mock_auth_service.get_current_user.return_value = sample_user mock_auth_service.update_user.side_effect = UserAlreadyExistsError( "Email already in use" ) response = client.patch( "/auth/me", headers={"Authorization": "Bearer sample.access.token"}, json={"email": "existing@example.com"} ) assert response.status_code == 409 assert "already" in response.json()["detail"].lower() def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service): """ Test updating profile fails with invalid token. Verifies that attempting to access /auth/me with an invalid token returns 401 Unauthorized status. """ mock_auth_service.get_current_user.side_effect = InvalidTokenError( "Invalid access token" ) response = client.patch( "/auth/me", headers={"Authorization": "Bearer invalid.token"}, json={"username": "shouldfail"} ) assert response.status_code == 401 def test_admin_can_update_any_user(client, mock_auth_service, sample_user): """ Test admin can update any user. Verifies that an admin can successfully update another user's information via PATCH /auth/users/{user_id}. """ admin_user = sample_user.model_copy(update={"roles": ["admin"]}) target_user = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"}) updated_user = target_user.model_copy(update={"username": "updatedusername"}) mock_auth_service.get_current_user.return_value = admin_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/users/target_user_id", headers={"Authorization": "Bearer admin.access.token"}, json={"username": "updatedusername"} ) assert response.status_code == 200 data = response.json() assert data["username"] == "updatedusername" mock_auth_service.update_user.assert_called_once() def test_admin_can_update_user_roles(client, mock_auth_service, sample_user): """ Test admin can update user roles. Verifies that an admin can change a user's roles, which is forbidden for regular users on the /auth/me endpoint. """ admin_user = sample_user.model_copy(update={"roles": ["admin"]}) target_user = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]}) updated_user = target_user.model_copy(update={"roles": ["admin", "moderator"]}) mock_auth_service.get_current_user.return_value = admin_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/users/target_user_id", headers={"Authorization": "Bearer admin.access.token"}, json={"roles": ["admin", "moderator"]} ) assert response.status_code == 200 data = response.json() assert data["roles"] == ["admin", "moderator"] mock_auth_service.update_user.assert_called_once() def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user): """ Test admin can update user active status. Verifies that an admin can activate or deactivate a user account, which is forbidden for regular users on the /auth/me endpoint. """ admin_user = sample_user.model_copy(update={"roles": ["admin"]}) target_user = sample_user.model_copy(update={"id": "target_user_id", "is_active": True}) updated_user = target_user.model_copy(update={"is_active": False}) mock_auth_service.get_current_user.return_value = admin_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/users/target_user_id", headers={"Authorization": "Bearer admin.access.token"}, json={"is_active": False} ) assert response.status_code == 200 # Note: is_active is not in UserResponse, so we just verify the call was made mock_auth_service.update_user.assert_called_once() call_args = mock_auth_service.update_user.call_args assert call_args[1]["updates"].is_active is False def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user): """ Test admin can update user verification status. Verifies that an admin can change a user's email verification status, which is forbidden for regular users on the /auth/me endpoint. """ admin_user = sample_user.model_copy(update={"roles": ["admin"]}) target_user = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False}) updated_user = target_user.model_copy(update={"is_verified": True}) mock_auth_service.get_current_user.return_value = admin_user mock_auth_service.update_user.return_value = updated_user response = client.patch( "/auth/users/target_user_id", headers={"Authorization": "Bearer admin.access.token"}, json={"is_verified": True} ) assert response.status_code == 200 # Note: is_verified is not in UserResponse, so we just verify the call was made mock_auth_service.update_user.assert_called_once() call_args = mock_auth_service.update_user.call_args assert call_args[1]["updates"].is_verified is True def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user): """ Test non-admin cannot update other users. Verifies that a regular user (without admin role) cannot access the PATCH /auth/users/{user_id} endpoint and receives 403 Forbidden. """ regular_user = sample_user.model_copy(update={"roles": ["user"]}) mock_auth_service.get_current_user.return_value = regular_user response = client.patch( "/auth/users/other_user_id", headers={"Authorization": "Bearer user.access.token"}, json={"username": "shouldfail"} ) assert response.status_code == 403 assert "admin" in response.json()["detail"].lower() def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user): """ Test admin cannot update non-existent user. Verifies that attempting to update a non-existent user returns 404 Not Found status. """ admin_user = sample_user.model_copy(update={"roles": ["admin"]}) mock_auth_service.get_current_user.return_value = admin_user mock_auth_service.update_user.side_effect = UserNotFoundError( "User not found" ) response = client.patch( "/auth/users/non_existent_id", headers={"Authorization": "Bearer admin.access.token"}, json={"username": "shouldfail"} ) assert response.status_code == 404 assert "not found" in response.json()["detail"].lower()