874 lines
27 KiB
Python
874 lines
27 KiB
Python
"""
Unit tests for FastAPI authentication routes.

This module tests all authentication API endpoints using FastAPI's TestClient
and mocked dependencies to ensure proper behavior and error handling.
"""
|
|
|
|
from datetime import datetime
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from myauth.api import create_auth_app
|
|
from myauth.core.auth import AuthService
|
|
from myauth.exceptions import (
|
|
UserAlreadyExistsError,
|
|
InvalidCredentialsError,
|
|
UserNotFoundError,
|
|
InvalidTokenError,
|
|
ExpiredTokenError,
|
|
RevokedTokenError,
|
|
AccountDisabledError
|
|
)
|
|
from myauth.models.token import AccessTokenResponse
|
|
from myauth.models.user import UserInDB
|
|
|
|
|
|
@pytest.fixture
def mock_auth_service():
    """
    Provide a stand-in for the authentication service.

    Returns:
        A ``Mock`` built with ``spec=AuthService``, so only attributes that
        exist on ``AuthService`` can be accessed on it.
    """
    return Mock(spec=AuthService)
|
|
|
|
|
|
@pytest.fixture
def test_app(mock_auth_service):
    """
    Build a FastAPI application with the auth sub-application mounted.

    Args:
        mock_auth_service: Mocked authentication service handed to
            ``create_auth_app``.

    Returns:
        A new ``FastAPI`` instance with the auth app mounted under ``/auth``.
    """
    application = FastAPI()
    application.mount("/auth", create_auth_app(mock_auth_service))
    return application
|
|
|
|
|
|
@pytest.fixture
def client(test_app):
    """
    Wrap the test application in a ``TestClient``.

    Args:
        test_app: FastAPI application produced by the ``test_app`` fixture.

    Returns:
        ``TestClient`` bound to ``test_app``.
    """
    http_client = TestClient(test_app)
    return http_client
|
|
|
|
|
|
@pytest.fixture
def sample_user():
    """
    Build the user record shared by the tests in this module.

    Returns:
        ``UserInDB`` for an active, unverified user with the single role
        ``"user"`` and empty settings.
    """
    user_fields = {
        "id": "user123",
        "email": "test@example.com",
        "username": "testuser",
        "hashed_password": "hashed_password_here",
        "roles": ["user"],
        "user_settings": {},
        "is_verified": False,
        "is_active": True,
        "created_at": datetime.utcnow(),
        "updated_at": datetime.utcnow(),
    }
    return UserInDB(**user_fields)
|
|
|
|
|
|
@pytest.fixture
def sample_tokens():
    """
    Build the token pair returned by the mocked login.

    Returns:
        ``AccessTokenResponse`` carrying fixed access and refresh tokens of
        type ``"bearer"``.
    """
    token_fields = {
        "access_token": "sample.access.token",
        "refresh_token": "sample_refresh_token",
        "token_type": "bearer",
    }
    return AccessTokenResponse(**token_fields)
|
|
|
|
|
|
def test_i_can_register_user(client, mock_auth_service, sample_user):
    """
    Registration with valid data succeeds.

    Posts a full registration payload to /auth/register and expects a 201
    response exposing the created user's public fields, without the
    hashed password.
    """
    mock_auth_service.register.return_value = sample_user
    registration_payload = {
        "email": "test@example.com",
        "username": "testuser",
        "password": "SecurePass123!",
        "roles": ["user"],
        "user_settings": {},
    }

    resp = client.post("/auth/register", json=registration_payload)

    assert resp.status_code == 201
    body = resp.json()
    expected_fields = {
        "email": "test@example.com",
        "username": "testuser",
        "id": "user123",
    }
    for field_name, expected_value in expected_fields.items():
        assert body[field_name] == expected_value
    assert "hashed_password" not in body
    mock_auth_service.register.assert_called_once()
