274 lines
12 KiB
Python
274 lines
12 KiB
Python
# tests/core/test_auth_service.py
|
|
|
|
from datetime import datetime
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from myauth.core.auth import AuthService
|
|
from myauth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
|
|
ExpiredTokenError, RevokedTokenError
|
|
from myauth.models.token import TokenPayload
|
|
from myauth.models.user import UserCreate
|
|
|
|
|
|
class TestAuthServiceRegisterLogin(object):
|
|
"""Tests for the user registration and login processes."""
|
|
|
|
# The mocks are injected via the auth_service fixture from conftest.py
|
|
|
|
def test_register_success(self, auth_service: AuthService,
|
|
test_user_data_create: UserCreate):
|
|
"""Success: Registration works and stores the predictable hash."""
|
|
|
|
user = auth_service.register(test_user_data_create)
|
|
|
|
assert user is not None
|
|
assert user.hashed_password == "PREDICTABLE_HASHED_PASSWORD_FOR_TESTING"
|
|
auth_service.password_manager.hash_password.assert_called_once_with(test_user_data_create.password)
|
|
|
|
def test_register_failure_if_email_exists(self, auth_service: AuthService,
|
|
test_user_data_create: UserCreate):
|
|
"""Failure: Cannot register if the email already exists."""
|
|
|
|
auth_service.register(test_user_data_create)
|
|
|
|
with pytest.raises(UserAlreadyExistsError):
|
|
auth_service.register(test_user_data_create)
|
|
|
|
def test_login_success(self, auth_service: AuthService, test_user_data_create: UserCreate):
|
|
"""Success: Logging in with correct credentials generates and saves tokens."""
|
|
|
|
# Execute register to insert the user
|
|
auth_service.register(test_user_data_create)
|
|
|
|
# Execute login
|
|
user, tokens = auth_service.login(test_user_data_create.email, test_user_data_create.password)
|
|
|
|
assert user.email == test_user_data_create.email
|
|
|
|
auth_service.password_manager.verify_password.assert_called_once()
|
|
auth_service.token_manager.create_access_token.assert_called_once()
|
|
assert tokens.access_token == "MOCKED_ACCESS_TOKEN"
|
|
assert tokens.refresh_token == "MOCKED_REFRESH_TOKEN"
|
|
|
|
def test_login_failure_invalid_password(self, auth_service: AuthService, # Removed mock_password_manager injection
|
|
test_user_data_create: UserCreate):
|
|
"""Failure: Login fails with InvalidCredentialsError if the password is wrong."""
|
|
|
|
# Access the manager via the service instance
|
|
pm = auth_service.password_manager
|
|
|
|
# Setup: Mock hash for registration
|
|
pm.hash_password.return_value = "HASH"
|
|
auth_service.register(test_user_data_create)
|
|
|
|
# Mock verify for failure
|
|
pm.verify_password.return_value = False
|
|
|
|
with pytest.raises(InvalidCredentialsError):
|
|
auth_service.login(test_user_data_create.email, "WrongPassword!")
|
|
|
|
# Restore the default mock behavior defined in conftest for subsequent tests
|
|
pm.hash_password.return_value = "PREDICTABLE_HASHED_PASSWORD_FOR_TESTING"
|
|
pm.verify_password.return_value = True
|
|
|
|
def test_login_failure_user_not_found(self, auth_service: AuthService):
|
|
"""Failure: Login fails if the user does not exist."""
|
|
with pytest.raises(InvalidCredentialsError):
|
|
auth_service.login("non.existent@example.com", "AnyPassword")
|
|
|
|
|
|
class TestAuthServiceTokenManagement(object):
|
|
"""Tests for token-related flows (Refresh, Logout, GetCurrentUser)."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_user_and_token(self, auth_service: AuthService, test_user_data_create: UserCreate):
|
|
"""
|
|
Sets up a registered user and an initial set of tokens for management tests.
|
|
Temporarily overrides manager behavior for setup.
|
|
"""
|
|
# Temporarily set up predictable values for the registration/login flow within the fixture setup
|
|
pm = auth_service.password_manager
|
|
tm = auth_service.token_manager
|
|
|
|
original_hash = pm.hash_password.return_value
|
|
original_verify = pm.verify_password.return_value
|
|
original_access = tm.create_access_token.return_value
|
|
original_refresh = tm.create_refresh_token.return_value
|
|
|
|
pm.hash_password.return_value = "HASHED_PASS"
|
|
pm.verify_password.return_value = True
|
|
tm.create_access_token.return_value = "SETUP_ACCESS_TOKEN"
|
|
tm.create_refresh_token.return_value = "SETUP_REFRESH_TOKEN"
|
|
|
|
user = auth_service.register(test_user_data_create)
|
|
_, tokens = auth_service.login(test_user_data_create.email, test_user_data_create.password)
|
|
|
|
self.user = user
|
|
self.refresh_token = tokens.refresh_token
|
|
self.access_token = tokens.access_token
|
|
|
|
# Restore mock values to default conftest behavior for the actual tests
|
|
pm.hash_password.return_value = original_hash
|
|
pm.verify_password.return_value = original_verify
|
|
tm.create_access_token.return_value = original_access
|
|
tm.create_refresh_token.return_value = original_refresh
|
|
|
|
def test_refresh_access_token_success(self, auth_service: AuthService):
|
|
"""Success: Refreshing an access token works with a valid refresh token."""
|
|
|
|
# Access the manager via the service instance
|
|
tm = auth_service.token_manager
|
|
|
|
# Use patch.object on the *instance* for granular control within the test
|
|
with patch.object(tm, 'create_access_token',
|
|
return_value="NEW_MOCKED_ACCESS_TOKEN") as mock_create_access:
|
|
tokens = auth_service.refresh_access_token(self.refresh_token)
|
|
|
|
assert tokens.access_token == "NEW_MOCKED_ACCESS_TOKEN"
|
|
mock_create_access.assert_called_once()
|
|
|
|
def test_refresh_access_token_failure_invalid_token(self, auth_service: AuthService):
|
|
"""Failure: Refreshing fails if the token is invalid (revoked, expired, etc.)."""
|
|
auth_service.logout(self.refresh_token)
|
|
|
|
with pytest.raises(InvalidTokenError):
|
|
auth_service.refresh_access_token("invalid_token")
|
|
|
|
def test_logout_success(self, auth_service: AuthService):
|
|
"""Success: Logout revokes the specified refresh token."""
|
|
result = auth_service.logout(self.refresh_token)
|
|
assert result is True
|
|
|
|
with pytest.raises(RevokedTokenError):
|
|
auth_service.refresh_access_token(self.refresh_token)
|
|
|
|
def test_get_current_user_success(self, auth_service: AuthService):
|
|
"""Success: Getting the current user works by successfully decoding the JWT."""
|
|
|
|
# Mock the decoder to simulate a decoded payload
|
|
token_payload = TokenPayload(sub=self.user.id,
|
|
email=str(self.user.email),
|
|
exp=int(datetime.now().timestamp() * 1000))
|
|
with patch.object(auth_service.token_manager, 'decode_access_token',
|
|
return_value=token_payload) as mock_decode:
|
|
user = auth_service.get_current_user("dummy_jwt")
|
|
|
|
assert user.id == self.user.id
|
|
mock_decode.assert_called_once()
|
|
|
|
def test_get_current_user_failure_invalid_token(self, auth_service: AuthService):
|
|
"""Failure: Getting the current user fails if the access token is invalid/expired."""
|
|
|
|
with patch.object(auth_service.token_manager, 'decode_access_token',
|
|
side_effect=InvalidTokenError("Invalid signature")):
|
|
with pytest.raises(InvalidTokenError):
|
|
auth_service.get_current_user("invalid_access_jwt")
|
|
|
|
with patch.object(auth_service.token_manager, 'decode_access_token',
|
|
side_effect=ExpiredTokenError("Token expired")):
|
|
with pytest.raises(ExpiredTokenError):
|
|
auth_service.get_current_user("expired_access_jwt")
|
|
|
|
# class TestAuthServiceResetVerification(object):
|
|
# """Tests for password reset and email verification flows."""
|
|
#
|
|
# @pytest.fixture(autouse=True)
|
|
# def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
|
|
# """Sets up a registered user using a mock hash for speed."""
|
|
#
|
|
# pm = auth_service.password_manager
|
|
# original_hash = pm.hash_password.return_value
|
|
#
|
|
# # Temporarily set hash for setup
|
|
# pm.hash_password.return_value = "HASHED_PASS"
|
|
# user = auth_service.register(test_user_data_create)
|
|
# self.user = user
|
|
#
|
|
# # Restore hash mock
|
|
# pm.hash_password.return_value = original_hash
|
|
#
|
|
# @patch('myauth.core.email.send_email')
|
|
# def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
|
|
# """Success: Requesting a password reset generates a token and sends an email."""
|
|
#
|
|
# tm = auth_service.token_manager
|
|
# with patch.object(tm, 'create_password_reset_token',
|
|
# return_value="MOCKED_RESET_TOKEN") as mock_create_token:
|
|
# token_string = auth_service.request_password_reset(self.user.email)
|
|
#
|
|
# assert token_string == "MOCKED_RESET_TOKEN"
|
|
# mock_create_token.assert_called_once()
|
|
# mock_send_email.assert_called_once()
|
|
#
|
|
# def test_reset_password_success(self, auth_service: AuthService):
|
|
# """Success: Resetting the password works with a valid token."""
|
|
#
|
|
# # Setup: Manually create a valid reset token
|
|
# auth_service.token_repository.save_token(
|
|
# TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
|
|
# expires_at=datetime.now() + timedelta(minutes=10))
|
|
# )
|
|
#
|
|
# # Patch the PasswordManager instance to control the hash output
|
|
# pm = auth_service.password_manager
|
|
# with patch.object(pm, 'hash_password',
|
|
# return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
|
|
# new_password = "NewPassword123!"
|
|
# result = auth_service.reset_password("valid_reset_token", new_password)
|
|
#
|
|
# assert result is True
|
|
# mock_hash.assert_called_once_with(new_password)
|
|
#
|
|
# # Verification: Check that user data was updated
|
|
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
|
|
# assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
|
|
#
|
|
# @patch('myauth.core.email.send_email')
|
|
# def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
|
|
# """Success: Requesting verification generates a token and sends an email."""
|
|
#
|
|
# tm = auth_service.token_manager
|
|
# with patch.object(tm, 'create_email_verification_token',
|
|
# return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
|
|
# token_string = auth_service.request_email_verification(self.user.email)
|
|
#
|
|
# assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
|
|
# mock_create_token.assert_called_once_with(self.user.email)
|
|
# mock_send_email.assert_called_once()
|
|
#
|
|
# def test_verify_email_success(self, auth_service: AuthService):
|
|
# """Success: Verification updates the user's status."""
|
|
#
|
|
# # The token_manager is mocked in conftest, so we must access its real create method
|
|
# # or rely on the mock's return value to get a token string to use in the call.
|
|
# # Since we need a real token for the decode logic to pass, we need to bypass the mock here.
|
|
#
|
|
# # We will temporarily use the real TokenManager to create a valid, decodable token.
|
|
# # This requires an *unmocked* token manager instance, which is tricky in this setup.
|
|
#
|
|
# # Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
|
|
#
|
|
# # Assuming TokenManager.create_email_verification_token can be mocked to return a static string
|
|
# # and TokenManager.decode_email_verification_token can be patched to simulate success.
|
|
#
|
|
# # Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
|
|
#
|
|
# # Setup: Ensure user is unverified
|
|
# auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
|
|
#
|
|
# tm = auth_service.token_manager
|
|
#
|
|
# # Mock the decode step to ensure it returns the email used for verification
|
|
# with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
|
|
# # Test (we use a dummy token string as the decode step is mocked)
|
|
# result = auth_service.verify_email("dummy_verification_token")
|
|
#
|
|
# assert result is True
|
|
# mock_decode.assert_called_once()
|
|
#
|
|
# # Verification: User is verified
|
|
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
|
|
# assert updated_user.is_verified is True
|