5 Commits

Author SHA1 Message Date
5f652b436e Added user profile modification 2025-11-11 09:30:04 +01:00
43603fe66f Added auto admin creation 2025-11-10 15:54:05 +01:00
ea7cf786cb Added user admin auto creation for Sqlite 2025-10-29 22:07:27 +01:00
5d869e3793 Updated to expose auth_app instead of app_route 2025-10-28 20:32:30 +01:00
0138ac247a Changed module name from my_auth to myauth
Changed encryption algorithm to argon2
Added unit tests
2025-10-19 23:17:38 +02:00
16 changed files with 1925 additions and 476 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
**/UserDB
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]

View File

@@ -16,3 +16,4 @@ clean-build: clean-package
# Alias to clean everything
clean: clean-build
find . -name "UserDB" -exec rm -f {} +

View File

@@ -75,17 +75,18 @@ This example configures myauth to use MongoDB as its backend.
```Python
from fastapi import FastAPI
from myauth import create_app_router_for_mongoDB
from myauth import create_auth_app_for_mongodb
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for MongoDB
auth_router = create_app_router_for_mongoDB(mongodb_url="mongodb://localhost:27017",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
auth_app = create_auth_app_for_mongodb(mongodb_url="mongodb://localhost:27017",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
# 3. Include the authentication routes
app.include_router(auth_router)
app.mount("/auth", auth_app)
@app.get("/")
@@ -100,19 +101,20 @@ This example configures myauth to use PostgreSQL as its backend.
```Python
from fastapi import FastAPI
from myauth import create_app_router_for_postgreSQL
from myauth import create_auth_app_for_postgresql
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for MongoDB
auth_router = create_app_router_for_mongoDB(postgresql_url="mongodb://localhost:27017",
username="admin",
password="password",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
# 2. Configure repositories for PostgreSQL
auth_app = create_auth_app_for_postgresql(postgresql_url="mongodb://localhost:27017",
username="admin",
password="password",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
# 3. Include the authentication routes
app.include_router(auth_router)
app.mount("/auth", auth_app)
@app.get("/")
@@ -127,16 +129,17 @@ This example configures myauth to use SQLite, which is ideal for development or
```Python
from fastapi import FastAPI
from myauth import create_app_router_for_sqlite
from myauth import create_auth_app_for_sqlite
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for MongoDB
auth_router = create_app_router_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
# 3. Include the authentication routes
app.include_router(auth_router)
app.mount("/auth", auth_app)
@app.get("/")
@@ -159,8 +162,9 @@ pip install "myauth[email]"
```Python
from fastapi import FastAPI
from myauth import create_auth_app_for_sqlite
from myauth.emailing.smtp import SMTPEmailService
from myauth import create_app_router_for_sqlite
# 1. Initialize FastAPI app
app = FastAPI()
@@ -175,11 +179,11 @@ email_service = SMTPEmailService(
)
# 3. Configure repositories for MongoDB
auth_router = create_app_router_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
email_service=email_service)
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
email_service=email_service)
# 4. Include the authentication routes
app.include_router(auth_router)
app.mount("/auth", auth_app)
```
### Option 2: Create a Custom Email Service
@@ -189,8 +193,9 @@ If you use a third-party service (like AWS SES, Mailgun) that requires an API, y
```Python
from fastapi import FastAPI
from myauth import create_auth_app_for_sqlite
from myauth.emailing.base import EmailService
from myauth import create_app_router_for_sqlite
# 1. Initialize FastAPI app
app = FastAPI()
@@ -220,11 +225,11 @@ class CustomEmailService(EmailService):
email_service = CustomEmailService(api_key="YOUR_API_KEY_HERE")
# 3. Pass your custom service to AuthService
auth_router = create_app_router_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
email_service=email_service)
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
email_service=email_service)
# 4. Include the authentication routes
app.include_router(auth_router)
app.mount("/auth", auth_app)
```
@@ -242,6 +247,8 @@ POST /auth/password-reset # Reset password with token
POST /auth/verify-email-request # Request email verification
POST /auth/verify-email # Verify email with token
GET /auth/me # Get current user info
PATCH /auth/me # Update own profile
PATCH /auth/users/{user_id} # Update user profile (by admin)
```
### Error Handling
@@ -346,3 +353,9 @@ pytest tests/
## License
MIT
## Release History
* 0.1.0 - Initial Release
* 0.2.0 - Added admin user auto creation
* 0.2.1 - Added user profile update (PATCH on /me and /users/{user_id})

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "myauth"
version = "0.5.0"
version = "0.2.1"
description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends."
readme = "README.md"
authors = [

22
src/main.py Normal file
View File

@@ -0,0 +1,22 @@
import uvicorn
from fastapi import FastAPI
from myauth import create_auth_app_for_sqlite
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for MongoDB
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
# 3. Include the authentication routes
app.mount("/auth", auth_app)
@app.get("/")
def read_root():
return {"message": "Application running with MyAuth (SQLite)"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -1,6 +1,9 @@
from .factory import create_sqlite_auth_service, create_app_router_for_sqlite
from .factory import create_sqlite_auth_service, create_auth_app_for_sqlite, create_auth_app_for_mongodb, \
create_auth_app_for_postgresql
__all__ = [
'create_sqlite_auth_service',
'create_app_router_for_sqlite',
'create_auth_app_for_sqlite',
'create_auth_app_for_mongodb',
'create_auth_app_for_postgresql'
]

View File

@@ -1,3 +1,3 @@
from .routes import create_auth_router
from .routes import create_auth_app
__all__ = ["create_auth_router"]
__all__ = ["create_auth_app"]

View File

@@ -4,10 +4,9 @@ FastAPI routes for authentication module.
This module provides ready-to-use FastAPI routes for all authentication
operations. Routes are organized in an APIRouter with /auth prefix.
"""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import Depends, HTTPException, status, FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from ..core.auth import AuthService
@@ -18,13 +17,13 @@ from ..models.email_verification import (
PasswordResetConfirm
)
from ..models.token import AccessTokenResponse, RefreshTokenRequest
from ..models.user import UserCreate, UserResponse
from ..models.user import UserCreate, UserResponse, UserUpdate, UserUpdateMe
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def create_auth_router(auth_service: AuthService) -> APIRouter:
def create_auth_app(auth_service: AuthService) -> FastAPI:
"""
Create and configure the authentication router.
@@ -40,13 +39,13 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
Example:
>>> from fastapi import FastAPI
>>> from auth_module.api.routes import create_auth_router
>>> from myauth.api.routes import create_auth_app
>>>
>>> app = FastAPI()
>>> auth_router = create_auth_router(auth_service)
>>> app.include_router(auth_router)
>>> auth_api = create_auth_app(auth_service)
>>> app.mount(auth_api)
"""
router = APIRouter(prefix="/auth", tags=["authentication"])
auth_app = FastAPI(prefix="/auth", tags=["authentication"])
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserResponse:
"""
@@ -65,7 +64,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
HTTPException: 401 if token is invalid or expired.
Example:
>>> @app.get("/protected")
>>> @auth_app.get("/protected")
>>> def protected_route(user: UserResponse = Depends(get_current_user)):
>>> return {"user_id": user.id}
"""
@@ -86,7 +85,36 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
def get_current_admin(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
"""
Dependency to ensure the current user has admin role.
This dependency can be used in any route that requires admin privileges.
It first validates the user's token (via get_current_user) then checks
for the 'admin' role.
Args:
current_user: The authenticated user (injected by get_current_user dependency).
Returns:
The current authenticated admin user.
Raises:
HTTPException: 403 if user does not have admin role.
Example:
>>> @auth_app.patch("/users/{user_id}")
>>> def update_user(user_id: str, admin: UserResponse = Depends(get_current_admin)):
>>> # Only admins can access this route
"""
if "admin" not in current_user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin privileges required"
)
return current_user
@auth_app.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
@@ -128,7 +156,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
@auth_app.post(
"/login",
response_model=AccessTokenResponse,
summary="Login with email and password",
@@ -163,7 +191,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
@auth_app.post(
"/logout",
status_code=status.HTTP_204_NO_CONTENT,
summary="Logout user",
@@ -185,7 +213,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
auth_service.logout(request.refresh_token)
return None
@router.post(
@auth_app.post(
"/refresh",
response_model=AccessTokenResponse,
summary="Refresh access token",
@@ -217,7 +245,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
@auth_app.post(
"/password-reset-request",
status_code=status.HTTP_200_OK,
summary="Request password reset",
@@ -252,7 +280,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
@auth_app.post(
"/password-reset",
status_code=status.HTTP_200_OK,
summary="Reset password with token",
@@ -284,7 +312,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.post(
@auth_app.post(
"/verify-email-request",
status_code=status.HTTP_200_OK,
summary="Request email verification",
@@ -320,7 +348,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.get(
@auth_app.get(
"/verify-email",
status_code=status.HTTP_200_OK,
summary="Verify email with token",
@@ -352,7 +380,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
detail=e.message
)
@router.get(
@auth_app.get(
"/me",
response_model=UserResponse,
summary="Get current user",
@@ -376,4 +404,122 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
"""
return current_user
return router
@auth_app.patch(
"/me",
response_model=UserResponse,
summary="Update own profile",
description="Update the current user's profile (restricted fields)."
)
def update_me(
updates: UserUpdateMe,
current_user: Annotated[UserResponse, Depends(get_current_user)]
) -> UserResponse:
"""
Update the current authenticated user's profile.
This endpoint allows users to update their own profile with restrictions:
- Allowed fields: email, username, password, user_settings
- Forbidden fields: roles, is_active, is_verified (must use other flows)
When changing password, the current session is preserved if refresh_token
is provided in the request body.
Args:
updates: User update data (restricted fields only).
current_user: The authenticated user (injected by dependency).
Returns:
Updated user information.
Raises:
HTTPException: 409 if new email already exists.
HTTPException: 422 if validation fails.
"""
try:
# Convert UserUpdateMe to UserUpdate (only allowed fields)
from ..models.user import UserUpdate
user_update = UserUpdate(
email=updates.email,
username=updates.username,
password=updates.password,
user_settings=updates.user_settings
)
# Update user with optional refresh_token to preserve session
updated_user = auth_service.update_user(
user_id=current_user.id,
updates=user_update,
refresh_token=updates.refresh_token
)
return UserResponse(
id=updated_user.id,
email=updated_user.email,
username=updated_user.username,
roles=updated_user.roles,
user_settings=updated_user.user_settings,
created_at=updated_user.created_at,
updated_at=updated_user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@auth_app.patch(
"/users/{user_id}",
response_model=UserResponse,
summary="Update any user (admin only)",
description="Update any user's profile with full access to all fields (admin only)."
)
def update_user_by_id(
user_id: str,
updates: UserUpdate,
admin: Annotated[UserResponse, Depends(get_current_admin)]
) -> UserResponse:
"""
Update any user's profile (admin only).
This endpoint allows administrators to update any user with full access
to all fields including roles, is_active, and is_verified.
Args:
user_id: The ID of the user to update.
updates: User update data (all fields allowed).
admin: The authenticated admin user (injected by dependency).
Returns:
Updated user information.
Raises:
HTTPException: 403 if user is not an admin.
HTTPException: 404 if user not found.
HTTPException: 409 if new email already exists.
HTTPException: 422 if validation fails.
"""
try:
# Admin updates don't preserve refresh tokens (security)
updated_user = auth_service.update_user(
user_id=user_id,
updates=updates,
refresh_token=None
)
return UserResponse(
id=updated_user.id,
email=updated_user.email,
username=updated_user.username,
roles=updated_user.roles,
user_settings=updated_user.user_settings,
created_at=updated_user.created_at,
updated_at=updated_user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
return auth_app

View File

@@ -5,442 +5,568 @@ This module provides the main authentication service that orchestrates
all authentication operations including registration, login, token management,
password reset, and email verification.
"""
import os
from datetime import datetime
from typing import Optional
from .password import PasswordManager
from .token import TokenManager
from ..persistence.base import UserRepository, TokenRepository
from ..models.user import UserCreate, UserInDB, UserUpdate
from ..models.token import AccessTokenResponse, TokenData
from ..emailing.base import EmailService
from ..exceptions import (
InvalidCredentialsError,
UserNotFoundError,
AccountDisabledError,
ExpiredTokenError,
InvalidTokenError,
RevokedTokenError
InvalidCredentialsError,
UserNotFoundError,
AccountDisabledError,
ExpiredTokenError,
InvalidTokenError,
RevokedTokenError, UserAlreadyExistsError
)
from ..models.token import AccessTokenResponse, TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate, UserCreateNoValidation
from ..persistence.base import UserRepository, TokenRepository
class AuthService:
"""
Main authentication service.
This service orchestrates all authentication-related operations by
coordinating between password management, token management, and
persistence layers.
Attributes:
user_repository: Repository for user persistence operations.
token_repository: Repository for token persistence operations.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
email_service: Optional service for sending emails.
"""
def __init__(
self,
user_repository: UserRepository,
token_repository: TokenRepository,
password_manager: PasswordManager,
token_manager: TokenManager,
email_service: Optional[EmailService] = None
):
"""
Main authentication service.
Initialize the authentication service.
This service orchestrates all authentication-related operations by
coordinating between password management, token management, and
persistence layers.
Attributes:
user_repository: Repository for user persistence operations.
token_repository: Repository for token persistence operations.
Args:
user_repository: Repository for user persistence.
token_repository: Repository for token persistence.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
email_service: Optional service for sending emails.
email_service: Optional service for sending emails (password reset, verification).
"""
self.user_repository = user_repository
self.token_repository = token_repository
self.password_manager = password_manager
self.token_manager = token_manager
self.email_service = email_service
def create_admin_if_needed(self, admin_email: str = None, admin_username: str = None, admin_password: str = None):
"""
Create the admin user if it does not exist. This function checks the current number
of users in the system. If there are no users present, it creates a user with
administrative privileges using the provided credentials or environment variables
as defaults.
:param admin_email: The email of the admin user. Defaults to "AUTH_ADMIN_EMAIL"
environment variable or "admin@myauth.com" if not provided.
:type admin_email: str, optional
:param admin_username: The username of the admin user. Defaults to
"AUTH_ADMIN_USERNAME" environment variable or "admin" if not provided.
:type admin_username: str, optional
:param admin_password: The password of the admin user. Defaults to
"AUTH_ADMIN_PASSWORD" environment variable or "admin" if not provided.
:type admin_password: str, optional
:return: True if an admin user is created, otherwise False.
:rtype: bool
"""
# create the admin user if it doesn't exist
nb_users = self.count_users()
if nb_users == 0:
admin_email = admin_email or os.getenv("AUTH_ADMIN_EMAIL", "admin@myauth.com")
admin_username = admin_username or os.getenv("AUTH_ADMIN_USERNAME", "admin")
admin_password = admin_password or os.getenv("AUTH_ADMIN_PASSWORD", "admin")
admin_user = UserCreateNoValidation(
email=admin_email,
username=admin_username,
password=admin_password,
roles=["admin"]
)
self.register(admin_user)
return True
return False
def register(self, user_data: UserCreate | UserCreateNoValidation) -> UserInDB:
"""
Register a new user.
This method creates a new user account with hashed password.
The user's email is initially unverified.
Args:
user_data: User registration data including password.
Returns:
The created user (without password).
Raises:
UserAlreadyExistsError: If email is already registered.
Example:
>>> user_data = UserCreate(
... email="user@example.com",
... username="john_doe",
... password="SecurePass123!"
... )
>>> user = auth_service.register(user_data)
"""
hashed_password = self.password_manager.hash_password(user_data.password)
user = self.user_repository.create_user(user_data, hashed_password)
return user
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
"""
Authenticate a user and create tokens.
This method verifies credentials, checks account status, and generates
both access and refresh tokens.
Args:
email: User's email address.
password: User's plain text password.
Returns:
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
Raises:
InvalidCredentialsError: If email or password is incorrect.
AccountDisabledError: If user account is disabled.
Example:
>>> user, tokens = auth_service.login("user@example.com", "password")
>>> print(tokens.access_token)
"""
# Get user by email
user = self.user_repository.get_user_by_email(email)
if not user:
raise InvalidCredentialsError()
# Verify password
if not self.password_manager.verify_password(password, user.hashed_password):
raise InvalidCredentialsError()
# Check if account is active
if not user.is_active:
raise AccountDisabledError()
# Create tokens
access_token = self.token_manager.create_access_token(user)
refresh_token = self.token_manager.create_refresh_token()
# Store refresh token in database
token_data = TokenData(
token=refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
tokens = AccessTokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer"
)
return user, tokens
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
"""
Create a new access token using a refresh token.
This method validates the refresh token and generates a new access token
without requiring the user to re-enter their password.
Args:
refresh_token: The refresh token to exchange.
Returns:
New access and refresh tokens.
Raises:
InvalidTokenError: If refresh token is invalid.
ExpiredTokenError: If refresh token has expired.
RevokedTokenError: If refresh token has been revoked.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
>>> print(tokens.access_token)
"""
# Validate refresh token
token_data = self.token_repository.get_token(refresh_token, "refresh")
if not token_data:
raise InvalidTokenError("Invalid refresh token")
if token_data.is_revoked:
raise RevokedTokenError()
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError()
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
# Create new tokens
access_token = self.token_manager.create_access_token(user)
new_refresh_token = self.token_manager.create_refresh_token()
# Revoke old refresh token
self.token_repository.revoke_token(refresh_token)
# Store new refresh token
new_token_data = TokenData(
token=new_refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(new_token_data)
return AccessTokenResponse(
access_token=access_token,
refresh_token=new_refresh_token,
token_type="bearer"
)
def logout(self, refresh_token: str) -> bool:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new
access tokens. The current access token will remain valid until
it expires naturally.
Args:
refresh_token: The refresh token to revoke.
Returns:
True if logout was successful, False if token not found.
Example:
>>> auth_service.logout(refresh_token)
True
"""
return self.token_repository.revoke_token(refresh_token)
def request_password_reset(self, email: str) -> str:
"""
Generate a password reset token for a user.
This method creates a secure token that can be sent to the user
via email to reset their password. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The password reset token to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_password_reset("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Create reset token
reset_token = self.token_manager.create_password_reset_token()
# Store token in database
token_data = TokenData(
token=reset_token,
token_type="password_reset",
user_id=user.id,
expires_at=self.token_manager.get_password_reset_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
# Send email if service is configured
if self.email_service:
self.email_service.send_password_reset_email(email, reset_token)
return reset_token
def reset_password(self, token: str, new_password: str) -> bool:
"""
Reset a user's password using a reset token.
This method validates the reset token and updates the user's password.
All existing refresh tokens for the user are revoked for security.
Args:
token: Password reset token.
new_password: New plain text password (will be hashed).
Returns:
True if password was reset successfully.
Raises:
InvalidTokenError: If reset token is invalid.
ExpiredTokenError: If reset token has expired.
RevokedTokenError: If reset token has been used.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.reset_password(token, "NewSecurePass123!")
True
"""
# Validate reset token
token_data = self.token_repository.get_token(token, "password_reset")
if not token_data:
raise InvalidTokenError("Invalid password reset token")
if token_data.is_revoked:
raise RevokedTokenError("Password reset token has already been used")
def __init__(
self,
user_repository: UserRepository,
token_repository: TokenRepository,
password_manager: PasswordManager,
token_manager: TokenManager,
email_service: Optional[EmailService] = None
):
"""
Initialize the authentication service.
Args:
user_repository: Repository for user persistence.
token_repository: Repository for token persistence.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
email_service: Optional service for sending emails (password reset, verification).
"""
self.user_repository = user_repository
self.token_repository = token_repository
self.password_manager = password_manager
self.token_manager = token_manager
self.email_service = email_service
def register(self, user_data: UserCreate) -> UserInDB:
"""
Register a new user.
This method creates a new user account with hashed password.
The user's email is initially unverified.
Args:
user_data: User registration data including password.
Returns:
The created user (without password).
Raises:
UserAlreadyExistsError: If email is already registered.
Example:
>>> user_data = UserCreate(
... email="user@example.com",
... username="john_doe",
... password="SecurePass123!"
... )
>>> user = auth_service.register(user_data)
"""
hashed_password = self.password_manager.hash_password(user_data.password)
user = self.user_repository.create_user(user_data, hashed_password)
return user
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
"""
Authenticate a user and create tokens.
This method verifies credentials, checks account status, and generates
both access and refresh tokens.
Args:
email: User's email address.
password: User's plain text password.
Returns:
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
Raises:
InvalidCredentialsError: If email or password is incorrect.
AccountDisabledError: If user account is disabled.
Example:
>>> user, tokens = auth_service.login("user@example.com", "password")
>>> print(tokens.access_token)
"""
# Get user by email
user = self.user_repository.get_user_by_email(email)
if not user:
raise InvalidCredentialsError()
# Verify password
if not self.password_manager.verify_password(password, user.hashed_password):
raise InvalidCredentialsError()
# Check if account is active
if not user.is_active:
raise AccountDisabledError()
# Create tokens
access_token = self.token_manager.create_access_token(user)
refresh_token = self.token_manager.create_refresh_token()
# Store refresh token in database
token_data = TokenData(
token=refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
tokens = AccessTokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer"
)
return user, tokens
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
"""
Create a new access token using a refresh token.
This method validates the refresh token and generates a new access token
without requiring the user to re-enter their password.
Args:
refresh_token: The refresh token to exchange.
Returns:
New access and refresh tokens.
Raises:
InvalidTokenError: If refresh token is invalid.
ExpiredTokenError: If refresh token has expired.
RevokedTokenError: If refresh token has been revoked.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
>>> print(tokens.access_token)
"""
# Validate refresh token
token_data = self.token_repository.get_token(refresh_token, "refresh")
if not token_data:
raise InvalidTokenError("Invalid refresh token")
if token_data.is_revoked:
raise RevokedTokenError()
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError()
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
# Create new tokens
access_token = self.token_manager.create_access_token(user)
new_refresh_token = self.token_manager.create_refresh_token()
# Revoke old refresh token
self.token_repository.revoke_token(refresh_token)
# Store new refresh token
new_token_data = TokenData(
token=new_refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(new_token_data)
return AccessTokenResponse(
access_token=access_token,
refresh_token=new_refresh_token,
token_type="bearer"
)
def logout(self, refresh_token: str) -> bool:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new
access tokens. The current access token will remain valid until
it expires naturally.
Args:
refresh_token: The refresh token to revoke.
Returns:
True if logout was successful, False if token not found.
Example:
>>> auth_service.logout(refresh_token)
True
"""
return self.token_repository.revoke_token(refresh_token)
def request_password_reset(self, email: str) -> str:
"""
Generate a password reset token for a user.
This method creates a secure token that can be sent to the user
via email to reset their password. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The password reset token to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_password_reset("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Create reset token
reset_token = self.token_manager.create_password_reset_token()
# Store token in database
token_data = TokenData(
token=reset_token,
token_type="password_reset",
user_id=user.id,
expires_at=self.token_manager.get_password_reset_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
# Send email if service is configured
if self.email_service:
self.email_service.send_password_reset_email(email, reset_token)
return reset_token
def reset_password(self, token: str, new_password: str) -> bool:
"""
Reset a user's password using a reset token.
This method validates the reset token and updates the user's password.
All existing refresh tokens for the user are revoked for security.
Args:
token: Password reset token.
new_password: New plain text password (will be hashed).
Returns:
True if password was reset successfully.
Raises:
InvalidTokenError: If reset token is invalid.
ExpiredTokenError: If reset token has expired.
RevokedTokenError: If reset token has been used.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.reset_password(token, "NewSecurePass123!")
True
"""
# Validate reset token
token_data = self.token_repository.get_token(token, "password_reset")
if not token_data:
raise InvalidTokenError("Invalid password reset token")
if token_data.is_revoked:
raise RevokedTokenError("Password reset token has already been used")
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError("Password reset token has expired")
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError("Password reset token has expired")
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
# Hash new password
hashed_password = self.password_manager.hash_password(new_password)
# Update user password
updates = UserUpdate(password=hashed_password)
self.user_repository.update_user(user.id, updates)
# Revoke the reset token
self.token_repository.revoke_token(token)
# Revoke all user's refresh tokens for security
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
return True
def request_email_verification(self, email: str) -> str:
"""
Generate an email verification token for a user.
This method creates a JWT token that can be sent to the user
to verify their email address. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The email verification token (JWT) to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_email_verification("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
verification_token = self.token_manager.create_email_verification_token(email)
# Hash new password
hashed_password = self.password_manager.hash_password(new_password)
# Update user password
updates = UserUpdate(password=hashed_password)
self.user_repository.update_user(user.id, updates)
# Revoke the reset token
self.token_repository.revoke_token(token)
# Revoke all user's refresh tokens for security
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
return True
def request_email_verification(self, email: str) -> str:
"""
Generate an email verification token for a user.
This method creates a JWT token that can be sent to the user
to verify their email address. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The email verification token (JWT) to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_email_verification("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
verification_token = self.token_manager.create_email_verification_token(email)
# Send email if service is configured
if self.email_service:
self.email_service.send_verification_email(email, verification_token)
return verification_token
def verify_email(self, token: str) -> bool:
"""
Verify a user's email address using a verification token.
# Send email if service is configured
if self.email_service:
self.email_service.send_verification_email(email, verification_token)
This method decodes the JWT token, extracts the email, and marks
the user's email as verified.
return verification_token
def verify_email(self, token: str) -> bool:
"""
Verify a user's email address using a verification token.
This method decodes the JWT token, extracts the email, and marks
the user's email as verified.
Args:
token: Email verification token (JWT).
Returns:
True if email was verified successfully.
Raises:
InvalidTokenError: If verification token is invalid.
ExpiredTokenError: If verification token has expired.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.verify_email(token)
True
"""
# Decode and validate token
email = self.token_manager.decode_email_verification_token(token)
# Get user
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Update user's verified status
updates = UserUpdate(is_verified=True)
self.user_repository.update_user(user.id, updates)
return True
def get_current_user(self, access_token: str) -> UserInDB:
"""
Retrieve the current user from an access token.
This method decodes and validates the JWT access token and
retrieves the corresponding user from the database.
Args:
access_token: JWT access token.
Returns:
The user associated with the token.
Raises:
InvalidTokenError: If access token is invalid.
ExpiredTokenError: If access token has expired.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> user = auth_service.get_current_user(access_token)
>>> print(user.email)
"""
# Decode and validate token
token_payload = self.token_manager.decode_access_token(access_token)
# Get user
user = self.user_repository.get_user_by_id(token_payload.sub)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
return user
Args:
token: Email verification token (JWT).
Returns:
True if email was verified successfully.
Raises:
InvalidTokenError: If verification token is invalid.
ExpiredTokenError: If verification token has expired.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.verify_email(token)
True
"""
# Decode and validate token
email = self.token_manager.decode_email_verification_token(token)
# Get user
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Update user's verified status
updates = UserUpdate(is_verified=True)
self.user_repository.update_user(user.id, updates)
return True
def get_current_user(self, access_token: str) -> UserInDB:
"""
Retrieve the current user from an access token.
This method decodes and validates the JWT access token and
retrieves the corresponding user from the database.
Args:
access_token: JWT access token.
Returns:
The user associated with the token.
Raises:
InvalidTokenError: If access token is invalid.
ExpiredTokenError: If access token has expired.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> user = auth_service.get_current_user(access_token)
>>> print(user.email)
"""
# Decode and validate token
token_payload = self.token_manager.decode_access_token(access_token)
# Get user
user = self.user_repository.get_user_by_id(token_payload.sub)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
return user
def count_users(self) -> int:
"""
Counts the total number of users.
This method retrieves and returns the total count of users
from the user repository. It provides functionality for
fetching user count stored in the underlying repository of
users.
:return: The total count of users.
:rtype: int
"""
return self.user_repository.count_users()
def list_users(self, skip: int = 0, limit: int = 100):
"""
Lists users from the user repository with optional pagination.
This method retrieves a list of users, allowing optional pagination
by specifying the number of records to skip and the maximum number
of users to retrieve.
:param skip: The number of users to skip in the result set. Defaults to 0.
:type skip: int
:param limit: The maximum number of users to retrieve. Defaults to 100.
:type limit: int
:return: A list of users retrieved from the repository.
:rtype: list
"""
return self.user_repository.list_users(skip, limit)
def update_user(self, user_id: str, updates: UserUpdate, refresh_token: Optional[str] = None) -> UserInDB:
"""
Update an existing user's information.
This method handles user profile updates with automatic security measures:
- Validates email uniqueness if email is changed
- Automatically sets is_verified=False if email is changed
- Hashes password if provided
- Revokes refresh tokens (except current one) if password is changed
Args:
user_id: The unique user identifier.
updates: Pydantic model containing fields to update.
refresh_token: Optional current refresh token to preserve when password changes.
Returns:
The updated user.
Raises:
UserNotFoundError: If user does not exist.
UserAlreadyExistsError: If new email is already used by another user.
Example:
>>> updates = UserUpdate(email="newemail@example.com")
>>> user = auth_service.update_user(user_id, updates)
>>>
>>> # Update password while preserving current session
>>> updates = UserUpdate(password="NewSecurePass123!")
>>> user = auth_service.update_user(user_id, updates, refresh_token=current_token)
"""
# Get current user to compare changes
current_user = self.user_repository.get_user_by_id(user_id)
if not current_user:
raise UserNotFoundError(f"User with id {user_id} not found")
# Handle email change
if updates.email is not None and updates.email != current_user.email:
# Check if new email is already used by another user
if self.user_repository.email_exists(updates.email):
raise UserAlreadyExistsError(f"Email {updates.email} is already in use")
# Force email verification to false when email changes
updates.is_verified = False
# Handle password change
if updates.password is not None:
# Hash the new password
hashed_password = self.password_manager.hash_password(updates.password)
updates.password = hashed_password
# Revoke all refresh tokens except the current one (if provided)
self.token_repository.revoke_all_user_tokens(user_id, "refresh", except_token=refresh_token)
# Update user in database
updated_user = self.user_repository.update_user(user_id, updates)
return updated_user

View File

@@ -57,9 +57,10 @@ def create_sqlite_auth_service(db_path: str,
return auth_service
def create_app_router_for_sqlite(db_path: str,
jwt_secret: str,
email_service: Optional[EmailService] = None):
def create_auth_app_for_sqlite(db_path: str,
jwt_secret: str,
create_admin_user: bool = True,
email_service: Optional[EmailService] = None):
"""
Creates an application router designed to use with an SQLite database backend.
This function initializes an authentication service using the provided database
@@ -69,10 +70,25 @@ def create_app_router_for_sqlite(db_path: str,
:param db_path: Path to the SQLite database file.
:param jwt_secret: A secret string used for signing and verifying JWT tokens.
:param create_admin_user: Create an admin user if necessary.
:param email_service: An optional email service instance for managing email-related
communication and functionalities during authentication.
:return: An application router configured for handling authentication API routes.
"""
auth_service = create_sqlite_auth_service(db_path, jwt_secret, email_service)
from .api.routes import create_auth_router
return create_auth_router(auth_service)
if create_admin_user:
auth_service.create_admin_if_needed()
from .api.routes import create_auth_app
return create_auth_app(auth_service)
def create_auth_app_for_mongodb(mongodb_url="mongodb://localhost:27017",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED"):
raise NotImplementedError("MongoDB support is not yet implemented.")
def create_auth_app_for_postgresql(postgresql_url="mongodb://localhost:27017",
username="admin",
password="password",
jwt_secret="THIS_NEEDS_TO_BE_CHANGED"):
raise NotImplementedError("PostgreSQL support is not yet implemented.")

View File

@@ -33,6 +33,16 @@ class UserBase(BaseModel):
user_settings: dict = Field(default_factory=dict)
class UserCreateNoValidation(UserBase):
"""
Model for user creation (registration).
This model extends UserBase with a password field
"""
password: str
class UserCreate(UserBase):
"""
Model for user creation (registration).
@@ -143,6 +153,67 @@ class UserUpdate(BaseModel):
Raises:
ValueError: If username is provided but empty or too long.
"""
if value is None:
return None
return validate_username_not_empty(value)
class UserUpdateMe(BaseModel):
"""
Model for user self-update (restricted fields).
This model allows users to update their own profile with restrictions:
- Allowed fields: email, username, password, user_settings
- Forbidden fields: roles, is_active, is_verified
- Optional refresh_token to preserve current session when changing password
Attributes:
email: Optional new email address.
username: Optional new username.
password: Optional new password (will be hashed and validated).
user_settings: Optional new settings dict.
refresh_token: Optional refresh token to preserve when changing password.
"""
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = None
user_settings: Optional[dict] = None
refresh_token: Optional[str] = None
@field_validator('password')
@classmethod
def validate_password_strength(cls, value: Optional[str]) -> Optional[str]:
"""
Validate password meets security requirements if provided.
Args:
value: The password to validate (can be None).
Returns:
The validated password or None.
Raises:
ValueError: If password is provided but does not meet security requirements.
"""
return validate_password_strength(value)
@field_validator('username')
@classmethod
def validate_username(cls, value: Optional[str]) -> Optional[str]:
"""
Validate username if provided.
Args:
value: The username to validate (can be None).
Returns:
The validated username or None.
Raises:
ValueError: If username is provided but empty or too long.
"""
if value is None:
return None
return validate_username_not_empty(value)

View File

@@ -8,6 +8,7 @@ PostgreSQL, custom engines, etc.).
"""
from abc import ABC, abstractmethod
from typing import Optional
from ..models.token import TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate
@@ -121,6 +122,31 @@ class UserRepository(ABC):
"""
pass
@abstractmethod
def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]:
"""
List users with pagination.
Args:
skip (int): Number of users to skip (default: 0)
limit (int): Maximum number of users to return (default: 100)
Returns:
List[UserInDB]: List of users
"""
pass
@abstractmethod
def count_users(self) -> int:
"""
Count total number of users.
Returns:
int: Total number of users in system
"""
pass
class TokenRepository(ABC):
"""
@@ -178,16 +204,19 @@ class TokenRepository(ABC):
pass
@abstractmethod
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
"""
Revoke all tokens of a specific type for a user.
This is useful for logout-all-devices functionality or when a user's
password is changed (invalidating all refresh tokens).
password is changed (invalidating all refresh tokens). Optionally,
a specific token can be excluded from revocation to preserve the
current session.
Args:
user_id: The user whose tokens should be revoked.
token_type: Type of tokens to revoke ("refresh" or "password_reset").
except_token: Optional token string to exclude from revocation.
Returns:
Number of tokens revoked.

View File

@@ -13,9 +13,9 @@ from typing import Optional
from uuid import uuid4
from .base import UserRepository, TokenRepository
from ..models.user import UserCreate, UserInDB, UserUpdate
from ..models.token import TokenData
from ..exceptions import UserAlreadyExistsError, UserNotFoundError
from ..models.token import TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate
class SQLiteUserRepository(UserRepository):
@@ -365,6 +365,51 @@ class SQLiteUserRepository(UserRepository):
cursor.execute("SELECT 1 FROM users WHERE email = ? LIMIT 1", (email,))
return cursor.fetchone() is not None
def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]:
"""
List users with pagination.
Args:
skip: Number of users to skip (default: 0).
limit: Maximum number of users to return (default: 100).
Returns:
List of users.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id,
email,
username,
hashed_password,
roles,
user_settings,
is_verified,
is_active,
created_at,
updated_at
FROM users
ORDER BY created_at DESC LIMIT ?
OFFSET ?
""", (limit, skip))
rows = cursor.fetchall()
return [self._row_to_user(row) for row in rows]
def count_users(self) -> int:
"""
Count total number of users.
Returns:
Total number of users in system.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
result = cursor.fetchone()
return result[0] if result else 0
class SQLiteTokenRepository(TokenRepository):
"""
@@ -545,25 +590,44 @@ class SQLiteTokenRepository(TokenRepository):
conn.commit()
return cursor.rowcount > 0
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
"""
Revoke all tokens of a specific type for a user.
This is useful for logout-all-devices functionality or when a user's
password is changed (invalidating all refresh tokens). Optionally,
a specific token can be excluded from revocation to preserve the
current session.
Args:
user_id: The user whose tokens should be revoked.
token_type: Type of tokens to revoke.
token_type: Type of tokens to revoke ("refresh" or "password_reset").
except_token: Optional token string to exclude from revocation.
Returns:
Number of tokens revoked.
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
""", (user_id, token_type))
if except_token is not None:
# Revoke all tokens except the specified one
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
AND token != ?
""", (user_id, token_type, except_token))
else:
# Revoke all tokens
cursor.execute("""
UPDATE tokens
SET is_revoked = 1
WHERE user_id = ?
AND token_type = ?
""", (user_id, token_type))
conn.commit()
return cursor.rowcount

View File

@@ -12,7 +12,7 @@ import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from myauth.api.routes import create_auth_router
from myauth.api import create_auth_app
from myauth.core.auth import AuthService
from myauth.exceptions import (
UserAlreadyExistsError,
@@ -51,8 +51,8 @@ def test_app(mock_auth_service):
FastAPI application configured for testing.
"""
app = FastAPI()
auth_router = create_auth_router(mock_auth_service)
app.include_router(auth_router)
auth_app = create_auth_app(mock_auth_service)
app.mount("/auth", auth_app)
return app
@@ -513,3 +513,361 @@ def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_se
)
assert response.status_code == 401
def test_i_can_update_my_email(client, mock_auth_service, sample_user):
"""
Test user can update their own email.
Verifies that a PATCH request to /auth/me with a new email
successfully updates the user's email address.
"""
updated_user = sample_user.model_copy(update={"email": "newemail@example.com"})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"email": "newemail@example.com"}
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "newemail@example.com"
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_username(client, mock_auth_service, sample_user):
"""
Test user can update their own username.
Verifies that a PATCH request to /auth/me with a new username
successfully updates the user's username.
"""
updated_user = sample_user.model_copy(update={"username": "newusername"})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"username": "newusername"}
)
assert response.status_code == 200
data = response.json()
assert data["username"] == "newusername"
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_my_password(client, mock_auth_service, sample_user):
"""
Test user can update their own password.
Verifies that a PATCH request to /auth/me with a new password
successfully updates the password (which will be hashed by the service).
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = sample_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"password": "NewSecurePass123!"}
)
assert response.status_code == 200
mock_auth_service.update_user.assert_called_once()
# Verify password was included in the update
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].password == "NewSecurePass123!"
def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user):
"""
Test user can update password while preserving current session.
Verifies that when a refresh_token is provided in the request body,
it is passed to the service to preserve the current session.
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = sample_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={
"password": "NewSecurePass123!",
"refresh_token": "current_refresh_token"
}
)
assert response.status_code == 200
mock_auth_service.update_user.assert_called_once()
# Verify refresh_token was passed to preserve session
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["refresh_token"] == "current_refresh_token"
def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user):
"""
Test user can update their own settings.
Verifies that a PATCH request to /auth/me with new user_settings
successfully updates the user's custom settings.
"""
new_settings = {"theme": "dark", "language": "fr", "notifications": True}
updated_user = sample_user.model_copy(update={"user_settings": new_settings})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"user_settings": new_settings}
)
assert response.status_code == 200
data = response.json()
assert data["user_settings"] == new_settings
mock_auth_service.update_user.assert_called_once()
def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user):
"""
Test user can update multiple fields simultaneously.
Verifies that a PATCH request to /auth/me can update multiple
fields (email, username, user_settings) at once.
"""
updated_user = sample_user.model_copy(update={
"email": "multiemail@example.com",
"username": "multiuser",
"user_settings": {"theme": "light"}
})
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={
"email": "multiemail@example.com",
"username": "multiuser",
"user_settings": {"theme": "light"}
}
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "multiemail@example.com"
assert data["username"] == "multiuser"
assert data["user_settings"] == {"theme": "light"}
mock_auth_service.update_user.assert_called_once()
def test_i_cannot_update_my_profile_without_authentication(client):
"""
Test updating profile fails without authentication.
Verifies that a PATCH request to /auth/me without a Bearer token
returns 401 Unauthorized status.
"""
response = client.patch(
"/auth/me",
json={"username": "shouldfail"}
)
assert response.status_code == 401
def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user):
"""
Test updating email fails if already exists.
Verifies that attempting to update email to an already registered
email returns 409 Conflict status.
"""
mock_auth_service.get_current_user.return_value = sample_user
mock_auth_service.update_user.side_effect = UserAlreadyExistsError(
"Email already in use"
)
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"},
json={"email": "existing@example.com"}
)
assert response.status_code == 409
assert "already" in response.json()["detail"].lower()
def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service):
"""
Test updating profile fails with invalid token.
Verifies that attempting to access /auth/me with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.get_current_user.side_effect = InvalidTokenError(
"Invalid access token"
)
response = client.patch(
"/auth/me",
headers={"Authorization": "Bearer invalid.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 401
def test_admin_can_update_any_user(client, mock_auth_service, sample_user):
"""
Test admin can update any user.
Verifies that an admin can successfully update another user's
information via PATCH /auth/users/{user_id}.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"})
updated_user = target_user.model_copy(update={"username": "updatedusername"})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"username": "updatedusername"}
)
assert response.status_code == 200
data = response.json()
assert data["username"] == "updatedusername"
mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_roles(client, mock_auth_service, sample_user):
"""
Test admin can update user roles.
Verifies that an admin can change a user's roles, which is forbidden
for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]})
updated_user = target_user.model_copy(update={"roles": ["admin", "moderator"]})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"roles": ["admin", "moderator"]}
)
assert response.status_code == 200
data = response.json()
assert data["roles"] == ["admin", "moderator"]
mock_auth_service.update_user.assert_called_once()
def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user):
"""
Test admin can update user active status.
Verifies that an admin can activate or deactivate a user account,
which is forbidden for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_active": True})
updated_user = target_user.model_copy(update={"is_active": False})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"is_active": False}
)
assert response.status_code == 200
# Note: is_active is not in UserResponse, so we just verify the call was made
mock_auth_service.update_user.assert_called_once()
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].is_active is False
def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user):
"""
Test admin can update user verification status.
Verifies that an admin can change a user's email verification status,
which is forbidden for regular users on the /auth/me endpoint.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False})
updated_user = target_user.model_copy(update={"is_verified": True})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.return_value = updated_user
response = client.patch(
"/auth/users/target_user_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"is_verified": True}
)
assert response.status_code == 200
# Note: is_verified is not in UserResponse, so we just verify the call was made
mock_auth_service.update_user.assert_called_once()
call_args = mock_auth_service.update_user.call_args
assert call_args[1]["updates"].is_verified is True
def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user):
"""
Test non-admin cannot update other users.
Verifies that a regular user (without admin role) cannot access
the PATCH /auth/users/{user_id} endpoint and receives 403 Forbidden.
"""
regular_user = sample_user.model_copy(update={"roles": ["user"]})
mock_auth_service.get_current_user.return_value = regular_user
response = client.patch(
"/auth/users/other_user_id",
headers={"Authorization": "Bearer user.access.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 403
assert "admin" in response.json()["detail"].lower()
def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user):
"""
Test admin cannot update non-existent user.
Verifies that attempting to update a non-existent user returns
404 Not Found status.
"""
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
mock_auth_service.get_current_user.return_value = admin_user
mock_auth_service.update_user.side_effect = UserNotFoundError(
"User not found"
)
response = client.patch(
"/auth/users/non_existent_id",
headers={"Authorization": "Bearer admin.access.token"},
json={"username": "shouldfail"}
)
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()

View File

@@ -78,6 +78,178 @@ class TestAuthServiceRegisterLogin(object):
with pytest.raises(InvalidCredentialsError):
auth_service.login("non.existent@example.com", "AnyPassword")
def test_create_admin_if_needed_success_with_custom_credentials(
self,
auth_service: AuthService
):
"""Success: Admin is created with custom credentials when no users exist."""
# Arrange
custom_email = "custom.admin@example.com"
custom_username = "custom_admin"
custom_password = "CustomAdminPass123!"
# Act
result = auth_service.create_admin_if_needed(
admin_email=custom_email,
admin_username=custom_username,
admin_password=custom_password
)
# Assert
assert result is True
# Verify admin user was created
admin_user = auth_service.user_repository.get_user_by_email(custom_email)
assert admin_user is not None
assert admin_user.email == custom_email
assert admin_user.username == custom_username
assert "admin" in admin_user.roles
# Verify password was hashed
auth_service.password_manager.hash_password.assert_called()
def test_create_admin_if_needed_success_with_default_credentials(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Admin is created with default credentials from environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
# Act
result = auth_service.create_admin_if_needed()
# Assert
assert result is True
# Verify admin user was created with env variables
admin_user = auth_service.user_repository.get_user_by_email("env.admin@example.com")
assert admin_user is not None
assert admin_user.email == "env.admin@example.com"
assert admin_user.username == "env_admin"
assert "admin" in admin_user.roles
def test_create_admin_if_needed_success_with_hardcoded_defaults(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Admin is created with hardcoded defaults when no env vars or params provided."""
# Arrange - Clear any existing env variables
monkeypatch.delenv("AUTH_ADMIN_EMAIL", raising=False)
monkeypatch.delenv("AUTH_ADMIN_USERNAME", raising=False)
monkeypatch.delenv("AUTH_ADMIN_PASSWORD", raising=False)
# Act
result = auth_service.create_admin_if_needed()
# Assert
assert result is True
# Verify admin user was created with hardcoded defaults
admin_user = auth_service.user_repository.get_user_by_email("admin@myauth.com")
assert admin_user is not None
assert admin_user.email == "admin@myauth.com"
assert admin_user.username == "admin"
assert "admin" in admin_user.roles
def test_create_admin_if_needed_no_creation_when_users_exist(
self,
auth_service: AuthService,
test_user_data_create: UserCreate
):
"""Failure: Admin is not created when users already exist in the system."""
# Arrange - Create a regular user first
auth_service.register(test_user_data_create)
# Act
result = auth_service.create_admin_if_needed(
admin_email="should.not.be.created@example.com",
admin_username="should_not_exist",
admin_password="ShouldNotExist123!"
)
# Assert
assert result is False
# Verify admin user was NOT created
admin_user = auth_service.user_repository.get_user_by_email(
"should.not.be.created@example.com"
)
assert admin_user is None
# Verify only the original user exists
assert auth_service.count_users() == 1
def test_create_admin_if_needed_parameters_override_env_variables(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Parameters take precedence over environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
param_email = "param.admin@example.com"
param_username = "param_admin"
param_password = "ParamAdminPass123!"
# Act
result = auth_service.create_admin_if_needed(
admin_email=param_email,
admin_username=param_username,
admin_password=param_password
)
# Assert
assert result is True
# Verify parameters were used, not env variables
admin_user = auth_service.user_repository.get_user_by_email(param_email)
assert admin_user is not None
assert admin_user.email == param_email
assert admin_user.username == param_username
# Verify env admin was NOT created
env_admin = auth_service.user_repository.get_user_by_email("env.admin@example.com")
assert env_admin is None
def test_create_admin_if_needed_mixed_parameters_and_env(
self,
auth_service: AuthService,
monkeypatch
):
"""Success: Partial parameters combine with environment variables."""
# Arrange
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
# Act - Only provide email as parameter
result = auth_service.create_admin_if_needed(
admin_email="partial.admin@example.com"
)
# Assert
assert result is True
# Verify email from parameter, username and password from env
admin_user = auth_service.user_repository.get_user_by_email("partial.admin@example.com")
assert admin_user is not None
assert admin_user.email == "partial.admin@example.com"
assert admin_user.username == "env_admin"
class TestAuthServiceTokenManagement(object):
"""Tests for token-related flows (Refresh, Logout, GetCurrentUser)."""
@@ -171,6 +343,297 @@ class TestAuthServiceTokenManagement(object):
with pytest.raises(ExpiredTokenError):
auth_service.get_current_user("expired_access_jwt")
class TestAuthServiceUserUpdate(object):
"""Tests for user update operations."""
@pytest.fixture(autouse=True)
def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""Sets up a registered user for update tests."""
pm = auth_service.password_manager
original_hash = pm.hash_password.return_value
# Temporarily set hash for setup
pm.hash_password.return_value = "HASHED_PASS"
user = auth_service.register(test_user_data_create)
self.user = user
self.original_email = user.email
self.original_username = user.username
# Restore hash mock
pm.hash_password.return_value = original_hash
def test_i_can_update_user_email(self, auth_service: AuthService):
"""Success: Email can be updated and is_verified is automatically set to False."""
from myauth.models.user import UserUpdate
new_email = "updated.email@example.com"
updates = UserUpdate(email=new_email)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.email == new_email
assert updated_user.is_verified is False
def test_i_can_update_user_username(self, auth_service: AuthService):
"""Success: Username can be updated."""
from myauth.models.user import UserUpdate
new_username = "UpdatedUsername"
updates = UserUpdate(username=new_username)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == new_username
def test_i_can_update_user_password(self, auth_service: AuthService):
"""Success: Password can be updated and is properly hashed."""
from myauth.models.user import UserUpdate
pm = auth_service.password_manager
new_password = "NewSecurePass123!"
with patch.object(pm, 'hash_password', return_value="NEW_HASHED_PASSWORD") as mock_hash:
updates = UserUpdate(password=new_password)
updated_user = auth_service.update_user(self.user.id, updates)
mock_hash.assert_called_once_with(new_password)
assert updated_user.hashed_password == "NEW_HASHED_PASSWORD"
def test_i_can_update_user_password_and_all_refresh_tokens_are_revoked(
self,
auth_service: AuthService
):
"""Success: Updating password revokes all refresh tokens when no current token provided."""
from myauth.models.user import UserUpdate
# Setup: Create some refresh tokens for the user
from myauth.models.token import TokenData
from datetime import datetime, timedelta
token1 = TokenData(
token="refresh_token_1",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
token2 = TokenData(
token="refresh_token_2",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(token1)
auth_service.token_repository.save_token(token2)
# Execute: Update password without providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates)
# Verify: Both tokens are revoked
token1_after = auth_service.token_repository.get_token("refresh_token_1", "refresh")
token2_after = auth_service.token_repository.get_token("refresh_token_2", "refresh")
assert token1_after.is_revoked is True
assert token2_after.is_revoked is True
def test_i_can_update_user_password_and_preserve_current_session(
self,
auth_service: AuthService
):
"""Success: Updating password preserves the current refresh token when provided."""
from myauth.models.user import UserUpdate
from myauth.models.token import TokenData
from datetime import datetime, timedelta
# Setup: Create some refresh tokens
current_token = TokenData(
token="current_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
other_token = TokenData(
token="other_refresh_token",
token_type="refresh",
user_id=self.user.id,
expires_at=datetime.now() + timedelta(days=1),
created_at=datetime.now(),
is_revoked=False
)
auth_service.token_repository.save_token(current_token)
auth_service.token_repository.save_token(other_token)
# Execute: Update password while providing current token
updates = UserUpdate(password="NewPassword123!")
auth_service.update_user(self.user.id, updates, refresh_token="current_refresh_token")
# Verify: Current token is preserved, other is revoked
current_after = auth_service.token_repository.get_token("current_refresh_token", "refresh")
other_after = auth_service.token_repository.get_token("other_refresh_token", "refresh")
assert current_after.is_revoked is False
assert other_after.is_revoked is True
def test_i_can_update_multiple_fields_at_once(self, auth_service: AuthService):
"""Success: Multiple fields can be updated simultaneously."""
from myauth.models.user import UserUpdate
updates = UserUpdate(
username="MultiUpdateUser",
roles=["admin", "member"],
user_settings={"theme": "light", "language": "en"}
)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.username == "MultiUpdateUser"
assert updated_user.roles == ["admin", "member"]
assert updated_user.user_settings == {"theme": "light", "language": "en"}
def test_i_can_update_user_roles(self, auth_service: AuthService):
"""Success: User roles can be updated."""
from myauth.models.user import UserUpdate
new_roles = ["admin", "moderator"]
updates = UserUpdate(roles=new_roles)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.roles == new_roles
def test_i_can_update_user_settings(self, auth_service: AuthService):
"""Success: User settings can be updated."""
from myauth.models.user import UserUpdate
new_settings = {"theme": "light", "notifications": True, "language": "fr"}
updates = UserUpdate(user_settings=new_settings)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.user_settings == new_settings
def test_i_can_update_is_active_status(self, auth_service: AuthService):
"""Success: User active status can be updated."""
from myauth.models.user import UserUpdate
# Deactivate user
updates = UserUpdate(is_active=False)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is False
# Reactivate user
updates = UserUpdate(is_active=True)
updated_user = auth_service.update_user(self.user.id, updates)
assert updated_user.is_active is True
def test_i_cannot_update_user_with_invalid_user_id(self, auth_service: AuthService):
"""Failure: Updating a non-existent user raises UserNotFoundError."""
from myauth.models.user import UserUpdate
from myauth.exceptions import UserNotFoundError
updates = UserUpdate(username="ShouldFail")
with pytest.raises(UserNotFoundError):
auth_service.update_user("non_existent_id", updates)
def test_i_cannot_update_email_to_existing_email(self, auth_service: AuthService):
"""Failure: Updating email to an already registered email raises UserAlreadyExistsError."""
from myauth.models.user import UserCreate, UserUpdate
# Setup: Create another user with a different email
other_user_data = UserCreate(
email="other.user@example.com",
username="OtherUser",
password="OtherPass123!",
roles=["member"]
)
auth_service.register(other_user_data)
# Execute: Try to update original user's email to the other user's email
updates = UserUpdate(email="other.user@example.com")
with pytest.raises(UserAlreadyExistsError):
auth_service.update_user(self.user.id, updates)
def test_i_cannot_update_email_to_same_email_without_triggering_verification_reset(
self,
auth_service: AuthService
):
"""Success: Updating email to the same email does not reset is_verified."""
from myauth.models.user import UserUpdate
# Setup: Ensure user is verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
# Execute: Update with the same email
updates = UserUpdate(email=self.original_email)
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: is_verified should remain True
assert updated_user.is_verified is True
assert updated_user.email == self.original_email
def test_i_can_update_with_empty_updates(self, auth_service: AuthService):
"""Success: Updating with empty UserUpdate does not cause errors."""
from myauth.models.user import UserUpdate
updates = UserUpdate()
updated_user = auth_service.update_user(self.user.id, updates)
# Verify: User is returned and fields are unchanged
assert updated_user.id == self.user.id
assert updated_user.email == self.original_email
assert updated_user.username == self.original_username
def test_email_verification_reset_only_when_email_actually_changes(
self,
auth_service: AuthService
):
"""Success: is_verified is reset to False only when email actually changes."""
from myauth.models.user import UserUpdate
# Setup: Set user as verified
verify_updates = UserUpdate(is_verified=True)
auth_service.update_user(self.user.id, verify_updates)
verified_user = auth_service.user_repository.get_user_by_id(self.user.id)
assert verified_user.is_verified is True
# Test 1: Update with same email - verification should remain
same_email_updates = UserUpdate(email=self.original_email, username="SameEmailTest")
updated_user = auth_service.update_user(self.user.id, same_email_updates)
assert updated_user.is_verified is True
# Test 2: Update with different email - verification should reset
different_email_updates = UserUpdate(email="completely.new@example.com")
updated_user = auth_service.update_user(self.user.id, different_email_updates)
assert updated_user.is_verified is False
assert updated_user.email == "completely.new@example.com"
# class TestAuthServiceResetVerification(object):
# """Tests for password reset and email verification flows."""
#

View File

@@ -1,12 +1,12 @@
# tests/persistence/test_sqlite_user.py
import pytest
import json
from datetime import datetime
from myauth.persistence.sqlite import SQLiteUserRepository
from myauth.models.user import UserCreate, UserUpdate
import pytest
from myauth.exceptions import UserAlreadyExistsError, UserNotFoundError
from myauth.models.user import UserCreate, UserUpdate
from myauth.persistence.sqlite import SQLiteUserRepository
def test_i_can_create_and_retrieve_user_by_email(user_repository: SQLiteUserRepository,
@@ -153,3 +153,138 @@ def test_i_cannot_retrieve_non_existent_user_by_email(user_repository: SQLiteUse
retrieved_user = user_repository.get_user_by_email("ghost@example.com")
assert retrieved_user is None
def test_i_can_list_users_with_pagination(user_repository: SQLiteUserRepository,
test_user_hashed_password: str):
"""Verifies that list_users returns paginated results correctly."""
# Create multiple users
for i in range(5):
user_data = UserCreate(
email=f"user{i}@example.com",
username=f"User{i}",
password="#Password123",
roles=["user"],
user_settings={}
)
user_repository.create_user(user_data, test_user_hashed_password)
# Test: Get first 3 users
users_page1 = user_repository.list_users(skip=0, limit=3)
assert len(users_page1) == 3
# Test: Get next 2 users
users_page2 = user_repository.list_users(skip=3, limit=3)
assert len(users_page2) == 2
# Test: Verify no duplicates between pages
page1_ids = {user.id for user in users_page1}
page2_ids = {user.id for user in users_page2}
assert len(page1_ids.intersection(page2_ids)) == 0
def test_i_can_list_users_with_default_pagination(user_repository: SQLiteUserRepository,
test_user_data_create: UserCreate,
test_user_hashed_password: str):
"""Verifies that list_users works with default parameters."""
# Create 2 users
user_repository.create_user(test_user_data_create, test_user_hashed_password)
user_data2 = UserCreate(
email="user2@example.com",
username="User2",
password="#Password123",
roles=["user"],
user_settings={}
)
user_repository.create_user(user_data2, test_user_hashed_password)
# Test: Default parameters (skip=0, limit=100)
users = user_repository.list_users()
assert len(users) == 2
assert all(isinstance(user.created_at, datetime) for user in users)
def test_i_get_empty_list_when_no_users_exist(user_repository: SQLiteUserRepository):
"""Verifies that list_users returns an empty list when no users exist."""
users = user_repository.list_users()
assert users == []
assert isinstance(users, list)
def test_i_can_skip_beyond_available_users(user_repository: SQLiteUserRepository,
test_user_data_create: UserCreate,
test_user_hashed_password: str):
"""Verifies that skipping beyond available users returns an empty list."""
user_repository.create_user(test_user_data_create, test_user_hashed_password)
# Skip beyond the only user
users = user_repository.list_users(skip=10, limit=10)
assert users == []
def test_i_can_count_users(user_repository: SQLiteUserRepository,
test_user_hashed_password: str):
"""Verifies that count_users returns the correct number of users."""
# Initial count should be 0
assert user_repository.count_users() == 0
# Create first user
user_data1 = UserCreate(
email="user1@example.com",
username="User1",
password="#Password123",
roles=["user"],
user_settings={}
)
user_repository.create_user(user_data1, test_user_hashed_password)
assert user_repository.count_users() == 1
# Create second user
user_data2 = UserCreate(
email="user2@example.com",
username="User2",
password="#Password123",
roles=["user"],
user_settings={}
)
user_repository.create_user(user_data2, test_user_hashed_password)
assert user_repository.count_users() == 2
def test_i_get_zero_count_when_no_users_exist(user_repository: SQLiteUserRepository):
"""Verifies that count_users returns 0 when the database is empty."""
count = user_repository.count_users()
assert count == 0
assert isinstance(count, int)
def test_list_users_returns_correct_user_structure(user_repository: SQLiteUserRepository,
test_user_data_create: UserCreate,
test_user_hashed_password: str):
"""Verifies that list_users returns UserInDB objects with all fields."""
user_repository.create_user(test_user_data_create, test_user_hashed_password)
users = user_repository.list_users()
assert len(users) == 1
user = users[0]
# Verify all fields are present and correct type
assert user.id is not None
assert user.email == test_user_data_create.email
assert user.username == test_user_data_create.username
assert user.hashed_password == test_user_hashed_password
assert isinstance(user.roles, list)
assert isinstance(user.user_settings, dict)
assert isinstance(user.is_verified, bool)
assert isinstance(user.is_active, bool)
assert isinstance(user.created_at, datetime)
assert isinstance(user.updated_at, datetime)