|
|
|
|
|
|
def test_i_cannot_register_with_existing_email(client, mock_auth_service):
    """
    Registration is rejected when the email is already taken.

    The mocked service raises ``UserAlreadyExistsError``; the endpoint is
    expected to answer 409 Conflict with a detail mentioning
    "already exists".
    """
    duplicate_error = UserAlreadyExistsError("User with this email already exists")
    mock_auth_service.register.side_effect = duplicate_error
    registration_payload = {
        "email": "existing@example.com",
        "username": "testuser",
        "password": "SecurePass123!",
        "roles": [],
        "user_settings": {},
    }

    resp = client.post("/auth/register", json=registration_payload)

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert "already exists" in detail.lower()
|
|
|
|
|
|
def test_i_can_login(client, mock_auth_service, sample_user, sample_tokens):
    """
    Login with valid credentials succeeds.

    Submits form data to /auth/login and expects a 200 response containing
    the access token, refresh token and token type supplied by the mocked
    service.
    """
    mock_auth_service.login.return_value = (sample_user, sample_tokens)
    credentials = {
        "username": "test@example.com",  # OAuth2 uses 'username' field
        "password": "SecurePass123!",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 200
    body = resp.json()
    assert body["access_token"] == "sample.access.token"
    assert body["refresh_token"] == "sample_refresh_token"
    assert body["token_type"] == "bearer"
    mock_auth_service.login.assert_called_once_with(
        "test@example.com", "SecurePass123!"
    )
|
|
|
|
|
|
def test_i_cannot_login_with_invalid_credentials(client, mock_auth_service):
    """
    Login is rejected for wrong credentials.

    The mocked service raises ``InvalidCredentialsError``; the endpoint is
    expected to answer 401 Unauthorized with a detail mentioning "invalid".
    """
    mock_auth_service.login.side_effect = InvalidCredentialsError()
    credentials = {
        "username": "test@example.com",
        "password": "WrongPassword",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "invalid" in detail.lower()
|
|
|
|
|
|
def test_i_cannot_login_with_disabled_account(client, mock_auth_service):
    """
    Login is rejected for a disabled account.

    The mocked service raises ``AccountDisabledError``; the endpoint is
    expected to answer 403 Forbidden with a detail mentioning "disabled".
    """
    mock_auth_service.login.side_effect = AccountDisabledError()
    credentials = {
        "username": "test@example.com",
        "password": "SecurePass123!",
    }

    resp = client.post("/auth/login", data=credentials)

    assert resp.status_code == 403
    detail = resp.json()["detail"]
    assert "disabled" in detail.lower()
|
|
|
|
|
|
def test_i_can_refresh_token(client, mock_auth_service, sample_tokens):
    """
    Refreshing with a valid refresh token succeeds.

    Posts the refresh token to /auth/refresh and expects a 200 response
    carrying the new access and refresh tokens returned by the mocked
    service.
    """
    mock_auth_service.refresh_access_token.return_value = AccessTokenResponse(
        access_token="new.access.token",
        refresh_token="new_refresh_token",
        token_type="bearer",
    )
    refresh_payload = {"refresh_token": "sample_refresh_token"}

    resp = client.post("/auth/refresh", json=refresh_payload)

    assert resp.status_code == 200
    body = resp.json()
    assert body["access_token"] == "new.access.token"
    assert body["refresh_token"] == "new_refresh_token"
    mock_auth_service.refresh_access_token.assert_called_once_with(
        "sample_refresh_token"
    )
|
|
|
|
|
|
def test_i_cannot_refresh_with_invalid_token(client, mock_auth_service):
    """
    Refreshing is rejected for an invalid refresh token.

    The mocked service raises ``InvalidTokenError``; the endpoint is
    expected to answer 401 Unauthorized with a detail mentioning "invalid".
    """
    token_error = InvalidTokenError("Invalid refresh token")
    mock_auth_service.refresh_access_token.side_effect = token_error
    refresh_payload = {"refresh_token": "invalid_token"}

    resp = client.post("/auth/refresh", json=refresh_payload)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "invalid" in detail.lower()
|
|
|
|
|
|
def test_i_cannot_refresh_with_expired_token(client, mock_auth_service):
    """
    Refreshing is rejected for an expired refresh token.

    The mocked service raises ``ExpiredTokenError``; the endpoint is
    expected to answer 401 Unauthorized with a detail mentioning "expired".
    """
    mock_auth_service.refresh_access_token.side_effect = ExpiredTokenError()
    refresh_payload = {"refresh_token": "expired_token"}

    resp = client.post("/auth/refresh", json=refresh_payload)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "expired" in detail.lower()
|
|
|
|
|
|
def test_i_cannot_refresh_with_revoked_token(client, mock_auth_service):
    """
    Refreshing is rejected for a revoked refresh token.

    The mocked service raises ``RevokedTokenError``; the endpoint is
    expected to answer 401 Unauthorized with a detail mentioning "revoked".
    """
    mock_auth_service.refresh_access_token.side_effect = RevokedTokenError()
    refresh_payload = {"refresh_token": "revoked_token"}

    resp = client.post("/auth/refresh", json=refresh_payload)

    assert resp.status_code == 401
    detail = resp.json()["detail"]
    assert "revoked" in detail.lower()
|
|
|
|
|
|
def test_i_can_logout(client, mock_auth_service):
    """
    Logout succeeds.

    Posts the refresh token to /auth/logout and expects a 204 response,
    with the token forwarded to the service's ``logout`` method.
    """
    mock_auth_service.logout.return_value = True
    logout_payload = {"refresh_token": "sample_refresh_token"}

    resp = client.post("/auth/logout", json=logout_payload)

    assert resp.status_code == 204
    mock_auth_service.logout.assert_called_once_with("sample_refresh_token")
|
|
|
|
|
|
def test_i_can_request_password_reset(client, mock_auth_service):
    """
    Requesting a password reset returns the reset token.

    Posts an email to /auth/password-reset-request and expects a 200
    response whose ``token`` field equals the value produced by the mocked
    service.
    """
    mock_auth_service.request_password_reset.return_value = "reset_token_123"
    reset_request = {"email": "test@example.com"}

    resp = client.post("/auth/password-reset-request", json=reset_request)

    assert resp.status_code == 200
    body = resp.json()
    assert "token" in body
    assert body["token"] == "reset_token_123"
    mock_auth_service.request_password_reset.assert_called_once_with(
        "test@example.com"
    )
|
|
|
|
|
|
def test_i_cannot_request_password_reset_for_unknown_email(client, mock_auth_service):
    """
    Requesting a password reset fails for an unknown email.

    The mocked service raises ``UserNotFoundError``; the endpoint is
    expected to answer 404 Not Found.
    """
    missing_user = UserNotFoundError("No user found with email")
    mock_auth_service.request_password_reset.side_effect = missing_user
    reset_request = {"email": "unknown@example.com"}

    resp = client.post("/auth/password-reset-request", json=reset_request)

    assert resp.status_code == 404
|
|
|
|
|
|
def test_i_can_reset_password(client, mock_auth_service):
    """
    Resetting the password with a valid token succeeds.

    Posts the token and new password to /auth/password-reset and expects a
    200 response whose message mentions "success", with both values
    forwarded to the service.
    """
    mock_auth_service.reset_password.return_value = True
    reset_payload = {
        "token": "reset_token_123",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=reset_payload)

    assert resp.status_code == 200
    message = resp.json()["message"]
    assert "success" in message.lower()
    mock_auth_service.reset_password.assert_called_once_with(
        "reset_token_123",
        "NewSecurePass123!",
    )
|
|
|
|
|
|
def test_i_cannot_reset_password_with_invalid_token(client, mock_auth_service):
    """
    Resetting the password is rejected for an invalid token.

    The mocked service raises ``InvalidTokenError``; the endpoint is
    expected to answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid password reset token")
    mock_auth_service.reset_password.side_effect = token_error
    reset_payload = {
        "token": "invalid_token",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=reset_payload)

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_cannot_reset_password_with_expired_token(client, mock_auth_service):
    """
    Resetting the password is rejected for an expired token.

    The mocked service raises ``ExpiredTokenError``; the endpoint is
    expected to answer 401 Unauthorized.
    """
    token_error = ExpiredTokenError("Password reset token has expired")
    mock_auth_service.reset_password.side_effect = token_error
    reset_payload = {
        "token": "expired_token",
        "new_password": "NewSecurePass123!",
    }

    resp = client.post("/auth/password-reset", json=reset_payload)

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_can_request_email_verification(client, mock_auth_service):
    """
    Requesting email verification returns the verification token.

    Posts an email to /auth/verify-email-request and expects a 200 response
    whose ``token`` field equals the value produced by the mocked service.
    """
    mock_auth_service.request_email_verification.return_value = "verify_token_jwt"
    verification_request = {"email": "test@example.com"}

    resp = client.post("/auth/verify-email-request", json=verification_request)

    assert resp.status_code == 200
    body = resp.json()
    assert "token" in body
    assert body["token"] == "verify_token_jwt"
    mock_auth_service.request_email_verification.assert_called_once_with(
        "test@example.com"
    )
|
|
|
|
|
|
def test_i_can_verify_email(client, mock_auth_service):
    """
    Verifying an email with a valid token succeeds.

    Issues a GET to /auth/verify-email with the token as a query parameter
    and expects a 200 response whose message mentions "success".
    """
    mock_auth_service.verify_email.return_value = True

    resp = client.get("/auth/verify-email?token=verify_token_jwt")

    assert resp.status_code == 200
    message = resp.json()["message"]
    assert "success" in message.lower()
    mock_auth_service.verify_email.assert_called_once_with("verify_token_jwt")
|
|
|
|
|
|
def test_i_cannot_verify_email_with_invalid_token(client, mock_auth_service):
    """
    Verifying an email is rejected for an invalid token.

    The mocked service raises ``InvalidTokenError``; the endpoint is
    expected to answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid email verification token")
    mock_auth_service.verify_email.side_effect = token_error

    resp = client.get("/auth/verify-email?token=invalid_token")

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_can_get_current_user(client, mock_auth_service, sample_user):
    """
    The authenticated user can be retrieved.

    Issues a GET to /auth/me with a Bearer token and expects a 200 response
    exposing the user's public fields, without the hashed password. The
    raw token is expected to reach ``get_current_user``.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.get("/auth/me", headers=auth_headers)

    assert resp.status_code == 200
    body = resp.json()
    expected_fields = {
        "email": "test@example.com",
        "username": "testuser",
        "id": "user123",
    }
    for field_name, expected_value in expected_fields.items():
        assert body[field_name] == expected_value
    assert "hashed_password" not in body
    mock_auth_service.get_current_user.assert_called_once_with("sample.access.token")
|
|
|
|
|
|
def test_i_cannot_access_protected_route_without_token(client):
    """
    A protected route rejects unauthenticated requests.

    Issues a GET to /auth/me with no Authorization header and expects
    401 Unauthorized.
    """
    resp = client.get("/auth/me")

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_service):
    """
    A protected route rejects an invalid access token.

    The mocked ``get_current_user`` raises ``InvalidTokenError``; a GET to
    /auth/me is expected to answer 401 Unauthorized.
    """
    token_error = InvalidTokenError("Invalid access token")
    mock_auth_service.get_current_user.side_effect = token_error
    auth_headers = {"Authorization": "Bearer invalid.token"}

    resp = client.get("/auth/me", headers=auth_headers)

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_can_update_my_email(client, mock_auth_service, sample_user):
    """
    A user can change their own email.

    Sends a PATCH to /auth/me with a new email and expects a 200 response
    reflecting the updated address returned by the mocked service.
    """
    new_email = {"email": "newemail@example.com"}
    user_after_update = sample_user.model_copy(update={"email": "newemail@example.com"})
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.patch("/auth/me", headers=auth_headers, json=new_email)

    assert resp.status_code == 200
    body = resp.json()
    assert body["email"] == "newemail@example.com"
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_i_can_update_my_username(client, mock_auth_service, sample_user):
    """
    A user can change their own username.

    Sends a PATCH to /auth/me with a new username and expects a 200
    response reflecting the updated name returned by the mocked service.
    """
    new_username = {"username": "newusername"}
    user_after_update = sample_user.model_copy(update={"username": "newusername"})
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.patch("/auth/me", headers=auth_headers, json=new_username)

    assert resp.status_code == 200
    body = resp.json()
    assert body["username"] == "newusername"
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_i_can_update_my_password(client, mock_auth_service, sample_user):
    """
    A user can change their own password.

    Sends a PATCH to /auth/me with a new password, expects a 200 response,
    and checks that the ``updates`` keyword argument handed to
    ``update_user`` carries the submitted password.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    new_password = {"password": "NewSecurePass123!"}

    resp = client.patch("/auth/me", headers=auth_headers, json=new_password)

    assert resp.status_code == 200
    mock_auth_service.update_user.assert_called_once()
    # call_args is an (args, kwargs) pair; the password must be in the update
    _, passed_kwargs = mock_auth_service.update_user.call_args
    assert passed_kwargs["updates"].password == "NewSecurePass123!"
|
|
|
|
|
|
def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user):
    """
    A password change can carry the current refresh token along.

    Sends a PATCH to /auth/me with both a new password and a refresh token,
    expects a 200 response, and checks that the refresh token is forwarded
    to ``update_user`` as the ``refresh_token`` keyword argument.
    """
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = sample_user
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    update_payload = {
        "password": "NewSecurePass123!",
        "refresh_token": "current_refresh_token",
    }

    resp = client.patch("/auth/me", headers=auth_headers, json=update_payload)

    assert resp.status_code == 200
    mock_auth_service.update_user.assert_called_once()
    # call_args is an (args, kwargs) pair; the token must be passed through
    _, passed_kwargs = mock_auth_service.update_user.call_args
    assert passed_kwargs["refresh_token"] == "current_refresh_token"
|
|
|
|
|
|
def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user):
    """
    A user can change their own settings.

    Sends a PATCH to /auth/me with a new ``user_settings`` mapping and
    expects a 200 response echoing those settings.
    """
    new_settings = {"theme": "dark", "language": "fr", "notifications": True}
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = sample_user.model_copy(
        update={"user_settings": new_settings}
    )
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.patch(
        "/auth/me",
        headers=auth_headers,
        json={"user_settings": new_settings},
    )

    assert resp.status_code == 200
    body = resp.json()
    assert body["user_settings"] == new_settings
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user):
    """
    A user can change several profile fields in one request.

    Sends a PATCH to /auth/me with email, username and user_settings and
    expects a 200 response reflecting all three new values.
    """
    user_after_update = sample_user.model_copy(update={
        "email": "multiemail@example.com",
        "username": "multiuser",
        "user_settings": {"theme": "light"},
    })
    mock_auth_service.get_current_user.return_value = sample_user
    mock_auth_service.update_user.return_value = user_after_update
    auth_headers = {"Authorization": "Bearer sample.access.token"}
    update_payload = {
        "email": "multiemail@example.com",
        "username": "multiuser",
        "user_settings": {"theme": "light"},
    }

    resp = client.patch("/auth/me", headers=auth_headers, json=update_payload)

    assert resp.status_code == 200
    body = resp.json()
    assert body["email"] == "multiemail@example.com"
    assert body["username"] == "multiuser"
    assert body["user_settings"] == {"theme": "light"}
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_i_cannot_update_my_profile_without_authentication(client):
    """
    Profile updates are rejected for unauthenticated requests.

    Sends a PATCH to /auth/me with no Authorization header and expects
    401 Unauthorized.
    """
    update_payload = {"username": "shouldfail"}

    resp = client.patch("/auth/me", json=update_payload)

    assert resp.status_code == 401
