diff --git a/README.md b/README.md index 43e153d..3b1b5cd 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,8 @@ POST /auth/password-reset # Reset password with token POST /auth/verify-email-request # Request email verification POST /auth/verify-email # Verify email with token GET /auth/me # Get current user info +PATCH /auth/me # Update own profile +PATCH /auth/users/{user_id} # Update user profile (by admin) ``` ### Error Handling @@ -356,3 +358,4 @@ MIT * 0.1.0 - Initial Release * 0.2.0 - Added admin user auto creation +* 0.2.1 - Added user profile update (PATCH on /me and /users/{user_id}) diff --git a/pyproject.toml b/pyproject.toml index 621213b..9fa79fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "myauth" -version = "0.2.0" +version = "0.2.1" description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends." readme = "README.md" authors = [ diff --git a/src/myauth/api/routes.py b/src/myauth/api/routes.py index f71743c..bbafa11 100644 --- a/src/myauth/api/routes.py +++ b/src/myauth/api/routes.py @@ -4,7 +4,6 @@ FastAPI routes for authentication module. This module provides ready-to-use FastAPI routes for all authentication operations. Routes are organized in an APIRouter with /auth prefix. """ -import os from typing import Annotated from fastapi import Depends, HTTPException, status, FastAPI @@ -18,7 +17,7 @@ from ..models.email_verification import ( PasswordResetConfirm ) from ..models.token import AccessTokenResponse, RefreshTokenRequest -from ..models.user import UserCreate, UserResponse +from ..models.user import UserCreate, UserResponse, UserUpdate, UserUpdateMe # OAuth2 scheme for token authentication oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") @@ -86,6 +85,35 @@ def create_auth_app(auth_service: AuthService) -> FastAPI: detail=e.message ) + def get_current_admin(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse: + """ + Dependency to ensure the current user has admin role. + + This dependency can be used in any route that requires admin privileges. + It first validates the user's token (via get_current_user) then checks + for the 'admin' role. + + Args: + current_user: The authenticated user (injected by get_current_user dependency). + + Returns: + The current authenticated admin user. + + Raises: + HTTPException: 403 if user does not have admin role. + + Example: + >>> @auth_app.patch("/users/{user_id}") + >>> def update_user(user_id: str, admin: UserResponse = Depends(get_current_admin)): + >>> # Only admins can access this route + """ + if "admin" not in current_user.roles: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin privileges required" + ) + return current_user + @auth_app.post( "/register", response_model=UserResponse, @@ -375,6 +403,123 @@ def create_auth_app(auth_service: AuthService) -> FastAPI: HTTPException: 401 if token is invalid or expired. """ return current_user + + @auth_app.patch( + "/me", + response_model=UserResponse, + summary="Update own profile", + description="Update the current user's profile (restricted fields)." + ) + def update_me( + updates: UserUpdateMe, + current_user: Annotated[UserResponse, Depends(get_current_user)] + ) -> UserResponse: + """ + Update the current authenticated user's profile. + This endpoint allows users to update their own profile with restrictions: + - Allowed fields: email, username, password, user_settings + - Forbidden fields: roles, is_active, is_verified (must use other flows) + + When changing password, the current session is preserved if refresh_token + is provided in the request body. + + Args: + updates: User update data (restricted fields only). + current_user: The authenticated user (injected by dependency). + + Returns: + Updated user information. + + Raises: + HTTPException: 409 if new email already exists. + HTTPException: 422 if validation fails. + """ + try: + # Convert UserUpdateMe to UserUpdate (only allowed fields) + from ..models.user import UserUpdate + + user_update = UserUpdate( + email=updates.email, + username=updates.username, + password=updates.password, + user_settings=updates.user_settings + ) + + # Update user with optional refresh_token to preserve session + updated_user = auth_service.update_user( + user_id=current_user.id, + updates=user_update, + refresh_token=updates.refresh_token + ) + + return UserResponse( + id=updated_user.id, + email=updated_user.email, + username=updated_user.username, + roles=updated_user.roles, + user_settings=updated_user.user_settings, + created_at=updated_user.created_at, + updated_at=updated_user.updated_at + ) + except AuthError as e: + raise HTTPException( + status_code=e.status_code, + detail=e.message + ) + + @auth_app.patch( + "/users/{user_id}", + response_model=UserResponse, + summary="Update any user (admin only)", + description="Update any user's profile with full access to all fields (admin only)." + ) + def update_user_by_id( + user_id: str, + updates: UserUpdate, + admin: Annotated[UserResponse, Depends(get_current_admin)] + ) -> UserResponse: + """ + Update any user's profile (admin only). + + This endpoint allows administrators to update any user with full access + to all fields including roles, is_active, and is_verified. + + Args: + user_id: The ID of the user to update. + updates: User update data (all fields allowed). + admin: The authenticated admin user (injected by dependency). + + Returns: + Updated user information. + + Raises: + HTTPException: 403 if user is not an admin. + HTTPException: 404 if user not found. + HTTPException: 409 if new email already exists. + HTTPException: 422 if validation fails. + """ + try: + # Admin updates don't preserve refresh tokens (security) + updated_user = auth_service.update_user( + user_id=user_id, + updates=updates, + refresh_token=None + ) + + return UserResponse( + id=updated_user.id, + email=updated_user.email, + username=updated_user.username, + roles=updated_user.roles, + user_settings=updated_user.user_settings, + created_at=updated_user.created_at, + updated_at=updated_user.updated_at + ) + except AuthError as e: + raise HTTPException( + status_code=e.status_code, + detail=e.message + ) return auth_app diff --git a/src/myauth/core/auth.py b/src/myauth/core/auth.py index 2fde4e5..fdcf2e7 100644 --- a/src/myauth/core/auth.py +++ b/src/myauth/core/auth.py @@ -18,7 +18,7 @@ from ..exceptions import ( AccountDisabledError, ExpiredTokenError, InvalidTokenError, - RevokedTokenError + RevokedTokenError, UserAlreadyExistsError ) from ..models.token import AccessTokenResponse, TokenData from ..models.user import UserCreate, UserInDB, UserUpdate, UserCreateNoValidation @@ -512,3 +512,61 @@ class AuthService: :rtype: list """ return self.user_repository.list_users(skip, limit) + + def update_user(self, user_id: str, updates: UserUpdate, refresh_token: Optional[str] = None) -> UserInDB: + """ + Update an existing user's information. + + This method handles user profile updates with automatic security measures: + - Validates email uniqueness if email is changed + - Automatically sets is_verified=False if email is changed + - Hashes password if provided + - Revokes refresh tokens (except current one) if password is changed + + Args: + user_id: The unique user identifier. + updates: Pydantic model containing fields to update. + refresh_token: Optional current refresh token to preserve when password changes. + + Returns: + The updated user. + + Raises: + UserNotFoundError: If user does not exist. + UserAlreadyExistsError: If new email is already used by another user. + + Example: + >>> updates = UserUpdate(email="newemail@example.com") + >>> user = auth_service.update_user(user_id, updates) + >>> + >>> # Update password while preserving current session + >>> updates = UserUpdate(password="NewSecurePass123!") + >>> user = auth_service.update_user(user_id, updates, refresh_token=current_token) + """ + # Get current user to compare changes + current_user = self.user_repository.get_user_by_id(user_id) + if not current_user: + raise UserNotFoundError(f"User with id {user_id} not found") + + # Handle email change + if updates.email is not None and updates.email != current_user.email: + # Check if new email is already used by another user + if self.user_repository.email_exists(updates.email): + raise UserAlreadyExistsError(f"Email {updates.email} is already in use") + + # Force email verification to false when email changes + updates.is_verified = False + + # Handle password change + if updates.password is not None: + # Hash the new password + hashed_password = self.password_manager.hash_password(updates.password) + updates.password = hashed_password + + # Revoke all refresh tokens except the current one (if provided) + self.token_repository.revoke_all_user_tokens(user_id, "refresh", except_token=refresh_token) + + # Update user in database + updated_user = self.user_repository.update_user(user_id, updates) + + return updated_user \ No newline at end of file diff --git a/src/myauth/models/user.py b/src/myauth/models/user.py index f5240ee..7353bf3 100644 --- a/src/myauth/models/user.py +++ b/src/myauth/models/user.py @@ -153,6 +153,67 @@ class UserUpdate(BaseModel): Raises: ValueError: If username is provided but empty or too long. """ + if value is None: + return None + return validate_username_not_empty(value) + + +class UserUpdateMe(BaseModel): + """ + Model for user self-update (restricted fields). + + This model allows users to update their own profile with restrictions: + - Allowed fields: email, username, password, user_settings + - Forbidden fields: roles, is_active, is_verified + - Optional refresh_token to preserve current session when changing password + + Attributes: + email: Optional new email address. + username: Optional new username. + password: Optional new password (will be hashed and validated). + user_settings: Optional new settings dict. + refresh_token: Optional refresh token to preserve when changing password. + """ + email: Optional[EmailStr] = None + username: Optional[str] = None + password: Optional[str] = None + user_settings: Optional[dict] = None + refresh_token: Optional[str] = None + + @field_validator('password') + @classmethod + def validate_password_strength(cls, value: Optional[str]) -> Optional[str]: + """ + Validate password meets security requirements if provided. + + Args: + value: The password to validate (can be None). + + Returns: + The validated password or None. + + Raises: + ValueError: If password is provided but does not meet security requirements. + """ + return validate_password_strength(value) + + @field_validator('username') + @classmethod + def validate_username(cls, value: Optional[str]) -> Optional[str]: + """ + Validate username if provided. + + Args: + value: The username to validate (can be None). + + Returns: + The validated username or None. + + Raises: + ValueError: If username is provided but empty or too long. + """ + if value is None: + return None return validate_username_not_empty(value) diff --git a/src/myauth/persistence/base.py b/src/myauth/persistence/base.py index da101f8..0da7edd 100644 --- a/src/myauth/persistence/base.py +++ b/src/myauth/persistence/base.py @@ -8,6 +8,7 @@ PostgreSQL, custom engines, etc.). """ from abc import ABC, abstractmethod +from typing import Optional from ..models.token import TokenData from ..models.user import UserCreate, UserInDB, UserUpdate @@ -203,16 +204,19 @@ class TokenRepository(ABC): pass @abstractmethod - def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int: + def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int: """ Revoke all tokens of a specific type for a user. This is useful for logout-all-devices functionality or when a user's - password is changed (invalidating all refresh tokens). + password is changed (invalidating all refresh tokens). Optionally, + a specific token can be excluded from revocation to preserve the + current session. Args: user_id: The user whose tokens should be revoked. token_type: Type of tokens to revoke ("refresh" or "password_reset"). + except_token: Optional token string to exclude from revocation. Returns: Number of tokens revoked. diff --git a/src/myauth/persistence/sqlite.py b/src/myauth/persistence/sqlite.py index 9f39e4a..8396479 100644 --- a/src/myauth/persistence/sqlite.py +++ b/src/myauth/persistence/sqlite.py @@ -13,9 +13,9 @@ from typing import Optional from uuid import uuid4 from .base import UserRepository, TokenRepository -from ..models.user import UserCreate, UserInDB, UserUpdate -from ..models.token import TokenData from ..exceptions import UserAlreadyExistsError, UserNotFoundError +from ..models.token import TokenData +from ..models.user import UserCreate, UserInDB, UserUpdate class SQLiteUserRepository(UserRepository): @@ -364,38 +364,38 @@ class SQLiteUserRepository(UserRepository): cursor = conn.cursor() cursor.execute("SELECT 1 FROM users WHERE email = ? LIMIT 1", (email,)) return cursor.fetchone() is not None - + def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]: - """ - List users with pagination. + """ + List users with pagination. - Args: - skip: Number of users to skip (default: 0). - limit: Maximum number of users to return (default: 100). + Args: + skip: Number of users to skip (default: 0). + limit: Maximum number of users to return (default: 100). - Returns: - List of users. - """ - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT id, - email, - username, - hashed_password, - roles, - user_settings, - is_verified, - is_active, - created_at, - updated_at - FROM users - ORDER BY created_at DESC LIMIT ? - OFFSET ? - """, (limit, skip)) - - rows = cursor.fetchall() - return [self._row_to_user(row) for row in rows] + Returns: + List of users. + """ + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT id, + email, + username, + hashed_password, + roles, + user_settings, + is_verified, + is_active, + created_at, + updated_at + FROM users + ORDER BY created_at DESC LIMIT ? + OFFSET ? + """, (limit, skip)) + + rows = cursor.fetchall() + return [self._row_to_user(row) for row in rows] def count_users(self) -> int: """ @@ -410,6 +410,7 @@ class SQLiteUserRepository(UserRepository): result = cursor.fetchone() return result[0] if result else 0 + class SQLiteTokenRepository(TokenRepository): """ SQLite implementation of TokenRepository. @@ -589,25 +590,44 @@ class SQLiteTokenRepository(TokenRepository): conn.commit() return cursor.rowcount > 0 - def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int: + def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int: """ Revoke all tokens of a specific type for a user. + This is useful for logout-all-devices functionality or when a user's + password is changed (invalidating all refresh tokens). Optionally, + a specific token can be excluded from revocation to preserve the + current session. + Args: user_id: The user whose tokens should be revoked. - token_type: Type of tokens to revoke. + token_type: Type of tokens to revoke ("refresh" or "password_reset"). + except_token: Optional token string to exclude from revocation. Returns: Number of tokens revoked. """ with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute(""" - UPDATE tokens - SET is_revoked = 1 - WHERE user_id = ? - AND token_type = ? - """, (user_id, token_type)) + + if except_token is not None: + # Revoke all tokens except the specified one + cursor.execute(""" + UPDATE tokens + SET is_revoked = 1 + WHERE user_id = ? + AND token_type = ? + AND token != ? + """, (user_id, token_type, except_token)) + else: + # Revoke all tokens + cursor.execute(""" + UPDATE tokens + SET is_revoked = 1 + WHERE user_id = ? + AND token_type = ? + """, (user_id, token_type)) + conn.commit() return cursor.rowcount @@ -652,4 +672,4 @@ class SQLiteTokenRepository(TokenRepository): if token_data.expires_at < datetime.now(): return False - return True \ No newline at end of file + return True diff --git a/tests/api/test_api_routes.py b/tests/api/test_api_routes.py index af75640..3e283eb 100644 --- a/tests/api/test_api_routes.py +++ b/tests/api/test_api_routes.py @@ -513,3 +513,361 @@ def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_se ) assert response.status_code == 401 + + +def test_i_can_update_my_email(client, mock_auth_service, sample_user): + """ + Test user can update their own email. + + Verifies that a PATCH request to /auth/me with a new email + successfully updates the user's email address. + """ + updated_user = sample_user.model_copy(update={"email": "newemail@example.com"}) + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={"email": "newemail@example.com"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["email"] == "newemail@example.com" + mock_auth_service.update_user.assert_called_once() + + +def test_i_can_update_my_username(client, mock_auth_service, sample_user): + """ + Test user can update their own username. + + Verifies that a PATCH request to /auth/me with a new username + successfully updates the user's username. + """ + updated_user = sample_user.model_copy(update={"username": "newusername"}) + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={"username": "newusername"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["username"] == "newusername" + mock_auth_service.update_user.assert_called_once() + + +def test_i_can_update_my_password(client, mock_auth_service, sample_user): + """ + Test user can update their own password. + + Verifies that a PATCH request to /auth/me with a new password + successfully updates the password (which will be hashed by the service). + """ + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = sample_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={"password": "NewSecurePass123!"} + ) + + assert response.status_code == 200 + mock_auth_service.update_user.assert_called_once() + # Verify password was included in the update + call_args = mock_auth_service.update_user.call_args + assert call_args[1]["updates"].password == "NewSecurePass123!" + + +def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user): + """ + Test user can update password while preserving current session. + + Verifies that when a refresh_token is provided in the request body, + it is passed to the service to preserve the current session. + """ + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = sample_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={ + "password": "NewSecurePass123!", + "refresh_token": "current_refresh_token" + } + ) + + assert response.status_code == 200 + mock_auth_service.update_user.assert_called_once() + # Verify refresh_token was passed to preserve session + call_args = mock_auth_service.update_user.call_args + assert call_args[1]["refresh_token"] == "current_refresh_token" + + +def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user): + """ + Test user can update their own settings. + + Verifies that a PATCH request to /auth/me with new user_settings + successfully updates the user's custom settings. + """ + new_settings = {"theme": "dark", "language": "fr", "notifications": True} + updated_user = sample_user.model_copy(update={"user_settings": new_settings}) + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={"user_settings": new_settings} + ) + + assert response.status_code == 200 + data = response.json() + assert data["user_settings"] == new_settings + mock_auth_service.update_user.assert_called_once() + + +def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user): + """ + Test user can update multiple fields simultaneously. + + Verifies that a PATCH request to /auth/me can update multiple + fields (email, username, user_settings) at once. + """ + updated_user = sample_user.model_copy(update={ + "email": "multiemail@example.com", + "username": "multiuser", + "user_settings": {"theme": "light"} + }) + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={ + "email": "multiemail@example.com", + "username": "multiuser", + "user_settings": {"theme": "light"} + } + ) + + assert response.status_code == 200 + data = response.json() + assert data["email"] == "multiemail@example.com" + assert data["username"] == "multiuser" + assert data["user_settings"] == {"theme": "light"} + mock_auth_service.update_user.assert_called_once() + + +def test_i_cannot_update_my_profile_without_authentication(client): + """ + Test updating profile fails without authentication. + + Verifies that a PATCH request to /auth/me without a Bearer token + returns 401 Unauthorized status. + """ + response = client.patch( + "/auth/me", + json={"username": "shouldfail"} + ) + + assert response.status_code == 401 + + +def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user): + """ + Test updating email fails if already exists. + + Verifies that attempting to update email to an already registered + email returns 409 Conflict status. + """ + mock_auth_service.get_current_user.return_value = sample_user + mock_auth_service.update_user.side_effect = UserAlreadyExistsError( + "Email already in use" + ) + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer sample.access.token"}, + json={"email": "existing@example.com"} + ) + + assert response.status_code == 409 + assert "already" in response.json()["detail"].lower() + + +def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service): + """ + Test updating profile fails with invalid token. + + Verifies that attempting to access /auth/me with an invalid token + returns 401 Unauthorized status. + """ + mock_auth_service.get_current_user.side_effect = InvalidTokenError( + "Invalid access token" + ) + + response = client.patch( + "/auth/me", + headers={"Authorization": "Bearer invalid.token"}, + json={"username": "shouldfail"} + ) + + assert response.status_code == 401 + + +def test_admin_can_update_any_user(client, mock_auth_service, sample_user): + """ + Test admin can update any user. + + Verifies that an admin can successfully update another user's + information via PATCH /auth/users/{user_id}. + """ + admin_user = sample_user.model_copy(update={"roles": ["admin"]}) + target_user = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"}) + updated_user = target_user.model_copy(update={"username": "updatedusername"}) + + mock_auth_service.get_current_user.return_value = admin_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/users/target_user_id", + headers={"Authorization": "Bearer admin.access.token"}, + json={"username": "updatedusername"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["username"] == "updatedusername" + mock_auth_service.update_user.assert_called_once() + + +def test_admin_can_update_user_roles(client, mock_auth_service, sample_user): + """ + Test admin can update user roles. + + Verifies that an admin can change a user's roles, which is forbidden + for regular users on the /auth/me endpoint. + """ + admin_user = sample_user.model_copy(update={"roles": ["admin"]}) + target_user = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]}) + updated_user = target_user.model_copy(update={"roles": ["admin", "moderator"]}) + + mock_auth_service.get_current_user.return_value = admin_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/users/target_user_id", + headers={"Authorization": "Bearer admin.access.token"}, + json={"roles": ["admin", "moderator"]} + ) + + assert response.status_code == 200 + data = response.json() + assert data["roles"] == ["admin", "moderator"] + mock_auth_service.update_user.assert_called_once() + + +def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user): + """ + Test admin can update user active status. + + Verifies that an admin can activate or deactivate a user account, + which is forbidden for regular users on the /auth/me endpoint. + """ + admin_user = sample_user.model_copy(update={"roles": ["admin"]}) + target_user = sample_user.model_copy(update={"id": "target_user_id", "is_active": True}) + updated_user = target_user.model_copy(update={"is_active": False}) + + mock_auth_service.get_current_user.return_value = admin_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/users/target_user_id", + headers={"Authorization": "Bearer admin.access.token"}, + json={"is_active": False} + ) + + assert response.status_code == 200 + # Note: is_active is not in UserResponse, so we just verify the call was made + mock_auth_service.update_user.assert_called_once() + call_args = mock_auth_service.update_user.call_args + assert call_args[1]["updates"].is_active is False + + +def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user): + """ + Test admin can update user verification status. + + Verifies that an admin can change a user's email verification status, + which is forbidden for regular users on the /auth/me endpoint. + """ + admin_user = sample_user.model_copy(update={"roles": ["admin"]}) + target_user = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False}) + updated_user = target_user.model_copy(update={"is_verified": True}) + + mock_auth_service.get_current_user.return_value = admin_user + mock_auth_service.update_user.return_value = updated_user + + response = client.patch( + "/auth/users/target_user_id", + headers={"Authorization": "Bearer admin.access.token"}, + json={"is_verified": True} + ) + + assert response.status_code == 200 + # Note: is_verified is not in UserResponse, so we just verify the call was made + mock_auth_service.update_user.assert_called_once() + call_args = mock_auth_service.update_user.call_args + assert call_args[1]["updates"].is_verified is True + + +def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user): + """ + Test non-admin cannot update other users. + + Verifies that a regular user (without admin role) cannot access + the PATCH /auth/users/{user_id} endpoint and receives 403 Forbidden. + """ + regular_user = sample_user.model_copy(update={"roles": ["user"]}) + mock_auth_service.get_current_user.return_value = regular_user + + response = client.patch( + "/auth/users/other_user_id", + headers={"Authorization": "Bearer user.access.token"}, + json={"username": "shouldfail"} + ) + + assert response.status_code == 403 + assert "admin" in response.json()["detail"].lower() + + +def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user): + """ + Test admin cannot update non-existent user. + + Verifies that attempting to update a non-existent user returns + 404 Not Found status. + """ + admin_user = sample_user.model_copy(update={"roles": ["admin"]}) + mock_auth_service.get_current_user.return_value = admin_user + mock_auth_service.update_user.side_effect = UserNotFoundError( + "User not found" + ) + + response = client.patch( + "/auth/users/non_existent_id", + headers={"Authorization": "Bearer admin.access.token"}, + json={"username": "shouldfail"} + ) + + assert response.status_code == 404 + assert "not found" in response.json()["detail"].lower() diff --git a/tests/core/test_auth_service.py b/tests/core/test_auth_service.py index bb6c63a..1d0c906 100644 --- a/tests/core/test_auth_service.py +++ b/tests/core/test_auth_service.py @@ -343,6 +343,297 @@ class TestAuthServiceTokenManagement(object): with pytest.raises(ExpiredTokenError): auth_service.get_current_user("expired_access_jwt") + +class TestAuthServiceUserUpdate(object): + """Tests for user update operations.""" + + @pytest.fixture(autouse=True) + def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate): + """Sets up a registered user for update tests.""" + + pm = auth_service.password_manager + original_hash = pm.hash_password.return_value + + # Temporarily set hash for setup + pm.hash_password.return_value = "HASHED_PASS" + user = auth_service.register(test_user_data_create) + self.user = user + self.original_email = user.email + self.original_username = user.username + + # Restore hash mock + pm.hash_password.return_value = original_hash + + def test_i_can_update_user_email(self, auth_service: AuthService): + """Success: Email can be updated and is_verified is automatically set to False.""" + + from myauth.models.user import UserUpdate + + new_email = "updated.email@example.com" + updates = UserUpdate(email=new_email) + + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.email == new_email + assert updated_user.is_verified is False + + def test_i_can_update_user_username(self, auth_service: AuthService): + """Success: Username can be updated.""" + + from myauth.models.user import UserUpdate + + new_username = "UpdatedUsername" + updates = UserUpdate(username=new_username) + + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.username == new_username + + def test_i_can_update_user_password(self, auth_service: AuthService): + """Success: Password can be updated and is properly hashed.""" + + from myauth.models.user import UserUpdate + + pm = auth_service.password_manager + new_password = "NewSecurePass123!" + + with patch.object(pm, 'hash_password', return_value="NEW_HASHED_PASSWORD") as mock_hash: + updates = UserUpdate(password=new_password) + updated_user = auth_service.update_user(self.user.id, updates) + + mock_hash.assert_called_once_with(new_password) + assert updated_user.hashed_password == "NEW_HASHED_PASSWORD" + + def test_i_can_update_user_password_and_all_refresh_tokens_are_revoked( + self, + auth_service: AuthService + ): + """Success: Updating password revokes all refresh tokens when no current token provided.""" + + from myauth.models.user import UserUpdate + + # Setup: Create some refresh tokens for the user + from myauth.models.token import TokenData + from datetime import datetime, timedelta + + token1 = TokenData( + token="refresh_token_1", + token_type="refresh", + user_id=self.user.id, + expires_at=datetime.now() + timedelta(days=1), + created_at=datetime.now(), + is_revoked=False + ) + token2 = TokenData( + token="refresh_token_2", + token_type="refresh", + user_id=self.user.id, + expires_at=datetime.now() + timedelta(days=1), + created_at=datetime.now(), + is_revoked=False + ) + auth_service.token_repository.save_token(token1) + auth_service.token_repository.save_token(token2) + + # Execute: Update password without providing current token + updates = UserUpdate(password="NewPassword123!") + auth_service.update_user(self.user.id, updates) + + # Verify: Both tokens are revoked + token1_after = auth_service.token_repository.get_token("refresh_token_1", "refresh") + token2_after = auth_service.token_repository.get_token("refresh_token_2", "refresh") + + assert token1_after.is_revoked is True + assert token2_after.is_revoked is True + + def test_i_can_update_user_password_and_preserve_current_session( + self, + auth_service: AuthService + ): + """Success: Updating password preserves the current refresh token when provided.""" + + from myauth.models.user import UserUpdate + from myauth.models.token import TokenData + from datetime import datetime, timedelta + + # Setup: Create some refresh tokens + current_token = TokenData( + token="current_refresh_token", + token_type="refresh", + user_id=self.user.id, + expires_at=datetime.now() + timedelta(days=1), + created_at=datetime.now(), + is_revoked=False + ) + other_token = TokenData( + token="other_refresh_token", + token_type="refresh", + user_id=self.user.id, + expires_at=datetime.now() + timedelta(days=1), + created_at=datetime.now(), + is_revoked=False + ) + auth_service.token_repository.save_token(current_token) + auth_service.token_repository.save_token(other_token) + + # Execute: Update password while providing current token + updates = UserUpdate(password="NewPassword123!") + auth_service.update_user(self.user.id, updates, refresh_token="current_refresh_token") + + # Verify: Current token is preserved, other is revoked + current_after = auth_service.token_repository.get_token("current_refresh_token", "refresh") + other_after = auth_service.token_repository.get_token("other_refresh_token", "refresh") + + assert current_after.is_revoked is False + assert other_after.is_revoked is True + + def test_i_can_update_multiple_fields_at_once(self, auth_service: AuthService): + """Success: Multiple fields can be updated simultaneously.""" + + from myauth.models.user import UserUpdate + + updates = UserUpdate( + username="MultiUpdateUser", + roles=["admin", "member"], + user_settings={"theme": "light", "language": "en"} + ) + + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.username == "MultiUpdateUser" + assert updated_user.roles == ["admin", "member"] + assert updated_user.user_settings == {"theme": "light", "language": "en"} + + def test_i_can_update_user_roles(self, auth_service: AuthService): + """Success: User roles can be updated.""" + + from myauth.models.user import UserUpdate + + new_roles = ["admin", "moderator"] + updates = UserUpdate(roles=new_roles) + + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.roles == new_roles + + def test_i_can_update_user_settings(self, auth_service: AuthService): + """Success: User settings can be updated.""" + + from myauth.models.user import UserUpdate + + new_settings = {"theme": "light", "notifications": True, "language": "fr"} + updates = UserUpdate(user_settings=new_settings) + + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.user_settings == new_settings + + def test_i_can_update_is_active_status(self, auth_service: AuthService): + """Success: User active status can be updated.""" + + from myauth.models.user import UserUpdate + + # Deactivate user + updates = UserUpdate(is_active=False) + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.is_active is False + + # Reactivate user + updates = UserUpdate(is_active=True) + updated_user = auth_service.update_user(self.user.id, updates) + + assert updated_user.is_active is True + + def test_i_cannot_update_user_with_invalid_user_id(self, auth_service: AuthService): + """Failure: Updating a non-existent user raises UserNotFoundError.""" + + from myauth.models.user import UserUpdate + from myauth.exceptions import UserNotFoundError + + updates = UserUpdate(username="ShouldFail") + + with pytest.raises(UserNotFoundError): + auth_service.update_user("non_existent_id", updates) + + def test_i_cannot_update_email_to_existing_email(self, auth_service: AuthService): + """Failure: Updating email to an already registered email raises UserAlreadyExistsError.""" + + from myauth.models.user import UserCreate, UserUpdate + + # Setup: Create another user with a different email + other_user_data = UserCreate( + email="other.user@example.com", + username="OtherUser", + password="OtherPass123!", + roles=["member"] + ) + auth_service.register(other_user_data) + + # Execute: Try to update original user's email to the other user's email + updates = UserUpdate(email="other.user@example.com") + + with pytest.raises(UserAlreadyExistsError): + auth_service.update_user(self.user.id, updates) + + def test_i_cannot_update_email_to_same_email_without_triggering_verification_reset( + self, + auth_service: AuthService + ): + """Success: Updating email to the same email does not reset is_verified.""" + + from myauth.models.user import UserUpdate + + # Setup: Ensure user is verified + verify_updates = UserUpdate(is_verified=True) + auth_service.update_user(self.user.id, verify_updates) + + # Execute: Update with the same email + updates = UserUpdate(email=self.original_email) + updated_user = auth_service.update_user(self.user.id, updates) + + # Verify: is_verified should remain True + assert updated_user.is_verified is True + assert updated_user.email == self.original_email + + def test_i_can_update_with_empty_updates(self, auth_service: AuthService): + """Success: Updating with empty UserUpdate does not cause errors.""" + + from myauth.models.user import UserUpdate + + updates = UserUpdate() + updated_user = auth_service.update_user(self.user.id, updates) + + # Verify: User is returned and fields are unchanged + assert updated_user.id == self.user.id + assert updated_user.email == self.original_email + assert updated_user.username == self.original_username + + def test_email_verification_reset_only_when_email_actually_changes( + self, + auth_service: AuthService + ): + """Success: is_verified is reset to False only when email actually changes.""" + + from myauth.models.user import UserUpdate + + # Setup: Set user as verified + verify_updates = UserUpdate(is_verified=True) + auth_service.update_user(self.user.id, verify_updates) + verified_user = auth_service.user_repository.get_user_by_id(self.user.id) + assert verified_user.is_verified is True + + # Test 1: Update with same email - verification should remain + same_email_updates = UserUpdate(email=self.original_email, username="SameEmailTest") + updated_user = auth_service.update_user(self.user.id, same_email_updates) + assert updated_user.is_verified is True + + # Test 2: Update with different email - verification should reset + different_email_updates = UserUpdate(email="completely.new@example.com") + updated_user = auth_service.update_user(self.user.id, different_email_updates) + assert updated_user.is_verified is False + assert updated_user.email == "completely.new@example.com" + # class TestAuthServiceResetVerification(object): # """Tests for password reset and email verification flows.""" #