Added user profile modification

This commit is contained in:
2025-11-11 09:30:04 +01:00
parent 43603fe66f
commit 5f652b436e
9 changed files with 986 additions and 46 deletions

View File

@@ -247,6 +247,8 @@ POST /auth/password-reset # Reset password with token
POST /auth/verify-email-request # Request email verification
POST /auth/verify-email # Verify email with token
GET /auth/me # Get current user info
PATCH /auth/me # Update own profile
PATCH /auth/users/{user_id} # Update user profile (by admin)
```
### Error Handling
@@ -356,3 +358,4 @@ MIT
* 0.1.0 - Initial Release
* 0.2.0 - Added admin user auto creation
* 0.2.1 - Added user profile update (PATCH on /me and /users/{user_id})

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "myauth"
version = "0.2.0"
version = "0.2.1"
description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends."
readme = "README.md"
authors = [

View File

@@ -4,7 +4,6 @@ FastAPI routes for authentication module.
This module provides ready-to-use FastAPI routes for all authentication
operations. Routes are organized in an APIRouter with /auth prefix.
"""
import os
from typing import Annotated
from fastapi import Depends, HTTPException, status, FastAPI
@@ -18,7 +17,7 @@ from ..models.email_verification import (
PasswordResetConfirm
)
from ..models.token import AccessTokenResponse, RefreshTokenRequest
from ..models.user import UserCreate, UserResponse
from ..models.user import UserCreate, UserResponse, UserUpdate, UserUpdateMe
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
@@ -86,6 +85,35 @@ def create_auth_app(auth_service: AuthService) -> FastAPI:
detail=e.message
)
def get_current_admin(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
"""
Dependency to ensure the current user has admin role.
This dependency can be used in any route that requires admin privileges.
It first validates the user's token (via get_current_user) then checks
for the 'admin' role.
Args:
current_user: The authenticated user (injected by get_current_user dependency).
Returns:
The current authenticated admin user.
Raises:
HTTPException: 403 if user does not have admin role.
Example:
>>> @auth_app.patch("/users/{user_id}")
>>> def update_user(user_id: str, admin: UserResponse = Depends(get_current_admin)):
>>> # Only admins can access this route
"""
if "admin" not in current_user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin privileges required"
)
return current_user
@auth_app.post(
"/register",
response_model=UserResponse,
@@ -375,6 +403,123 @@ def create_auth_app(auth_service: AuthService) -> FastAPI:
HTTPException: 401 if token is invalid or expired.
"""
return current_user
@auth_app.patch(
"/me",
response_model=UserResponse,
summary="Update own profile",
description="Update the current user's profile (restricted fields)."
)
def update_me(
updates: UserUpdateMe,
current_user: Annotated[UserResponse, Depends(get_current_user)]
) -> UserResponse:
"""
Update the current authenticated user's profile.
This endpoint allows users to update their own profile with restrictions:
- Allowed fields: email, username, password, user_settings
- Forbidden fields: roles, is_active, is_verified (must use other flows)
When changing password, the current session is preserved if refresh_token
is provided in the request body.
Args:
updates: User update data (restricted fields only).
current_user: The authenticated user (injected by dependency).
Returns:
Updated user information.
Raises:
HTTPException: 409 if new email already exists.
HTTPException: 422 if validation fails.
"""
try:
# Convert UserUpdateMe to UserUpdate (only allowed fields)
from ..models.user import UserUpdate
user_update = UserUpdate(
email=updates.email,
username=updates.username,
password=updates.password,
user_settings=updates.user_settings
)
# Update user with optional refresh_token to preserve session
updated_user = auth_service.update_user(
user_id=current_user.id,
updates=user_update,
refresh_token=updates.refresh_token
)
return UserResponse(
id=updated_user.id,
email=updated_user.email,
username=updated_user.username,
roles=updated_user.roles,
user_settings=updated_user.user_settings,
created_at=updated_user.created_at,
updated_at=updated_user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.patch(
"/users/{user_id}",
response_model=UserResponse,
summary="Update any user (admin only)",
description="Update any user's profile with full access to all fields (admin only)."
)
def update_user_by_id(
user_id: str,
updates: UserUpdate,
admin: Annotated[UserResponse, Depends(get_current_admin)]
) -> UserResponse:
"""
Update any user's profile (admin only).
This endpoint allows administrators to update any user with full access
to all fields including roles, is_active, and is_verified.
Args:
user_id: The ID of the user to update.
updates: User update data (all fields allowed).
admin: The authenticated admin user (injected by dependency).
Returns:
Updated user information.
Raises:
HTTPException: 403 if user is not an admin.
HTTPException: 404 if user not found.
HTTPException: 409 if new email already exists.
HTTPException: 422 if validation fails.
"""
try:
# Admin updates don't preserve refresh tokens (security)
updated_user = auth_service.update_user(
user_id=user_id,
updates=updates,
refresh_token=None
)
return UserResponse(
id=updated_user.id,
email=updated_user.email,
username=updated_user.username,
roles=updated_user.roles,
user_settings=updated_user.user_settings,
created_at=updated_user.created_at,
updated_at=updated_user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
return auth_app

View File

@@ -18,7 +18,7 @@ from ..exceptions import (
AccountDisabledError,
ExpiredTokenError,
InvalidTokenError,
RevokedTokenError
RevokedTokenError, UserAlreadyExistsError
)
from ..models.token import AccessTokenResponse, TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate, UserCreateNoValidation
@@ -512,3 +512,61 @@ class AuthService:
:rtype: list
"""
return self.user_repository.list_users(skip, limit)
def update_user(self, user_id: str, updates: UserUpdate, refresh_token: Optional[str] = None) -> UserInDB:
"""
Update an existing user's information.
This method handles user profile updates with automatic security measures:
- Validates email uniqueness if email is changed
- Automatically sets is_verified=False if email is changed
- Hashes password if provided
- Revokes refresh tokens (except current one) if password is changed
Args:
user_id: The unique user identifier.
updates: Pydantic model containing fields to update.
refresh_token: Optional current refresh token to preserve when password changes.
Returns:
The updated user.
Raises:
UserNotFoundError: If user does not exist.
UserAlreadyExistsError: If new email is already used by another user.
Example:
>>> updates = UserUpdate(email="newemail@example.com")
>>> user = auth_service.update_user(user_id, updates)
>>>
>>> # Update password while preserving current session
>>> updates = UserUpdate(password="NewSecurePass123!")
>>> user = auth_service.update_user(user_id, updates, refresh_token=current_token)
"""
# Get current user to compare changes
current_user = self.user_repository.get_user_by_id(user_id)
if not current_user:
raise UserNotFoundError(f"User with id {user_id} not found")
# Handle email change
if updates.email is not None and updates.email != current_user.email:
# Check if new email is already used by another user
if self.user_repository.email_exists(updates.email):
raise UserAlreadyExistsError(f"Email {updates.email} is already in use")
# Force email verification to false when email changes
updates.is_verified = False
# Handle password change
if updates.password is not None:
# Hash the new password
hashed_password = self.password_manager.hash_password(updates.password)
updates.password = hashed_password
# Revoke all refresh tokens except the current one (if provided)
self.token_repository.revoke_all_user_tokens(user_id, "refresh", except_token=refresh_token)
# Update user in database
updated_user = self.user_repository.update_user(user_id, updates)
return updated_user

View File

@@ -153,6 +153,67 @@ class UserUpdate(BaseModel):
Raises:
ValueError: If username is provided but empty or too long.
"""
if value is None:
return None
return validate_username_not_empty(value)
class UserUpdateMe(BaseModel):
"""
Model for user self-update (restricted fields).
This model allows users to update their own profile with restrictions:
- Allowed fields: email, username, password, user_settings
- Forbidden fields: roles, is_active, is_verified
- Optional refresh_token to preserve current session when changing password
Attributes:
email: Optional new email address.
username: Optional new username.
password: Optional new password (will be hashed and validated).
user_settings: Optional new settings dict.
refresh_token: Optional refresh token to preserve when changing password.
"""
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = None
user_settings: Optional[dict] = None
refresh_token: Optional[str] = None
@field_validator('password')
@classmethod
def validate_password_strength(cls, value: Optional[str]) -> Optional[str]:
"""
Validate password meets security requirements if provided.
Args:
value: The password to validate (can be None).
Returns:
The validated password or None.
Raises:
ValueError: If password is provided but does not meet security requirements.
"""
return validate_password_strength(value)
@field_validator('username')
@classmethod
def validate_username(cls, value: Optional[str]) -> Optional[str]:
"""
Validate username if provided.
Args:
value: The username to validate (can be None).
Returns:
The validated username or None.
Raises:
ValueError: If username is provided but empty or too long.
"""
if value is None:
return None
return validate_username_not_empty(value)

View File

@@ -8,6 +8,7 @@ PostgreSQL, custom engines, etc.).
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models.token import TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate
@@ -203,16 +204,19 @@ class TokenRepository(ABC):
pass
@abstractmethod
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
"""
Revoke all tokens of a specific type for a user.
This is useful for logout-all-devices functionality or when a user's
password is changed (invalidating all refresh tokens).
password is changed (invalidating all refresh tokens). Optionally,
a specific token can be excluded from revocation to preserve the
current session.
Args:
user_id: The user whose tokens should be revoked.
token_type: Type of tokens to revoke ("refresh" or "password_reset").
except_token: Optional token string to exclude from revocation.
Returns:
Number of tokens revoked.

View File

@@ -13,9 +13,9 @@ from typing import Optional
from uuid import uuid4
from .base import UserRepository, TokenRepository
from ..models.user import UserCreate, UserInDB, UserUpdate
from ..models.token import TokenData
from ..exceptions import UserAlreadyExistsError, UserNotFoundError
from ..models.token import TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate
class SQLiteUserRepository(UserRepository):
@@ -364,38 +364,38 @@ class SQLiteUserRepository(UserRepository):
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM users WHERE email = ? LIMIT 1", (email,))
return cursor.fetchone() is not None
def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]:
"""
List users with pagination.
"""
List users with pagination.
Args:
skip: Number of users to skip (default: 0).
limit: Maximum number of users to return (default: 100).
Args:
skip: Number of users to skip (default: 0).
limit: Maximum number of users to return (default: 100).
Returns:
List of users.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id,
email,
username,
hashed_password,
roles,
user_settings,
is_verified,
is_active,
created_at,
updated_at
FROM users
ORDER BY created_at DESC LIMIT ?
OFFSET ?
""", (limit, skip))
rows = cursor.fetchall()
return [self._row_to_user(row) for row in rows]
Returns:
List of users.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id,
email,
username,
hashed_password,
roles,
user_settings,
is_verified,
is_active,
created_at,
updated_at
FROM users
ORDER BY created_at DESC LIMIT ?
OFFSET ?
""", (limit, skip))
rows = cursor.fetchall()
return [self._row_to_user(row) for row in rows]
def count_users(self) -> int:
"""
@@ -410,6 +410,7 @@ class SQLiteUserRepository(UserRepository):
result = cursor.fetchone()
return result[0] if result else 0
class SQLiteTokenRepository(TokenRepository):
"""
SQLite implementation of TokenRepository.
@@ -589,25 +590,44 @@ class SQLiteTokenRepository(TokenRepository):
conn.commit()
return cursor.rowcount > 0
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
"""
Revoke all tokens of a specific type for a user.
This is useful for logout-all-devices functionality or when a user's
password is changed (invalidating all refresh tokens). Optionally,
a specific token can be excluded from revocation to preserve the
current session.
Args:
user_id: The user whose tokens should be revoked.
token_type: Type of tokens to revoke.
token_type: Type of tokens to revoke ("refresh" or "password_reset").
except_token: Optional token string to exclude from revocation.
Returns:
Number of tokens revoked.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
""", (user_id, token_type))
if except_token is not None:
# Revoke all tokens except the specified one
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
AND token != ?
""", (user_id, token_type, except_token))
else:
# Revoke all tokens
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
""", (user_id, token_type))
conn.commit()
return cursor.rowcount
@@ -652,4 +672,4 @@ class SQLiteTokenRepository(TokenRepository):
if token_data.expires_at < datetime.now():
return False
return True
return True

View File

@@ -513,3 +513,361 @@ def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_se
)
assert response.status_code == 401
def test_i_can_update_my_email(client, mock_auth_service, sample_user):
"""
Test user can update their own email.
Verifies that a PATCH request to /auth/me with a new email
successfully updates the user's email address.
"""
updated_user = sample_user.model_copy(update={"email": "newemail@example.com"})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"email": "newemail@example.com"}
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "newemail@example.com"
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_username(client, mock_auth_service, sample_user):
"""
Test user can update their own username.
Verifies that a PATCH request to /auth/me with a new username
successfully updates the user's username.
"""
updated_user = sample_user.model_copy(update={"username": "newusername"})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"username": "newusername"}
)
assert response.status_code == 200
data = response.json()
assert data["username"] == "newusername"
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_password(client, mock_auth_service, sample_user):
"""
Test user can update their own password.
Verifies that a PATCH request to /auth/me with a new password
successfully updates the password (which will be hashed by the service).
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = sample_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"password": "NewSecurePass123!"}
)
assert response.status_code == 200
mock_auth_service.update_user.assert_called_once()
# Verify password was included in the update
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].password == "NewSecurePass123!"
def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user):
"""
Test user can update password while preserving current session.
Verifies that when a refresh_token is provided in the request body,
it is passed to the service to preserve the current session.
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = sample_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={
"password": "NewSecurePass123!",
"refresh_token": "current_refresh_token"
}
)
assert response.status_code == 200
mock_auth_service.update_user.assert_called_once()
# Verify refresh_token was passed to preserve session
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["refresh_token"] == "current_refresh_token"
def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user):
"""
Test user can update their own settings.
Verifies that a PATCH request to /auth/me with new user_settings
successfully updates the user's custom settings.
"""
new_settings = {"theme": "dark", "language": "fr", "notifications": True}
updated_user = sample_user.model_copy(update={"user_settings": new_settings})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"user_settings": new_settings}
)
assert response.status_code == 200
data = response.json()
assert data["user_settings"] == new_settings
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user):
"""
Test user can update multiple fields simultaneously.
Verifies that a PATCH request to /auth/me can update multiple
fields (email, username, user_settings) at once.
"""
updated_user = sample_user.model_copy(update={
"email": "multiemail@example.com",
"username": "multiuser",
"user_settings": {"theme": "light"}
})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={
"email": "multiemail@example.com",
"username": "multiuser",
"user_settings": {"theme": "light"}
}
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "multiemail@example.com"
assert data["username"] == "multiuser"
assert data["user_settings"] == {"theme": "light"}
mock_auth_service.update_user.assert_called_once()
def test_i_cannot_update_my_profile_without_authentication(client):
"""
Test updating profile fails without authentication.
Verifies that a PATCH request to /auth/me without a Bearer token
returns 401 Unauthorized status.
"""
response = client.patch(
"/auth/me",
json={"username": "shouldfail"}
)
assert response.status_code == 401
def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user):
"""
Test updating email fails if already exists.
Verifies that attempting to update email to an already registered
email returns 409 Conflict status.
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.side_effect = UserAlreadyExistsError(
"Email already in use"
)
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"email": "existing@example.com"}
)
assert response.status_code == 409
assert "already" in response.json()["detail"].lower()
def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service):
"""
Test updating profile fails with invalid token.
Verifies that attempting to access /auth/me with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.get_current_user.side_effect = InvalidTokenError(
"Invalid access token"
)
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer invalid.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 401
def test_admin_can_update_any_user(client, mock_auth_service, sample_user):
"""
Test admin can update any user.
Verifies that an admin can successfully update another user's
information via PATCH /auth/users/{user_id}.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"})
updated_user = target_user.model_copy(update={"username": "updatedusername"})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"username": "updatedusername"}
)
assert response.status_code == 200
data = response.json()
assert data["username"] == "updatedusername"
mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_roles(client, mock_auth_service, sample_user):
"""
Test admin can update user roles.
Verifies that an admin can change a user's roles, which is forbidden
for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]})
updated_user = target_user.model_copy(update={"roles": ["admin", "moderator"]})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"roles": ["admin", "moderator"]}
)
assert response.status_code == 200
data = response.json()
assert data["roles"] == ["admin", "moderator"]
mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user):
"""
Test admin can update user active status.
Verifies that an admin can activate or deactivate a user account,
which is forbidden for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_active": True})
updated_user = target_user.model_copy(update={"is_active": False})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"is_active": False}
)
assert response.status_code == 200
# Note: is_active is not in UserResponse, so we just verify the call was made
mock_auth_service.update_user.assert_called_once()
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].is_active is False
def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user):
"""
Test admin can update user verification status.
Verifies that an admin can change a user's email verification status,
which is forbidden for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False})
updated_user = target_user.model_copy(update={"is_verified": True})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"is_verified": True}
)
assert response.status_code == 200
# Note: is_verified is not in UserResponse, so we just verify the call was made
mock_auth_service.update_user.assert_called_once()
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].is_verified is True
def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user):
"""
Test non-admin cannot update other users.
Verifies that a regular user (without admin role) cannot access
the PATCH /auth/users/{user_id} endpoint and receives 403 Forbidden.
"""
regular_user = sample_user.model_copy(update={"roles": ["user"]})
mock_auth_service.get_current_user.return_value = regular_user
response = client.patch(
"/auth/users/other_user_id",
headers={"Authorization": "Bearer user.access.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 403
assert "admin" in response.json()["detail"].lower()
def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user):
"""
Test admin cannot update non-existent user.
Verifies that attempting to update a non-existent user returns
404 Not Found status.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.side_effect = UserNotFoundError(
"User not found"
)
response = client.patch(
"/auth/users/non_existent_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()

View File

@@ -343,6 +343,297 @@ class TestAuthServiceTokenManagement(object):
with pytest.raises(ExpiredTokenError):
auth_service.get_current_user("expired_access_jwt")
class TestAuthServiceUserUpdate(object):
"""Tests for user update operations."""
@pytest.fixture(autouse=True)
def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""Sets up a registered user for update tests."""
pm = auth_service.password_manager
original_hash = pm.hash_password.return_value
# Temporarily set hash for setup
pm.hash_password.return_value = "HASHED_PASS"
user = auth_service.register(test_user_data_create)
self.user = user
self.original_email = user.email
self.original_username = user.username
# Restore hash mock
pm.hash_password.return_value = original_hash
def test_i_can_update_user_email(self, auth_service: AuthService):
"""Success: Email can be updated and is_verified is automatically set to False."""
from myauth.models.user import UserUpdate
new_email = "updated.email@example.com"
updates = UserUpdate(email=new_email)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.email == new_email
assert updated_user.is_verified is False
def test_i_can_update_user_username(self, auth_service: AuthService):
"""Success: Username can be updated."""
from myauth.models.user import UserUpdate
new_username = "UpdatedUsername"
updates = UserUpdate(username=new_username)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == new_username
def test_i_can_update_user_password(self, auth_service: AuthService):
"""Success: Password can be updated and is properly hashed."""
from myauth.models.user import UserUpdate
pm = auth_service.password_manager
new_password = "NewSecurePass123!"
with patch.object(pm, 'hash_password', return_value="NEW_HASHED_PASSWORD") as mock_hash:
updates = UserUpdate(password=new_password)
updated_user = auth_service.update_user(self.user.id, updates)
mock_hash.assert_called_once_with(new_password)
assert updated_user.hashed_password == "NEW_HASHED_PASSWORD"
def test_i_can_update_user_password_and_all_refresh_tokens_are_revoked(
self,
auth_service: AuthService
):
"""Success: Updating password revokes all refresh tokens when no current token provided."""
from myauth.models.user import UserUpdate
# Setup: Create some refresh tokens for the user
from myauth.models.token import TokenData
from datetime import datetime, timedelta
token1 = TokenData(
token="refresh_token_1",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
token2 = TokenData(
token="refresh_token_2",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(token1)
auth_service.token_repository.save_token(token2)
# Execute: Update password without providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates)
# Verify: Both tokens are revoked
token1_after = auth_service.token_repository.get_token("refresh_token_1", "refresh")
token2_after = auth_service.token_repository.get_token("refresh_token_2", "refresh")
assert token1_after.is_revoked is True
assert token2_after.is_revoked is True
def test_i_can_update_user_password_and_preserve_current_session(
self,
auth_service: AuthService
):
"""Success: Updating password preserves the current refresh token when provided."""
from myauth.models.user import UserUpdate
from myauth.models.token import TokenData
from datetime import datetime, timedelta
# Setup: Create some refresh tokens
current_token = TokenData(
token="current_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
other_token = TokenData(
token="other_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(current_token)
auth_service.token_repository.save_token(other_token)
# Execute: Update password while providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates, refresh_token="current_refresh_token")
# Verify: Current token is preserved, other is revoked
current_after = auth_service.token_repository.get_token("current_refresh_token", "refresh")
other_after = auth_service.token_repository.get_token("other_refresh_token", "refresh")
assert current_after.is_revoked is False
assert other_after.is_revoked is True
def test_i_can_update_multiple_fields_at_once(self, auth_service: AuthService):
"""Success: Multiple fields can be updated simultaneously."""
from myauth.models.user import UserUpdate
updates = UserUpdate(
username="MultiUpdateUser",
roles=["admin", "member"],
user_settings={"theme": "light", "language": "en"}
)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == "MultiUpdateUser"
assert updated_user.roles == ["admin", "member"]
assert updated_user.user_settings == {"theme": "light", "language": "en"}
def test_i_can_update_user_roles(self, auth_service: AuthService):
"""Success: User roles can be updated."""
from myauth.models.user import UserUpdate
new_roles = ["admin", "moderator"]
updates = UserUpdate(roles=new_roles)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.roles == new_roles
def test_i_can_update_user_settings(self, auth_service: AuthService):
"""Success: User settings can be updated."""
from myauth.models.user import UserUpdate
new_settings = {"theme": "light", "notifications": True, "language": "fr"}
updates = UserUpdate(user_settings=new_settings)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.user_settings == new_settings
def test_i_can_update_is_active_status(self, auth_service: AuthService):
"""Success: User active status can be updated."""
from myauth.models.user import UserUpdate
# Deactivate user
updates = UserUpdate(is_active=False)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is False
# Reactivate user
updates = UserUpdate(is_active=True)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is True
def test_i_cannot_update_user_with_invalid_user_id(self, auth_service: AuthService):
"""Failure: Updating a non-existent user raises UserNotFoundError."""
from myauth.models.user import UserUpdate
from myauth.exceptions import UserNotFoundError
updates = UserUpdate(username="ShouldFail")
with pytest.raises(UserNotFoundError):
auth_service.update_user("non_existent_id", updates)
def test_i_cannot_update_email_to_existing_email(self, auth_service: AuthService):
"""Failure: Updating email to an already registered email raises UserAlreadyExistsError."""
from myauth.models.user import UserCreate, UserUpdate
# Setup: Create another user with a different email
other_user_data = UserCreate(
email="other.user@example.com",
username="OtherUser",
password="OtherPass123!",
roles=["member"]
)
auth_service.register(other_user_data)
# Execute: Try to update original user's email to the other user's email
updates = UserUpdate(email="other.user@example.com")
with pytest.raises(UserAlreadyExistsError):
auth_service.update_user(self.user.id, updates)
def test_i_cannot_update_email_to_same_email_without_triggering_verification_reset(
self,
auth_service: AuthService
):
"""Success: Updating email to the same email does not reset is_verified."""
from myauth.models.user import UserUpdate
# Setup: Ensure user is verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
# Execute: Update with the same email
updates = UserUpdate(email=self.original_email)
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: is_verified should remain True
assert updated_user.is_verified is True
assert updated_user.email == self.original_email
def test_i_can_update_with_empty_updates(self, auth_service: AuthService):
"""Success: Updating with empty UserUpdate does not cause errors."""
from myauth.models.user import UserUpdate
updates = UserUpdate()
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: User is returned and fields are unchanged
assert updated_user.id == self.user.id
assert updated_user.email == self.original_email
assert updated_user.username == self.original_username
def test_email_verification_reset_only_when_email_actually_changes(
self,
auth_service: AuthService
):
"""Success: is_verified is reset to False only when email actually changes."""
from myauth.models.user import UserUpdate
# Setup: Set user as verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
verified_user = auth_service.user_repository.get_user_by_id(self.user.id)
assert verified_user.is_verified is True
# Test 1: Update with same email - verification should remain
same_email_updates = UserUpdate(email=self.original_email, username="SameEmailTest")
updated_user = auth_service.update_user(self.user.id, same_email_updates)
assert updated_user.is_verified is True
# Test 2: Update with different email - verification should reset
different_email_updates = UserUpdate(email="completely.new@example.com")
updated_user = auth_service.update_user(self.user.id, different_email_updates)
assert updated_user.is_verified is False
assert updated_user.email == "completely.new@example.com"
# class TestAuthServiceResetVerification(object):
# """Tests for password reset and email verification flows."""
#