|
|
|
|
|
|
def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user):
    """
    Changing the email to one already in use is rejected.

    The mocked ``update_user`` raises ``UserAlreadyExistsError``; the
    endpoint is expected to answer 409 Conflict with a detail mentioning
    "already".
    """
    mock_auth_service.get_current_user.return_value = sample_user
    duplicate_error = UserAlreadyExistsError("Email already in use")
    mock_auth_service.update_user.side_effect = duplicate_error
    auth_headers = {"Authorization": "Bearer sample.access.token"}

    resp = client.patch(
        "/auth/me",
        headers=auth_headers,
        json={"email": "existing@example.com"},
    )

    assert resp.status_code == 409
    detail = resp.json()["detail"]
    assert "already" in detail.lower()
|
|
|
|
|
|
def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service):
    """
    Test updating profile fails with invalid token.

    Verifies that a PATCH request to /auth/me with an invalid token
    returns 401 Unauthorized status and that no update is attempted.
    """
    mock_auth_service.get_current_user.side_effect = InvalidTokenError(
        "Invalid access token"
    )

    response = client.patch(
        "/auth/me",
        headers={"Authorization": "Bearer invalid.token"},
        json={"username": "shouldfail"}
    )

    assert response.status_code == 401
    # A rejected request must not reach the mutation: guard against a
    # regression where the update runs before the token is checked.
    mock_auth_service.update_user.assert_not_called()
