Files
MyAuth/tests/core/test_auth_service.py

737 lines
29 KiB
Python

# tests/core/test_auth_service.py
from datetime import datetime
from unittest.mock import patch
import pytest
from myauth.core.auth import AuthService
from myauth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
ExpiredTokenError, RevokedTokenError
from myauth.models.token import TokenPayload
from myauth.models.user import UserCreate
class TestAuthServiceRegisterLogin(object):
"""Tests for the user registration and login processes."""
# The mocks are injected via the auth_service fixture from conftest.py
def test_register_success(self, auth_service: AuthService,
test_user_data_create: UserCreate):
"""Success: Registration works and stores the predictable hash."""
user = auth_service.register(test_user_data_create)
assert user is not None
assert user.hashed_password == "PREDICTABLE_HASHED_PASSWORD_FOR_TESTING"
auth_service.password_manager.hash_password.assert_called_once_with(test_user_data_create.password)
def test_register_failure_if_email_exists(self, auth_service: AuthService,
test_user_data_create: UserCreate):
"""Failure: Cannot register if the email already exists."""
auth_service.register(test_user_data_create)
with pytest.raises(UserAlreadyExistsError):
auth_service.register(test_user_data_create)
def test_login_success(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""Success: Logging in with correct credentials generates and saves tokens."""
# Execute register to insert the user
auth_service.register(test_user_data_create)
# Execute login
user, tokens = auth_service.login(test_user_data_create.email, test_user_data_create.password)
assert user.email == test_user_data_create.email
auth_service.password_manager.verify_password.assert_called_once()
auth_service.token_manager.create_access_token.assert_called_once()
assert tokens.access_token == "MOCKED_ACCESS_TOKEN"
assert tokens.refresh_token == "MOCKED_REFRESH_TOKEN"
def test_login_failure_invalid_password(self, auth_service: AuthService, # Removed mock_password_manager injection
test_user_data_create: UserCreate):
"""Failure: Login fails with InvalidCredentialsError if the password is wrong."""
# Access the manager via the service instance
pm = auth_service.password_manager
# Setup: Mock hash for registration
pm.hash_password.return_value = "HASH"
auth_service.register(test_user_data_create)
# Mock verify for failure
pm.verify_password.return_value = False
with pytest.raises(InvalidCredentialsError):
auth_service.login(test_user_data_create.email, "WrongPassword!")
# Restore the default mock behavior defined in conftest for subsequent tests
pm.hash_password.return_value = "PREDICTABLE_HASHED_PASSWORD_FOR_TESTING"
pm.verify_password.return_value = True
def test_login_failure_user_not_found(self, auth_service: AuthService):
"""Failure: Login fails if the user does not exist."""
with pytest.raises(InvalidCredentialsError):
auth_service.login("non.existent@example.com", "AnyPassword")
def test_create_admin_if_needed_success_with_custom_credentials(
self,
auth_service: AuthService
):
"""Success: Admin is created with custom credentials when no users exist."""
# Arrange
custom_email = "custom.admin@example.com"
custom_username = "custom_admin"
custom_password = "CustomAdminPass123!"
# Act
result = auth_service.create_admin_if_needed(
admin_email=custom_email,
admin_username=custom_username,
admin_password=custom_password
)
# Assert
assert result is True
# Verify admin user was created
admin_user = auth_service.user_repository.get_user_by_email(custom_email)
assert admin_user is not None
assert admin_user.email == custom_email
assert admin_user.username == custom_username
assert "admin" in admin_user.roles
# Verify password was hashed
auth_service.password_manager.hash_password.assert_called()
def test_create_admin_if_needed_success_with_default_credentials(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Admin is created with default credentials from environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
# Act
result = auth_service.create_admin_if_needed()
# Assert
assert result is True
# Verify admin user was created with env variables
admin_user = auth_service.user_repository.get_user_by_email("env.admin@example.com")
assert admin_user is not None
assert admin_user.email == "env.admin@example.com"
assert admin_user.username == "env_admin"
assert "admin" in admin_user.roles
def test_create_admin_if_needed_success_with_hardcoded_defaults(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Admin is created with hardcoded defaults when no env vars or params provided."""
# Arrange - Clear any existing env variables
monkeypatch.delenv("AUTH_ADMIN_EMAIL", raising=False)
monkeypatch.delenv("AUTH_ADMIN_USERNAME", raising=False)
monkeypatch.delenv("AUTH_ADMIN_PASSWORD", raising=False)
# Act
result = auth_service.create_admin_if_needed()
# Assert
assert result is True
# Verify admin user was created with hardcoded defaults
admin_user = auth_service.user_repository.get_user_by_email("admin@myauth.com")
assert admin_user is not None
assert admin_user.email == "admin@myauth.com"
assert admin_user.username == "admin"
assert "admin" in admin_user.roles
def test_create_admin_if_needed_no_creation_when_users_exist(
self,
auth_service: AuthService,
test_user_data_create: UserCreate
):
"""Failure: Admin is not created when users already exist in the system."""
# Arrange - Create a regular user first
auth_service.register(test_user_data_create)
# Act
result = auth_service.create_admin_if_needed(
admin_email="should.not.be.created@example.com",
admin_username="should_not_exist",
admin_password="ShouldNotExist123!"
)
# Assert
assert result is False
# Verify admin user was NOT created
admin_user = auth_service.user_repository.get_user_by_email(
"should.not.be.created@example.com"
)
assert admin_user is None
# Verify only the original user exists
assert auth_service.count_users() == 1
def test_create_admin_if_needed_parameters_override_env_variables(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Parameters take precedence over environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
param_email = "param.admin@example.com"
param_username = "param_admin"
param_password = "ParamAdminPass123!"
# Act
result = auth_service.create_admin_if_needed(
admin_email=param_email,
admin_username=param_username,
admin_password=param_password
)
# Assert
assert result is True
# Verify parameters were used, not env variables
admin_user = auth_service.user_repository.get_user_by_email(param_email)
assert admin_user is not None
assert admin_user.email == param_email
assert admin_user.username == param_username
# Verify env admin was NOT created
env_admin = auth_service.user_repository.get_user_by_email("env.admin@example.com")
assert env_admin is None
def test_create_admin_if_needed_mixed_parameters_and_env(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Partial parameters combine with environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
# Act - Only provide email as parameter
result = auth_service.create_admin_if_needed(
admin_email="partial.admin@example.com"
)
# Assert
assert result is True
# Verify email from parameter, username and password from env
admin_user = auth_service.user_repository.get_user_by_email("partial.admin@example.com")
assert admin_user is not None
assert admin_user.email == "partial.admin@example.com"
assert admin_user.username == "env_admin"
class TestAuthServiceTokenManagement(object):
"""Tests for token-related flows (Refresh, Logout, GetCurrentUser)."""
@pytest.fixture(autouse=True)
def setup_user_and_token(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""
Sets up a registered user and an initial set of tokens for management tests.
Temporarily overrides manager behavior for setup.
"""
# Temporarily set up predictable values for the registration/login flow within the fixture setup
pm = auth_service.password_manager
tm = auth_service.token_manager
original_hash = pm.hash_password.return_value
original_verify = pm.verify_password.return_value
original_access = tm.create_access_token.return_value
original_refresh = tm.create_refresh_token.return_value
pm.hash_password.return_value = "HASHED_PASS"
pm.verify_password.return_value = True
tm.create_access_token.return_value = "SETUP_ACCESS_TOKEN"
tm.create_refresh_token.return_value = "SETUP_REFRESH_TOKEN"
user = auth_service.register(test_user_data_create)
_, tokens = auth_service.login(test_user_data_create.email, test_user_data_create.password)
self.user = user
self.refresh_token = tokens.refresh_token
self.access_token = tokens.access_token
# Restore mock values to default conftest behavior for the actual tests
pm.hash_password.return_value = original_hash
pm.verify_password.return_value = original_verify
tm.create_access_token.return_value = original_access
tm.create_refresh_token.return_value = original_refresh
def test_refresh_access_token_success(self, auth_service: AuthService):
"""Success: Refreshing an access token works with a valid refresh token."""
# Access the manager via the service instance
tm = auth_service.token_manager
# Use patch.object on the *instance* for granular control within the test
with patch.object(tm, 'create_access_token',
return_value="NEW_MOCKED_ACCESS_TOKEN") as mock_create_access:
tokens = auth_service.refresh_access_token(self.refresh_token)
assert tokens.access_token == "NEW_MOCKED_ACCESS_TOKEN"
mock_create_access.assert_called_once()
def test_refresh_access_token_failure_invalid_token(self, auth_service: AuthService):
"""Failure: Refreshing fails if the token is invalid (revoked, expired, etc.)."""
auth_service.logout(self.refresh_token)
with pytest.raises(InvalidTokenError):
auth_service.refresh_access_token("invalid_token")
def test_logout_success(self, auth_service: AuthService):
"""Success: Logout revokes the specified refresh token."""
result = auth_service.logout(self.refresh_token)
assert result is True
with pytest.raises(RevokedTokenError):
auth_service.refresh_access_token(self.refresh_token)
def test_get_current_user_success(self, auth_service: AuthService):
"""Success: Getting the current user works by successfully decoding the JWT."""
# Mock the decoder to simulate a decoded payload
token_payload = TokenPayload(sub=self.user.id,
email=str(self.user.email),
exp=int(datetime.now().timestamp() * 1000))
with patch.object(auth_service.token_manager, 'decode_access_token',
return_value=token_payload) as mock_decode:
user = auth_service.get_current_user("dummy_jwt")
assert user.id == self.user.id
mock_decode.assert_called_once()
def test_get_current_user_failure_invalid_token(self, auth_service: AuthService):
"""Failure: Getting the current user fails if the access token is invalid/expired."""
with patch.object(auth_service.token_manager, 'decode_access_token',
side_effect=InvalidTokenError("Invalid signature")):
with pytest.raises(InvalidTokenError):
auth_service.get_current_user("invalid_access_jwt")
with patch.object(auth_service.token_manager, 'decode_access_token',
side_effect=ExpiredTokenError("Token expired")):
with pytest.raises(ExpiredTokenError):
auth_service.get_current_user("expired_access_jwt")
class TestAuthServiceUserUpdate(object):
"""Tests for user update operations."""
@pytest.fixture(autouse=True)
def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""Sets up a registered user for update tests."""
pm = auth_service.password_manager
original_hash = pm.hash_password.return_value
# Temporarily set hash for setup
pm.hash_password.return_value = "HASHED_PASS"
user = auth_service.register(test_user_data_create)
self.user = user
self.original_email = user.email
self.original_username = user.username
# Restore hash mock
pm.hash_password.return_value = original_hash
def test_i_can_update_user_email(self, auth_service: AuthService):
"""Success: Email can be updated and is_verified is automatically set to False."""
from myauth.models.user import UserUpdate
new_email = "updated.email@example.com"
updates = UserUpdate(email=new_email)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.email == new_email
assert updated_user.is_verified is False
def test_i_can_update_user_username(self, auth_service: AuthService):
"""Success: Username can be updated."""
from myauth.models.user import UserUpdate
new_username = "UpdatedUsername"
updates = UserUpdate(username=new_username)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == new_username
def test_i_can_update_user_password(self, auth_service: AuthService):
"""Success: Password can be updated and is properly hashed."""
from myauth.models.user import UserUpdate
pm = auth_service.password_manager
new_password = "NewSecurePass123!"
with patch.object(pm, 'hash_password', return_value="NEW_HASHED_PASSWORD") as mock_hash:
updates = UserUpdate(password=new_password)
updated_user = auth_service.update_user(self.user.id, updates)
mock_hash.assert_called_once_with(new_password)
assert updated_user.hashed_password == "NEW_HASHED_PASSWORD"
def test_i_can_update_user_password_and_all_refresh_tokens_are_revoked(
self,
auth_service: AuthService
):
"""Success: Updating password revokes all refresh tokens when no current token provided."""
from myauth.models.user import UserUpdate
# Setup: Create some refresh tokens for the user
from myauth.models.token import TokenData
from datetime import datetime, timedelta
token1 = TokenData(
token="refresh_token_1",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
token2 = TokenData(
token="refresh_token_2",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(token1)
auth_service.token_repository.save_token(token2)
# Execute: Update password without providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates)
# Verify: Both tokens are revoked
token1_after = auth_service.token_repository.get_token("refresh_token_1", "refresh")
token2_after = auth_service.token_repository.get_token("refresh_token_2", "refresh")
assert token1_after.is_revoked is True
assert token2_after.is_revoked is True
def test_i_can_update_user_password_and_preserve_current_session(
self,
auth_service: AuthService
):
"""Success: Updating password preserves the current refresh token when provided."""
from myauth.models.user import UserUpdate
from myauth.models.token import TokenData
from datetime import datetime, timedelta
# Setup: Create some refresh tokens
current_token = TokenData(
token="current_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
other_token = TokenData(
token="other_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(current_token)
auth_service.token_repository.save_token(other_token)
# Execute: Update password while providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates, refresh_token="current_refresh_token")
# Verify: Current token is preserved, other is revoked
current_after = auth_service.token_repository.get_token("current_refresh_token", "refresh")
other_after = auth_service.token_repository.get_token("other_refresh_token", "refresh")
assert current_after.is_revoked is False
assert other_after.is_revoked is True
def test_i_can_update_multiple_fields_at_once(self, auth_service: AuthService):
"""Success: Multiple fields can be updated simultaneously."""
from myauth.models.user import UserUpdate
updates = UserUpdate(
username="MultiUpdateUser",
roles=["admin", "member"],
user_settings={"theme": "light", "language": "en"}
)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == "MultiUpdateUser"
assert updated_user.roles == ["admin", "member"]
assert updated_user.user_settings == {"theme": "light", "language": "en"}
def test_i_can_update_user_roles(self, auth_service: AuthService):
"""Success: User roles can be updated."""
from myauth.models.user import UserUpdate
new_roles = ["admin", "moderator"]
updates = UserUpdate(roles=new_roles)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.roles == new_roles
def test_i_can_update_user_settings(self, auth_service: AuthService):
"""Success: User settings can be updated."""
from myauth.models.user import UserUpdate
new_settings = {"theme": "light", "notifications": True, "language": "fr"}
updates = UserUpdate(user_settings=new_settings)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.user_settings == new_settings
def test_i_can_update_is_active_status(self, auth_service: AuthService):
"""Success: User active status can be updated."""
from myauth.models.user import UserUpdate
# Deactivate user
updates = UserUpdate(is_active=False)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is False
# Reactivate user
updates = UserUpdate(is_active=True)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is True
def test_i_cannot_update_user_with_invalid_user_id(self, auth_service: AuthService):
"""Failure: Updating a non-existent user raises UserNotFoundError."""
from myauth.models.user import UserUpdate
from myauth.exceptions import UserNotFoundError
updates = UserUpdate(username="ShouldFail")
with pytest.raises(UserNotFoundError):
auth_service.update_user("non_existent_id", updates)
def test_i_cannot_update_email_to_existing_email(self, auth_service: AuthService):
"""Failure: Updating email to an already registered email raises UserAlreadyExistsError."""
from myauth.models.user import UserCreate, UserUpdate
# Setup: Create another user with a different email
other_user_data = UserCreate(
email="other.user@example.com",
username="OtherUser",
password="OtherPass123!",
roles=["member"]
)
auth_service.register(other_user_data)
# Execute: Try to update original user's email to the other user's email
updates = UserUpdate(email="other.user@example.com")
with pytest.raises(UserAlreadyExistsError):
auth_service.update_user(self.user.id, updates)
def test_i_cannot_update_email_to_same_email_without_triggering_verification_reset(
self,
auth_service: AuthService
):
"""Success: Updating email to the same email does not reset is_verified."""
from myauth.models.user import UserUpdate
# Setup: Ensure user is verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
# Execute: Update with the same email
updates = UserUpdate(email=self.original_email)
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: is_verified should remain True
assert updated_user.is_verified is True
assert updated_user.email == self.original_email
def test_i_can_update_with_empty_updates(self, auth_service: AuthService):
"""Success: Updating with empty UserUpdate does not cause errors."""
from myauth.models.user import UserUpdate
updates = UserUpdate()
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: User is returned and fields are unchanged
assert updated_user.id == self.user.id
assert updated_user.email == self.original_email
assert updated_user.username == self.original_username
def test_email_verification_reset_only_when_email_actually_changes(
self,
auth_service: AuthService
):
"""Success: is_verified is reset to False only when email actually changes."""
from myauth.models.user import UserUpdate
# Setup: Set user as verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
verified_user = auth_service.user_repository.get_user_by_id(self.user.id)
assert verified_user.is_verified is True
# Test 1: Update with same email - verification should remain
same_email_updates = UserUpdate(email=self.original_email, username="SameEmailTest")
updated_user = auth_service.update_user(self.user.id, same_email_updates)
assert updated_user.is_verified is True
# Test 2: Update with different email - verification should reset
different_email_updates = UserUpdate(email="completely.new@example.com")
updated_user = auth_service.update_user(self.user.id, different_email_updates)
assert updated_user.is_verified is False
assert updated_user.email == "completely.new@example.com"
# class TestAuthServiceResetVerification(object):
# """Tests for password reset and email verification flows."""
#
# @pytest.fixture(autouse=True)
# def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
# """Sets up a registered user using a mock hash for speed."""
#
# pm = auth_service.password_manager
# original_hash = pm.hash_password.return_value
#
# # Temporarily set hash for setup
# pm.hash_password.return_value = "HASHED_PASS"
# user = auth_service.register(test_user_data_create)
# self.user = user
#
# # Restore hash mock
# pm.hash_password.return_value = original_hash
#
# @patch('myauth.core.email.send_email')
# def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
# """Success: Requesting a password reset generates a token and sends an email."""
#
# tm = auth_service.token_manager
# with patch.object(tm, 'create_password_reset_token',
# return_value="MOCKED_RESET_TOKEN") as mock_create_token:
# token_string = auth_service.request_password_reset(self.user.email)
#
# assert token_string == "MOCKED_RESET_TOKEN"
# mock_create_token.assert_called_once()
# mock_send_email.assert_called_once()
#
# def test_reset_password_success(self, auth_service: AuthService):
# """Success: Resetting the password works with a valid token."""
#
# # Setup: Manually create a valid reset token
# auth_service.token_repository.save_token(
# TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
# expires_at=datetime.now() + timedelta(minutes=10))
# )
#
# # Patch the PasswordManager instance to control the hash output
# pm = auth_service.password_manager
# with patch.object(pm, 'hash_password',
# return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
# new_password = "NewPassword123!"
# result = auth_service.reset_password("valid_reset_token", new_password)
#
# assert result is True
# mock_hash.assert_called_once_with(new_password)
#
# # Verification: Check that user data was updated
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
# assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
#
# @patch('myauth.core.email.send_email')
# def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
# """Success: Requesting verification generates a token and sends an email."""
#
# tm = auth_service.token_manager
# with patch.object(tm, 'create_email_verification_token',
# return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
# token_string = auth_service.request_email_verification(self.user.email)
#
# assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
# mock_create_token.assert_called_once_with(self.user.email)
# mock_send_email.assert_called_once()
#
# def test_verify_email_success(self, auth_service: AuthService):
# """Success: Verification updates the user's status."""
#
# # The token_manager is mocked in conftest, so we must access its real create method
# # or rely on the mock's return value to get a token string to use in the call.
# # Since we need a real token for the decode logic to pass, we need to bypass the mock here.
#
# # We will temporarily use the real TokenManager to create a valid, decodable token.
# # This requires an *unmocked* token manager instance, which is tricky in this setup.
#
# # Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
#
# # Assuming TokenManager.create_email_verification_token can be mocked to return a static string
# # and TokenManager.decode_email_verification_token can be patched to simulate success.
#
# # Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
#
# # Setup: Ensure user is unverified
# auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
#
# tm = auth_service.token_manager
#
# # Mock the decode step to ensure it returns the email used for verification
# with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
# # Test (we use a dummy token string as the decode step is mocked)
# result = auth_service.verify_email("dummy_verification_token")
#
# assert result is True
# mock_decode.assert_called_once()
#
# # Verification: User is verified
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
# assert updated_user.is_verified is True