|
|
|
|
|
|
def test_admin_can_update_any_user(client, mock_auth_service, sample_user):
    """
    An admin can update another user.

    With ``get_current_user`` returning a user holding the "admin" role, a
    PATCH to /auth/users/target_user_id is expected to answer 200 with the
    updated username returned by the mocked service.
    """
    acting_admin = sample_user.model_copy(update={"roles": ["admin"]})
    original_target = sample_user.model_copy(
        update={"id": "target_user_id", "username": "targetuser"}
    )
    target_after_update = original_target.model_copy(
        update={"username": "updatedusername"}
    )
    mock_auth_service.get_current_user.return_value = acting_admin
    mock_auth_service.update_user.return_value = target_after_update
    auth_headers = {"Authorization": "Bearer admin.access.token"}

    resp = client.patch(
        "/auth/users/target_user_id",
        headers=auth_headers,
        json={"username": "updatedusername"},
    )

    assert resp.status_code == 200
    body = resp.json()
    assert body["username"] == "updatedusername"
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_admin_can_update_user_roles(client, mock_auth_service, sample_user):
    """
    An admin can change a user's roles.

    With ``get_current_user`` returning a user holding the "admin" role, a
    PATCH to /auth/users/target_user_id carrying new roles is expected to
    answer 200 with those roles in the response.
    """
    acting_admin = sample_user.model_copy(update={"roles": ["admin"]})
    original_target = sample_user.model_copy(
        update={"id": "target_user_id", "roles": ["user"]}
    )
    target_after_update = original_target.model_copy(
        update={"roles": ["admin", "moderator"]}
    )
    mock_auth_service.get_current_user.return_value = acting_admin
    mock_auth_service.update_user.return_value = target_after_update
    auth_headers = {"Authorization": "Bearer admin.access.token"}

    resp = client.patch(
        "/auth/users/target_user_id",
        headers=auth_headers,
        json={"roles": ["admin", "moderator"]},
    )

    assert resp.status_code == 200
    body = resp.json()
    assert body["roles"] == ["admin", "moderator"]
    mock_auth_service.update_user.assert_called_once()
|
|
|
|
|
|
def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user):
    """
    An admin can change a user's active flag.

    With ``get_current_user`` returning a user holding the "admin" role, a
    PATCH to /auth/users/target_user_id with ``is_active`` set to False is
    expected to answer 200 and to pass that flag to ``update_user``.
    """
    acting_admin = sample_user.model_copy(update={"roles": ["admin"]})
    original_target = sample_user.model_copy(
        update={"id": "target_user_id", "is_active": True}
    )
    target_after_update = original_target.model_copy(update={"is_active": False})
    mock_auth_service.get_current_user.return_value = acting_admin
    mock_auth_service.update_user.return_value = target_after_update
    auth_headers = {"Authorization": "Bearer admin.access.token"}

    resp = client.patch(
        "/auth/users/target_user_id",
        headers=auth_headers,
        json={"is_active": False},
    )

    assert resp.status_code == 200
    # Note: is_active is not in UserResponse, so the check is made on the
    # arguments handed to the service rather than on the response body.
    mock_auth_service.update_user.assert_called_once()
    _, passed_kwargs = mock_auth_service.update_user.call_args
    assert passed_kwargs["updates"].is_active is False
|
|
|
|
|
|
def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user):
    """
    An admin can change a user's verification flag.

    With ``get_current_user`` returning a user holding the "admin" role, a
    PATCH to /auth/users/target_user_id with ``is_verified`` set to True is
    expected to answer 200 and to pass that flag to ``update_user``.
    """
    acting_admin = sample_user.model_copy(update={"roles": ["admin"]})
    original_target = sample_user.model_copy(
        update={"id": "target_user_id", "is_verified": False}
    )
    target_after_update = original_target.model_copy(update={"is_verified": True})
    mock_auth_service.get_current_user.return_value = acting_admin
    mock_auth_service.update_user.return_value = target_after_update
    auth_headers = {"Authorization": "Bearer admin.access.token"}

    resp = client.patch(
        "/auth/users/target_user_id",
        headers=auth_headers,
        json={"is_verified": True},
    )

    assert resp.status_code == 200
    # Note: is_verified is not in UserResponse, so the check is made on the
    # arguments handed to the service rather than on the response body.
    mock_auth_service.update_user.assert_called_once()
    _, passed_kwargs = mock_auth_service.update_user.call_args
    assert passed_kwargs["updates"].is_verified is True
|
|
|
|
|
|
def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user):
    """
    Test non-admin cannot update other users.

    Verifies that a regular user (without admin role) cannot access
    the PATCH /auth/users/{user_id} endpoint, receives 403 Forbidden,
    and that no update is performed on the target user.
    """
    regular_user = sample_user.model_copy(update={"roles": ["user"]})
    mock_auth_service.get_current_user.return_value = regular_user

    response = client.patch(
        "/auth/users/other_user_id",
        headers={"Authorization": "Bearer user.access.token"},
        json={"username": "shouldfail"}
    )

    assert response.status_code == 403
    assert "admin" in response.json()["detail"].lower()
    # The status code alone does not prove the request was harmless: make
    # sure the role check happens before any mutation is attempted.
    mock_auth_service.update_user.assert_not_called()
|
|
|
|
|
|
def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user):
    """
    Updating a user that does not exist fails.

    With an admin as the current user and ``update_user`` raising
    ``UserNotFoundError``, a PATCH to /auth/users/non_existent_id is
    expected to answer 404 Not Found with a detail mentioning "not found".
    """
    acting_admin = sample_user.model_copy(update={"roles": ["admin"]})
    mock_auth_service.get_current_user.return_value = acting_admin
    missing_user = UserNotFoundError("User not found")
    mock_auth_service.update_user.side_effect = missing_user
    auth_headers = {"Authorization": "Bearer admin.access.token"}

    resp = client.patch(
        "/auth/users/non_existent_id",
        headers=auth_headers,
        json={"username": "shouldfail"},
    )

    assert resp.status_code == 404
    detail = resp.json()["detail"]
    assert "not found" in detail.lower()
|