Compare commits
5 Commits
7634631b90
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 5f652b436e | |||
| 43603fe66f | |||
| ea7cf786cb | |||
| 5d869e3793 | |||
| 0138ac247a |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,3 +1,5 @@
|
||||
**/UserDB
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[codz]
|
||||
|
||||
19
Makefile
Normal file
19
Makefile
Normal file
@@ -0,0 +1,19 @@
|
||||
# Makefile for cleaning packaging files and directories
|
||||
|
||||
.PHONY: clean-package clean-build clean
|
||||
|
||||
# Clean distribution artifacts (dist/ and *.egg-info)
|
||||
clean-package:
|
||||
rm -rf dist
|
||||
rm -rf *.egg-info
|
||||
rm -rf src/*.egg-info
|
||||
|
||||
# Clean all Python build artifacts (dist, egg-info, pyc, and cache files)
|
||||
clean-build: clean-package
|
||||
find . -name "__pycache__" -type d -exec rm -rf {} +
|
||||
find . -name "*.pyc" -exec rm -f {} +
|
||||
find . -name "*.pyo" -exec rm -f {} +
|
||||
|
||||
# Alias to clean everything
|
||||
clean: clean-build
|
||||
find . -name "UserDB" -exec rm -f {} +
|
||||
155
README.md
155
README.md
@@ -22,7 +22,7 @@ with pluggable database backends.
|
||||
* Secure password reset (via secure token stored in DB).
|
||||
* Account activation / deactivation.
|
||||
* **Security:**
|
||||
* Password hashing with `bcrypt` (configurable rounds).
|
||||
* Password hashing with `argon2`.
|
||||
* Strict password validation (uppercase, lowercase, digit, special character).
|
||||
* **Flexible Architecture:**
|
||||
* **Pluggable Backends:** Supports MongoDB, PostgreSQL, and SQLite out of the box.
|
||||
@@ -75,30 +75,18 @@ This example configures myauth to use MongoDB as its backend.
|
||||
```Python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from my_auth import AuthService
|
||||
from my_auth.api import auth_router
|
||||
from my_auth.persistence.mongodb import MongoUserRepository, MongoTokenRepository
|
||||
|
||||
from myauth import create_auth_app_for_mongodb
|
||||
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# 2. Configure repositories for MongoDB
|
||||
# Make sure your connection string is correct
|
||||
user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/myappdb")
|
||||
token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/myappdb")
|
||||
auth_app = create_auth_app_for_mongodb(mongodb_url="mongodb://localhost:27017",
|
||||
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
|
||||
|
||||
# 3. Configure the Authentication Service
|
||||
# IMPORTANT: Change this to a long, random, secret string
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repo,
|
||||
token_repository=token_repo,
|
||||
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
|
||||
# email_service will be added in the next step
|
||||
)
|
||||
|
||||
# 4. Include the authentication routes
|
||||
# Endpoints like /auth/login, /auth/register are now active
|
||||
app.include_router(auth_router)
|
||||
# 3. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
@@ -113,37 +101,20 @@ This example configures myauth to use PostgreSQL as its backend.
|
||||
```Python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from my_auth import AuthService
|
||||
from my_auth.api import auth_router
|
||||
from my_auth.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
|
||||
|
||||
from myauth import create_auth_app_for_postgresql
|
||||
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# 2. Configure repositories for PostgreSQL
|
||||
# Update with your database credentials
|
||||
db_config = {
|
||||
"host": "localhost",
|
||||
"port": 5432,
|
||||
"database": "mydb",
|
||||
"user": "postgres",
|
||||
"password": "secret"
|
||||
}
|
||||
user_repo = PostgreSQLUserRepository(**db_config)
|
||||
token_repo = PostgreSQLTokenRepository(**db_config)
|
||||
auth_app = create_auth_app_for_postgresql(postgresql_url="mongodb://localhost:27017",
|
||||
username="admin",
|
||||
password="password",
|
||||
jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
|
||||
|
||||
# 3. Configure the Authentication Service
|
||||
# IMPORTANT: Change this to a long, random, secret string
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repo,
|
||||
token_repository=token_repo,
|
||||
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
|
||||
# email_service will be added in the next step
|
||||
)
|
||||
|
||||
# 4. Include the authentication routes
|
||||
# Endpoints like /auth/login, /auth/register are now active
|
||||
app.include_router(auth_router)
|
||||
# 3. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
@@ -158,31 +129,17 @@ This example configures myauth to use SQLite, which is ideal for development or
|
||||
```Python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from my_auth import AuthService
|
||||
from my_auth.api import auth_router
|
||||
from my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
|
||||
|
||||
from myauth import create_auth_app_for_sqlite
|
||||
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# 2. Configure repositories for SQLite
|
||||
# This will create/use a file named 'auth.db' in the current directory
|
||||
db_path = "./auth.db"
|
||||
user_repo = SQLiteUserRepository(db_path=db_path)
|
||||
token_repo = SQLiteTokenRepository(db_path=db_path)
|
||||
# 2. Configure repositories for MongoDB
|
||||
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
|
||||
|
||||
# 3. Configure the Authentication Service
|
||||
# IMPORTANT: Change this to a long, random, secret string
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repo,
|
||||
token_repository=token_repo,
|
||||
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
|
||||
# email_service will be added in the next step
|
||||
)
|
||||
|
||||
# 4. Include the authentication routes
|
||||
# Endpoints like /auth/login, /auth/register are now active
|
||||
app.include_router(auth_router)
|
||||
# 3. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
@@ -204,11 +161,15 @@ pip install "myauth[email]"
|
||||
|
||||
```Python
|
||||
|
||||
# ... (keep your app and repository config from the Quick Start)
|
||||
from fastapi import FastAPI
|
||||
|
||||
from my_auth.email.smtp import SMTPEmailService
|
||||
from myauth import create_auth_app_for_sqlite
|
||||
from myauth.emailing.smtp import SMTPEmailService
|
||||
|
||||
# 1. Configure the email service
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# 2. Configure the email service
|
||||
email_service = SMTPEmailService(
|
||||
host="smtp.gmail.com",
|
||||
port=587,
|
||||
@@ -217,15 +178,12 @@ email_service = SMTPEmailService(
|
||||
use_tls=True
|
||||
)
|
||||
|
||||
# 2. Pass the email service to AuthService
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repo, # From Quick Start
|
||||
token_repository=token_repo, # From Quick Start
|
||||
email_service=email_service, # Add this line
|
||||
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
|
||||
)
|
||||
# 3. Configure repositories for MongoDB
|
||||
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
|
||||
email_service=email_service)
|
||||
|
||||
# ... (keep 'app.include_router(auth_router)')
|
||||
# 4. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
```
|
||||
|
||||
### Option 2: Create a Custom Email Service
|
||||
@@ -234,9 +192,13 @@ If you use a third-party service (like AWS SES, Mailgun) that requires an API, y
|
||||
|
||||
```Python
|
||||
|
||||
# ... (keep your app and repository config from the Quick Start)
|
||||
from fastapi import FastAPI
|
||||
|
||||
from my_auth.email.base import EmailService
|
||||
from myauth import create_auth_app_for_sqlite
|
||||
from myauth.emailing.base import EmailService
|
||||
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
# 1. Implement your custom email service
|
||||
@@ -263,14 +225,12 @@ class CustomEmailService(EmailService):
|
||||
email_service = CustomEmailService(api_key="YOUR_API_KEY_HERE")
|
||||
|
||||
# 3. Pass your custom service to AuthService
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repo, # From Quick Start
|
||||
token_repository=token_repo, # From Quick Start
|
||||
email_service=email_service, # Add this line
|
||||
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
|
||||
)
|
||||
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED",
|
||||
email_service=email_service)
|
||||
|
||||
# 4. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
|
||||
# ... (keep 'app.include_router(auth_router)')
|
||||
```
|
||||
|
||||
## API Endpoints Reference
|
||||
@@ -287,6 +247,8 @@ POST /auth/password-reset # Reset password with token
|
||||
POST /auth/verify-email-request # Request email verification
|
||||
POST /auth/verify-email # Verify email with token
|
||||
GET /auth/me # Get current user info
|
||||
PATCH /auth/me # Update own profile
|
||||
PATCH /auth/users/{user_id} # Update user profile (by admin)
|
||||
```
|
||||
|
||||
### Error Handling
|
||||
@@ -302,25 +264,6 @@ The module uses custom exceptions that are automatically converted to the approp
|
||||
* `EmailNotVerifiedError` → **403 Forbidden (on login attempt)**
|
||||
* `AccountDisabledError` → **403 Forbidden (on login attempt)**
|
||||
|
||||
## Configuration Options
|
||||
|
||||
All options are passed during the `AuthService` initialization:
|
||||
|
||||
```Python
|
||||
|
||||
AuthService(
|
||||
user_repository: UserRepository, # Required
|
||||
token_repository: TokenRepository, # Required
|
||||
jwt_secret: str, # Required
|
||||
jwt_algorithm: str = "HS256", # Optional
|
||||
access_token_expire_minutes: int = 30, # Optional
|
||||
refresh_token_expire_days: int = 7, # Optional
|
||||
password_reset_token_expire_minutes: int = 15, # Optional
|
||||
password_hash_rounds: int = 12, # Optional (bcrypt cost)
|
||||
email_service: EmailService = None # Optional
|
||||
)
|
||||
```
|
||||
|
||||
## Appendix (Contributor & Development Details)
|
||||
|
||||
<details> <summary><b> Appendix A: Project Structure (src/my_auth)</b></summary>
|
||||
@@ -410,3 +353,9 @@ pytest tests/
|
||||
## License
|
||||
|
||||
MIT
|
||||
|
||||
## Release History
|
||||
|
||||
* 0.1.0 - Initial Release
|
||||
* 0.2.0 - Added admin user auto creation
|
||||
* 0.2.1 - Added user profile update (PATCH on /me and /users/{user_id})
|
||||
|
||||
@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "myauth"
|
||||
version = "0.2.0" # Start with an initial version
|
||||
version = "0.2.1"
|
||||
description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends."
|
||||
readme = "README.md"
|
||||
authors = [
|
||||
@@ -38,8 +38,9 @@ dependencies = [
|
||||
"fastapi",
|
||||
"pydantic",
|
||||
"pydantic-settings",
|
||||
"pydantic[email]",
|
||||
"python-jose[cryptography]",
|
||||
"passlib[bcrypt]",
|
||||
"passlib[argon2]",
|
||||
"python-multipart"
|
||||
]
|
||||
|
||||
@@ -78,6 +79,9 @@ dev = [
|
||||
# Setuptools configuration
|
||||
# This section tells the build system where to find your package code
|
||||
# -------------------------------------------------------------------
|
||||
[tool.setuptools]
|
||||
package-dir = {"" = "src"}
|
||||
packages = ["my_auth"]
|
||||
#[tool.setuptools]
|
||||
#package-dir = {"myauth" = "src"}
|
||||
#packages = ["my_auth"]
|
||||
|
||||
[tool.setuptools.package-dir]
|
||||
myauth = "src/myauth"
|
||||
@@ -1,18 +1,33 @@
|
||||
annotated-types==0.7.0
|
||||
anyio==4.11.0
|
||||
bcrypt==5.0.0
|
||||
argon2-cffi==25.1.0
|
||||
argon2-cffi-bindings==25.1.0
|
||||
build==1.3.0
|
||||
certifi==2025.10.5
|
||||
cffi==2.0.0
|
||||
charset-normalizer==3.4.4
|
||||
cryptography==46.0.3
|
||||
dnspython==2.8.0
|
||||
docutils==0.22.2
|
||||
ecdsa==0.19.1
|
||||
email-validator==2.3.0
|
||||
fastapi==0.119.0
|
||||
h11==0.16.0
|
||||
httpcore==1.0.9
|
||||
httpx==0.28.1
|
||||
id==1.5.0
|
||||
idna==3.11
|
||||
iniconfig==2.1.0
|
||||
jaraco.classes==3.4.0
|
||||
jaraco.context==6.0.1
|
||||
jaraco.functools==4.3.0
|
||||
jeepney==0.9.0
|
||||
keyring==25.6.0
|
||||
markdown-it-py==4.0.0
|
||||
mdurl==0.1.2
|
||||
more-itertools==10.8.0
|
||||
nh3==0.3.1
|
||||
numpy==2.3.4
|
||||
packaging==25.0
|
||||
passlib==1.7.4
|
||||
pluggy==1.6.0
|
||||
@@ -22,13 +37,25 @@ pydantic==2.12.3
|
||||
pydantic-settings==2.11.0
|
||||
pydantic_core==2.41.4
|
||||
Pygments==2.19.2
|
||||
pyproject_hooks==1.2.0
|
||||
pytest==8.4.2
|
||||
python-dateutil==2.9.0.post0
|
||||
python-dotenv==1.1.1
|
||||
python-jose==3.5.0
|
||||
python-multipart==0.0.20
|
||||
pytz==2025.2
|
||||
readme_renderer==44.0
|
||||
requests==2.32.5
|
||||
requests-toolbelt==1.0.0
|
||||
rfc3986==2.0.0
|
||||
rich==14.2.0
|
||||
rsa==4.9.1
|
||||
SecretStorage==3.4.0
|
||||
six==1.17.0
|
||||
sniffio==1.3.1
|
||||
starlette==0.48.0
|
||||
twine==6.2.0
|
||||
typing-inspection==0.4.2
|
||||
typing_extensions==4.15.0
|
||||
tzdata==2025.2
|
||||
urllib3==2.5.0
|
||||
|
||||
22
src/main.py
Normal file
22
src/main.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
|
||||
from myauth import create_auth_app_for_sqlite
|
||||
|
||||
# 1. Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# 2. Configure repositories for MongoDB
|
||||
auth_app = create_auth_app_for_sqlite(db_path="./UserDB", jwt_secret="THIS_NEEDS_TO_BE_CHANGED")
|
||||
|
||||
# 3. Include the authentication routes
|
||||
app.mount("/auth", auth_app)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"message": "Application running with MyAuth (SQLite)"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
@@ -1,446 +0,0 @@
|
||||
"""
|
||||
Authentication service for the authentication module.
|
||||
|
||||
This module provides the main authentication service that orchestrates
|
||||
all authentication operations including registration, login, token management,
|
||||
password reset, and email verification.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from .password import PasswordManager
|
||||
from .token import TokenManager
|
||||
from ..persistence.base import UserRepository, TokenRepository
|
||||
from ..models.user import UserCreate, UserInDB, UserUpdate
|
||||
from ..models.token import AccessTokenResponse, TokenData
|
||||
from ..email.base import EmailService
|
||||
from ..exceptions import (
|
||||
InvalidCredentialsError,
|
||||
UserNotFoundError,
|
||||
AccountDisabledError,
|
||||
ExpiredTokenError,
|
||||
InvalidTokenError,
|
||||
RevokedTokenError
|
||||
)
|
||||
|
||||
|
||||
class AuthService:
|
||||
"""
|
||||
Main authentication service.
|
||||
|
||||
This service orchestrates all authentication-related operations by
|
||||
coordinating between password management, token management, and
|
||||
persistence layers.
|
||||
|
||||
Attributes:
|
||||
user_repository: Repository for user persistence operations.
|
||||
token_repository: Repository for token persistence operations.
|
||||
password_manager: Manager for password hashing and verification.
|
||||
token_manager: Manager for token creation and validation.
|
||||
email_service: Optional service for sending emails.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
user_repository: UserRepository,
|
||||
token_repository: TokenRepository,
|
||||
password_manager: PasswordManager,
|
||||
token_manager: TokenManager,
|
||||
email_service: Optional[EmailService] = None
|
||||
):
|
||||
"""
|
||||
Initialize the authentication service.
|
||||
|
||||
Args:
|
||||
user_repository: Repository for user persistence.
|
||||
token_repository: Repository for token persistence.
|
||||
password_manager: Manager for password hashing and verification.
|
||||
token_manager: Manager for token creation and validation.
|
||||
email_service: Optional service for sending emails (password reset, verification).
|
||||
"""
|
||||
self.user_repository = user_repository
|
||||
self.token_repository = token_repository
|
||||
self.password_manager = password_manager
|
||||
self.token_manager = token_manager
|
||||
self.email_service = email_service
|
||||
|
||||
def register(self, user_data: UserCreate) -> UserInDB:
|
||||
"""
|
||||
Register a new user.
|
||||
|
||||
This method creates a new user account with hashed password.
|
||||
The user's email is initially unverified.
|
||||
|
||||
Args:
|
||||
user_data: User registration data including password.
|
||||
|
||||
Returns:
|
||||
The created user (without password).
|
||||
|
||||
Raises:
|
||||
UserAlreadyExistsError: If email is already registered.
|
||||
|
||||
Example:
|
||||
>>> user_data = UserCreate(
|
||||
... email="user@example.com",
|
||||
... username="john_doe",
|
||||
... password="SecurePass123!"
|
||||
... )
|
||||
>>> user = auth_service.register(user_data)
|
||||
"""
|
||||
hashed_password = self.password_manager.hash_password(user_data.password)
|
||||
user = self.user_repository.create_user(user_data, hashed_password)
|
||||
return user
|
||||
|
||||
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
|
||||
"""
|
||||
Authenticate a user and create tokens.
|
||||
|
||||
This method verifies credentials, checks account status, and generates
|
||||
both access and refresh tokens.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
password: User's plain text password.
|
||||
|
||||
Returns:
|
||||
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
|
||||
|
||||
Raises:
|
||||
InvalidCredentialsError: If email or password is incorrect.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> user, tokens = auth_service.login("user@example.com", "password")
|
||||
>>> print(tokens.access_token)
|
||||
"""
|
||||
# Get user by email
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise InvalidCredentialsError()
|
||||
|
||||
# Verify password
|
||||
if not self.password_manager.verify_password(password, user.hashed_password):
|
||||
raise InvalidCredentialsError()
|
||||
|
||||
# Check if account is active
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
# Create tokens
|
||||
access_token = self.token_manager.create_access_token(user)
|
||||
refresh_token = self.token_manager.create_refresh_token()
|
||||
|
||||
# Store refresh token in database
|
||||
token_data = TokenData(
|
||||
token=refresh_token,
|
||||
token_type="refresh",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_refresh_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(token_data)
|
||||
|
||||
tokens = AccessTokenResponse(
|
||||
access_token=access_token,
|
||||
refresh_token=refresh_token,
|
||||
token_type="bearer"
|
||||
)
|
||||
|
||||
return user, tokens
|
||||
|
||||
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
|
||||
"""
|
||||
Create a new access token using a refresh token.
|
||||
|
||||
This method validates the refresh token and generates a new access token
|
||||
without requiring the user to re-enter their password.
|
||||
|
||||
Args:
|
||||
refresh_token: The refresh token to exchange.
|
||||
|
||||
Returns:
|
||||
New access and refresh tokens.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If refresh token is invalid.
|
||||
ExpiredTokenError: If refresh token has expired.
|
||||
RevokedTokenError: If refresh token has been revoked.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
|
||||
>>> print(tokens.access_token)
|
||||
"""
|
||||
# Validate refresh token
|
||||
token_data = self.token_repository.get_token(refresh_token, "refresh")
|
||||
if not token_data:
|
||||
raise InvalidTokenError("Invalid refresh token")
|
||||
|
||||
if token_data.is_revoked:
|
||||
raise RevokedTokenError()
|
||||
|
||||
if token_data.expires_at < datetime.utcnow():
|
||||
raise ExpiredTokenError()
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_data.user_id)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
# Create new tokens
|
||||
access_token = self.token_manager.create_access_token(user)
|
||||
new_refresh_token = self.token_manager.create_refresh_token()
|
||||
|
||||
# Revoke old refresh token
|
||||
self.token_repository.revoke_token(refresh_token)
|
||||
|
||||
# Store new refresh token
|
||||
new_token_data = TokenData(
|
||||
token=new_refresh_token,
|
||||
token_type="refresh",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_refresh_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(new_token_data)
|
||||
|
||||
return AccessTokenResponse(
|
||||
access_token=access_token,
|
||||
refresh_token=new_refresh_token,
|
||||
token_type="bearer"
|
||||
)
|
||||
|
||||
def logout(self, refresh_token: str) -> bool:
|
||||
"""
|
||||
Logout a user by revoking their refresh token.
|
||||
|
||||
This prevents the refresh token from being used to obtain new
|
||||
access tokens. The current access token will remain valid until
|
||||
it expires naturally.
|
||||
|
||||
Args:
|
||||
refresh_token: The refresh token to revoke.
|
||||
|
||||
Returns:
|
||||
True if logout was successful, False if token not found.
|
||||
|
||||
Example:
|
||||
>>> auth_service.logout(refresh_token)
|
||||
True
|
||||
"""
|
||||
return self.token_repository.revoke_token(refresh_token)
|
||||
|
||||
def request_password_reset(self, email: str) -> str:
|
||||
"""
|
||||
Generate a password reset token for a user.
|
||||
|
||||
This method creates a secure token that can be sent to the user
|
||||
via email to reset their password. If an email service is configured,
|
||||
the email is sent automatically. Otherwise, the token is returned
|
||||
for manual handling.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
|
||||
Returns:
|
||||
The password reset token to be sent via email.
|
||||
|
||||
Raises:
|
||||
UserNotFoundError: If email is not registered.
|
||||
|
||||
Example:
|
||||
>>> token = auth_service.request_password_reset("user@example.com")
|
||||
>>> # Token is automatically sent via email if service is configured
|
||||
"""
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
# Create reset token
|
||||
reset_token = self.token_manager.create_password_reset_token()
|
||||
|
||||
# Store token in database
|
||||
token_data = TokenData(
|
||||
token=reset_token,
|
||||
token_type="password_reset",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_password_reset_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(token_data)
|
||||
|
||||
# Send email if service is configured
|
||||
if self.email_service:
|
||||
self.email_service.send_password_reset_email(email, reset_token)
|
||||
|
||||
return reset_token
|
||||
|
||||
def reset_password(self, token: str, new_password: str) -> bool:
|
||||
"""
|
||||
Reset a user's password using a reset token.
|
||||
|
||||
This method validates the reset token and updates the user's password.
|
||||
All existing refresh tokens for the user are revoked for security.
|
||||
|
||||
Args:
|
||||
token: Password reset token.
|
||||
new_password: New plain text password (will be hashed).
|
||||
|
||||
Returns:
|
||||
True if password was reset successfully.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If reset token is invalid.
|
||||
ExpiredTokenError: If reset token has expired.
|
||||
RevokedTokenError: If reset token has been used.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
|
||||
Example:
|
||||
>>> auth_service.reset_password(token, "NewSecurePass123!")
|
||||
True
|
||||
"""
|
||||
# Validate reset token
|
||||
token_data = self.token_repository.get_token(token, "password_reset")
|
||||
if not token_data:
|
||||
raise InvalidTokenError("Invalid password reset token")
|
||||
|
||||
if token_data.is_revoked:
|
||||
raise RevokedTokenError("Password reset token has already been used")
|
||||
|
||||
if token_data.expires_at < datetime.utcnow():
|
||||
raise ExpiredTokenError("Password reset token has expired")
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_data.user_id)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
# Hash new password
|
||||
hashed_password = self.password_manager.hash_password(new_password)
|
||||
|
||||
# Update user password
|
||||
updates = UserUpdate(password=hashed_password)
|
||||
self.user_repository.update_user(user.id, updates)
|
||||
|
||||
# Revoke the reset token
|
||||
self.token_repository.revoke_token(token)
|
||||
|
||||
# Revoke all user's refresh tokens for security
|
||||
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
|
||||
|
||||
return True
|
||||
|
||||
def request_email_verification(self, email: str) -> str:
|
||||
"""
|
||||
Generate an email verification token for a user.
|
||||
|
||||
This method creates a JWT token that can be sent to the user
|
||||
to verify their email address. If an email service is configured,
|
||||
the email is sent automatically. Otherwise, the token is returned
|
||||
for manual handling.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
|
||||
Returns:
|
||||
The email verification token (JWT) to be sent via email.
|
||||
|
||||
Raises:
|
||||
UserNotFoundError: If email is not registered.
|
||||
|
||||
Example:
|
||||
>>> token = auth_service.request_email_verification("user@example.com")
|
||||
>>> # Token is automatically sent via email if service is configured
|
||||
"""
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
verification_token = self.token_manager.create_email_verification_token(email)
|
||||
|
||||
# Send email if service is configured
|
||||
if self.email_service:
|
||||
self.email_service.send_verification_email(email, verification_token)
|
||||
|
||||
return verification_token
|
||||
|
||||
def verify_email(self, token: str) -> bool:
|
||||
"""
|
||||
Verify a user's email address using a verification token.
|
||||
|
||||
This method decodes the JWT token, extracts the email, and marks
|
||||
the user's email as verified.
|
||||
|
||||
Args:
|
||||
token: Email verification token (JWT).
|
||||
|
||||
Returns:
|
||||
True if email was verified successfully.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If verification token is invalid.
|
||||
ExpiredTokenError: If verification token has expired.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
|
||||
Example:
|
||||
>>> auth_service.verify_email(token)
|
||||
True
|
||||
"""
|
||||
# Decode and validate token
|
||||
email = self.token_manager.decode_email_verification_token(token)
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
# Update user's verified status
|
||||
updates = UserUpdate(is_verified=True)
|
||||
self.user_repository.update_user(user.id, updates)
|
||||
|
||||
return True
|
||||
|
||||
def get_current_user(self, access_token: str) -> UserInDB:
|
||||
"""
|
||||
Retrieve the current user from an access token.
|
||||
|
||||
This method decodes and validates the JWT access token and
|
||||
retrieves the corresponding user from the database.
|
||||
|
||||
Args:
|
||||
access_token: JWT access token.
|
||||
|
||||
Returns:
|
||||
The user associated with the token.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If access token is invalid.
|
||||
ExpiredTokenError: If access token has expired.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> user = auth_service.get_current_user(access_token)
|
||||
>>> print(user.email)
|
||||
"""
|
||||
# Decode and validate token
|
||||
token_payload = self.token_manager.decode_access_token(access_token)
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_payload.sub)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
return user
|
||||
9
src/myauth/__init__.py
Normal file
9
src/myauth/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from .factory import create_sqlite_auth_service, create_auth_app_for_sqlite, create_auth_app_for_mongodb, \
|
||||
create_auth_app_for_postgresql
|
||||
|
||||
__all__ = [
|
||||
'create_sqlite_auth_service',
|
||||
'create_auth_app_for_sqlite',
|
||||
'create_auth_app_for_mongodb',
|
||||
'create_auth_app_for_postgresql'
|
||||
]
|
||||
3
src/myauth/api/__init__.py
Normal file
3
src/myauth/api/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .routes import create_auth_app
|
||||
|
||||
__all__ = ["create_auth_app"]
|
||||
@@ -4,10 +4,9 @@ FastAPI routes for authentication module.
|
||||
This module provides ready-to-use FastAPI routes for all authentication
|
||||
operations. Routes are organized in an APIRouter with /auth prefix.
|
||||
"""
|
||||
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import Depends, HTTPException, status, FastAPI
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
|
||||
from ..core.auth import AuthService
|
||||
@@ -18,13 +17,13 @@ from ..models.email_verification import (
|
||||
PasswordResetConfirm
|
||||
)
|
||||
from ..models.token import AccessTokenResponse, RefreshTokenRequest
|
||||
from ..models.user import UserCreate, UserResponse
|
||||
from ..models.user import UserCreate, UserResponse, UserUpdate, UserUpdateMe
|
||||
|
||||
# OAuth2 scheme for token authentication
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
|
||||
|
||||
|
||||
def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
def create_auth_app(auth_service: AuthService) -> FastAPI:
|
||||
"""
|
||||
Create and configure the authentication router.
|
||||
|
||||
@@ -40,13 +39,13 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
|
||||
Example:
|
||||
>>> from fastapi import FastAPI
|
||||
>>> from auth_module.api.routes import create_auth_router
|
||||
>>> from myauth.api.routes import create_auth_app
|
||||
>>>
|
||||
>>> app = FastAPI()
|
||||
>>> auth_router = create_auth_router(auth_service)
|
||||
>>> app.include_router(auth_router)
|
||||
>>> auth_api = create_auth_app(auth_service)
|
||||
>>> app.mount(auth_api)
|
||||
"""
|
||||
router = APIRouter(prefix="/auth", tags=["authentication"])
|
||||
auth_app = FastAPI(prefix="/auth", tags=["authentication"])
|
||||
|
||||
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserResponse:
|
||||
"""
|
||||
@@ -65,7 +64,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
HTTPException: 401 if token is invalid or expired.
|
||||
|
||||
Example:
|
||||
>>> @app.get("/protected")
|
||||
>>> @auth_app.get("/protected")
|
||||
>>> def protected_route(user: UserResponse = Depends(get_current_user)):
|
||||
>>> return {"user_id": user.id}
|
||||
"""
|
||||
@@ -86,7 +85,36 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
def get_current_admin(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
|
||||
"""
|
||||
Dependency to ensure the current user has admin role.
|
||||
|
||||
This dependency can be used in any route that requires admin privileges.
|
||||
It first validates the user's token (via get_current_user) then checks
|
||||
for the 'admin' role.
|
||||
|
||||
Args:
|
||||
current_user: The authenticated user (injected by get_current_user dependency).
|
||||
|
||||
Returns:
|
||||
The current authenticated admin user.
|
||||
|
||||
Raises:
|
||||
HTTPException: 403 if user does not have admin role.
|
||||
|
||||
Example:
|
||||
>>> @auth_app.patch("/users/{user_id}")
|
||||
>>> def update_user(user_id: str, admin: UserResponse = Depends(get_current_admin)):
|
||||
>>> # Only admins can access this route
|
||||
"""
|
||||
if "admin" not in current_user.roles:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Admin privileges required"
|
||||
)
|
||||
return current_user
|
||||
|
||||
@auth_app.post(
|
||||
"/register",
|
||||
response_model=UserResponse,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
@@ -128,7 +156,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/login",
|
||||
response_model=AccessTokenResponse,
|
||||
summary="Login with email and password",
|
||||
@@ -163,7 +191,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/logout",
|
||||
status_code=status.HTTP_204_NO_CONTENT,
|
||||
summary="Logout user",
|
||||
@@ -185,7 +213,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
auth_service.logout(request.refresh_token)
|
||||
return None
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/refresh",
|
||||
response_model=AccessTokenResponse,
|
||||
summary="Refresh access token",
|
||||
@@ -217,7 +245,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/password-reset-request",
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Request password reset",
|
||||
@@ -252,7 +280,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/password-reset",
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Reset password with token",
|
||||
@@ -284,7 +312,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.post(
|
||||
@auth_app.post(
|
||||
"/verify-email-request",
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Request email verification",
|
||||
@@ -320,7 +348,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.get(
|
||||
@auth_app.get(
|
||||
"/verify-email",
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Verify email with token",
|
||||
@@ -352,7 +380,7 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@router.get(
|
||||
@auth_app.get(
|
||||
"/me",
|
||||
response_model=UserResponse,
|
||||
summary="Get current user",
|
||||
@@ -376,4 +404,122 @@ def create_auth_router(auth_service: AuthService) -> APIRouter:
|
||||
"""
|
||||
return current_user
|
||||
|
||||
return router
|
||||
@auth_app.patch(
|
||||
"/me",
|
||||
response_model=UserResponse,
|
||||
summary="Update own profile",
|
||||
description="Update the current user's profile (restricted fields)."
|
||||
)
|
||||
def update_me(
|
||||
updates: UserUpdateMe,
|
||||
current_user: Annotated[UserResponse, Depends(get_current_user)]
|
||||
) -> UserResponse:
|
||||
"""
|
||||
Update the current authenticated user's profile.
|
||||
|
||||
This endpoint allows users to update their own profile with restrictions:
|
||||
- Allowed fields: email, username, password, user_settings
|
||||
- Forbidden fields: roles, is_active, is_verified (must use other flows)
|
||||
|
||||
When changing password, the current session is preserved if refresh_token
|
||||
is provided in the request body.
|
||||
|
||||
Args:
|
||||
updates: User update data (restricted fields only).
|
||||
current_user: The authenticated user (injected by dependency).
|
||||
|
||||
Returns:
|
||||
Updated user information.
|
||||
|
||||
Raises:
|
||||
HTTPException: 409 if new email already exists.
|
||||
HTTPException: 422 if validation fails.
|
||||
"""
|
||||
try:
|
||||
# Convert UserUpdateMe to UserUpdate (only allowed fields)
|
||||
from ..models.user import UserUpdate
|
||||
|
||||
user_update = UserUpdate(
|
||||
email=updates.email,
|
||||
username=updates.username,
|
||||
password=updates.password,
|
||||
user_settings=updates.user_settings
|
||||
)
|
||||
|
||||
# Update user with optional refresh_token to preserve session
|
||||
updated_user = auth_service.update_user(
|
||||
user_id=current_user.id,
|
||||
updates=user_update,
|
||||
refresh_token=updates.refresh_token
|
||||
)
|
||||
|
||||
return UserResponse(
|
||||
id=updated_user.id,
|
||||
email=updated_user.email,
|
||||
username=updated_user.username,
|
||||
roles=updated_user.roles,
|
||||
user_settings=updated_user.user_settings,
|
||||
created_at=updated_user.created_at,
|
||||
updated_at=updated_user.updated_at
|
||||
)
|
||||
except AuthError as e:
|
||||
raise HTTPException(
|
||||
status_code=e.status_code,
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
@auth_app.patch(
|
||||
"/users/{user_id}",
|
||||
response_model=UserResponse,
|
||||
summary="Update any user (admin only)",
|
||||
description="Update any user's profile with full access to all fields (admin only)."
|
||||
)
|
||||
def update_user_by_id(
|
||||
user_id: str,
|
||||
updates: UserUpdate,
|
||||
admin: Annotated[UserResponse, Depends(get_current_admin)]
|
||||
) -> UserResponse:
|
||||
"""
|
||||
Update any user's profile (admin only).
|
||||
|
||||
This endpoint allows administrators to update any user with full access
|
||||
to all fields including roles, is_active, and is_verified.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user to update.
|
||||
updates: User update data (all fields allowed).
|
||||
admin: The authenticated admin user (injected by dependency).
|
||||
|
||||
Returns:
|
||||
Updated user information.
|
||||
|
||||
Raises:
|
||||
HTTPException: 403 if user is not an admin.
|
||||
HTTPException: 404 if user not found.
|
||||
HTTPException: 409 if new email already exists.
|
||||
HTTPException: 422 if validation fails.
|
||||
"""
|
||||
try:
|
||||
# Admin updates don't preserve refresh tokens (security)
|
||||
updated_user = auth_service.update_user(
|
||||
user_id=user_id,
|
||||
updates=updates,
|
||||
refresh_token=None
|
||||
)
|
||||
|
||||
return UserResponse(
|
||||
id=updated_user.id,
|
||||
email=updated_user.email,
|
||||
username=updated_user.username,
|
||||
roles=updated_user.roles,
|
||||
user_settings=updated_user.user_settings,
|
||||
created_at=updated_user.created_at,
|
||||
updated_at=updated_user.updated_at
|
||||
)
|
||||
except AuthError as e:
|
||||
raise HTTPException(
|
||||
status_code=e.status_code,
|
||||
detail=e.message
|
||||
)
|
||||
|
||||
return auth_app
|
||||
5
src/myauth/core/__init__.py
Normal file
5
src/myauth/core/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .auth import AuthService
|
||||
from .password import PasswordManager
|
||||
from .token import TokenManager
|
||||
|
||||
__all__ = ["AuthService", "PasswordManager", "TokenManager"]
|
||||
572
src/myauth/core/auth.py
Normal file
572
src/myauth/core/auth.py
Normal file
@@ -0,0 +1,572 @@
|
||||
"""
|
||||
Authentication service for the authentication module.
|
||||
|
||||
This module provides the main authentication service that orchestrates
|
||||
all authentication operations including registration, login, token management,
|
||||
password reset, and email verification.
|
||||
"""
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from .password import PasswordManager
|
||||
from .token import TokenManager
|
||||
from ..emailing.base import EmailService
|
||||
from ..exceptions import (
|
||||
InvalidCredentialsError,
|
||||
UserNotFoundError,
|
||||
AccountDisabledError,
|
||||
ExpiredTokenError,
|
||||
InvalidTokenError,
|
||||
RevokedTokenError, UserAlreadyExistsError
|
||||
)
|
||||
from ..models.token import AccessTokenResponse, TokenData
|
||||
from ..models.user import UserCreate, UserInDB, UserUpdate, UserCreateNoValidation
|
||||
from ..persistence.base import UserRepository, TokenRepository
|
||||
|
||||
|
||||
class AuthService:
|
||||
"""
|
||||
Main authentication service.
|
||||
|
||||
This service orchestrates all authentication-related operations by
|
||||
coordinating between password management, token management, and
|
||||
persistence layers.
|
||||
|
||||
Attributes:
|
||||
user_repository: Repository for user persistence operations.
|
||||
token_repository: Repository for token persistence operations.
|
||||
password_manager: Manager for password hashing and verification.
|
||||
token_manager: Manager for token creation and validation.
|
||||
email_service: Optional service for sending emails.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
user_repository: UserRepository,
|
||||
token_repository: TokenRepository,
|
||||
password_manager: PasswordManager,
|
||||
token_manager: TokenManager,
|
||||
email_service: Optional[EmailService] = None
|
||||
):
|
||||
"""
|
||||
Initialize the authentication service.
|
||||
|
||||
Args:
|
||||
user_repository: Repository for user persistence.
|
||||
token_repository: Repository for token persistence.
|
||||
password_manager: Manager for password hashing and verification.
|
||||
token_manager: Manager for token creation and validation.
|
||||
email_service: Optional service for sending emails (password reset, verification).
|
||||
"""
|
||||
self.user_repository = user_repository
|
||||
self.token_repository = token_repository
|
||||
self.password_manager = password_manager
|
||||
self.token_manager = token_manager
|
||||
self.email_service = email_service
|
||||
|
||||
def create_admin_if_needed(self, admin_email: str = None, admin_username: str = None, admin_password: str = None):
|
||||
"""
|
||||
Create the admin user if it does not exist. This function checks the current number
|
||||
of users in the system. If there are no users present, it creates a user with
|
||||
administrative privileges using the provided credentials or environment variables
|
||||
as defaults.
|
||||
|
||||
:param admin_email: The email of the admin user. Defaults to "AUTH_ADMIN_EMAIL"
|
||||
environment variable or "admin@myauth.com" if not provided.
|
||||
:type admin_email: str, optional
|
||||
:param admin_username: The username of the admin user. Defaults to
|
||||
"AUTH_ADMIN_USERNAME" environment variable or "admin" if not provided.
|
||||
:type admin_username: str, optional
|
||||
:param admin_password: The password of the admin user. Defaults to
|
||||
"AUTH_ADMIN_PASSWORD" environment variable or "admin" if not provided.
|
||||
:type admin_password: str, optional
|
||||
:return: True if an admin user is created, otherwise False.
|
||||
:rtype: bool
|
||||
"""
|
||||
# create the admin user if it doesn't exist
|
||||
nb_users = self.count_users()
|
||||
if nb_users == 0:
|
||||
admin_email = admin_email or os.getenv("AUTH_ADMIN_EMAIL", "admin@myauth.com")
|
||||
admin_username = admin_username or os.getenv("AUTH_ADMIN_USERNAME", "admin")
|
||||
admin_password = admin_password or os.getenv("AUTH_ADMIN_PASSWORD", "admin")
|
||||
|
||||
admin_user = UserCreateNoValidation(
|
||||
email=admin_email,
|
||||
username=admin_username,
|
||||
password=admin_password,
|
||||
roles=["admin"]
|
||||
)
|
||||
self.register(admin_user)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def register(self, user_data: UserCreate | UserCreateNoValidation) -> UserInDB:
|
||||
"""
|
||||
Register a new user.
|
||||
|
||||
This method creates a new user account with hashed password.
|
||||
The user's email is initially unverified.
|
||||
|
||||
Args:
|
||||
user_data: User registration data including password.
|
||||
|
||||
Returns:
|
||||
The created user (without password).
|
||||
|
||||
Raises:
|
||||
UserAlreadyExistsError: If email is already registered.
|
||||
|
||||
Example:
|
||||
>>> user_data = UserCreate(
|
||||
... email="user@example.com",
|
||||
... username="john_doe",
|
||||
... password="SecurePass123!"
|
||||
... )
|
||||
>>> user = auth_service.register(user_data)
|
||||
"""
|
||||
hashed_password = self.password_manager.hash_password(user_data.password)
|
||||
user = self.user_repository.create_user(user_data, hashed_password)
|
||||
return user
|
||||
|
||||
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
|
||||
"""
|
||||
Authenticate a user and create tokens.
|
||||
|
||||
This method verifies credentials, checks account status, and generates
|
||||
both access and refresh tokens.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
password: User's plain text password.
|
||||
|
||||
Returns:
|
||||
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
|
||||
|
||||
Raises:
|
||||
InvalidCredentialsError: If email or password is incorrect.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> user, tokens = auth_service.login("user@example.com", "password")
|
||||
>>> print(tokens.access_token)
|
||||
"""
|
||||
# Get user by email
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise InvalidCredentialsError()
|
||||
|
||||
# Verify password
|
||||
if not self.password_manager.verify_password(password, user.hashed_password):
|
||||
raise InvalidCredentialsError()
|
||||
|
||||
# Check if account is active
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
# Create tokens
|
||||
access_token = self.token_manager.create_access_token(user)
|
||||
refresh_token = self.token_manager.create_refresh_token()
|
||||
|
||||
# Store refresh token in database
|
||||
token_data = TokenData(
|
||||
token=refresh_token,
|
||||
token_type="refresh",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_refresh_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(token_data)
|
||||
|
||||
tokens = AccessTokenResponse(
|
||||
access_token=access_token,
|
||||
refresh_token=refresh_token,
|
||||
token_type="bearer"
|
||||
)
|
||||
|
||||
return user, tokens
|
||||
|
||||
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
|
||||
"""
|
||||
Create a new access token using a refresh token.
|
||||
|
||||
This method validates the refresh token and generates a new access token
|
||||
without requiring the user to re-enter their password.
|
||||
|
||||
Args:
|
||||
refresh_token: The refresh token to exchange.
|
||||
|
||||
Returns:
|
||||
New access and refresh tokens.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If refresh token is invalid.
|
||||
ExpiredTokenError: If refresh token has expired.
|
||||
RevokedTokenError: If refresh token has been revoked.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
|
||||
>>> print(tokens.access_token)
|
||||
"""
|
||||
# Validate refresh token
|
||||
token_data = self.token_repository.get_token(refresh_token, "refresh")
|
||||
if not token_data:
|
||||
raise InvalidTokenError("Invalid refresh token")
|
||||
|
||||
if token_data.is_revoked:
|
||||
raise RevokedTokenError()
|
||||
|
||||
if token_data.expires_at < datetime.utcnow():
|
||||
raise ExpiredTokenError()
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_data.user_id)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
# Create new tokens
|
||||
access_token = self.token_manager.create_access_token(user)
|
||||
new_refresh_token = self.token_manager.create_refresh_token()
|
||||
|
||||
# Revoke old refresh token
|
||||
self.token_repository.revoke_token(refresh_token)
|
||||
|
||||
# Store new refresh token
|
||||
new_token_data = TokenData(
|
||||
token=new_refresh_token,
|
||||
token_type="refresh",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_refresh_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(new_token_data)
|
||||
|
||||
return AccessTokenResponse(
|
||||
access_token=access_token,
|
||||
refresh_token=new_refresh_token,
|
||||
token_type="bearer"
|
||||
)
|
||||
|
||||
def logout(self, refresh_token: str) -> bool:
|
||||
"""
|
||||
Logout a user by revoking their refresh token.
|
||||
|
||||
This prevents the refresh token from being used to obtain new
|
||||
access tokens. The current access token will remain valid until
|
||||
it expires naturally.
|
||||
|
||||
Args:
|
||||
refresh_token: The refresh token to revoke.
|
||||
|
||||
Returns:
|
||||
True if logout was successful, False if token not found.
|
||||
|
||||
Example:
|
||||
>>> auth_service.logout(refresh_token)
|
||||
True
|
||||
"""
|
||||
return self.token_repository.revoke_token(refresh_token)
|
||||
|
||||
def request_password_reset(self, email: str) -> str:
|
||||
"""
|
||||
Generate a password reset token for a user.
|
||||
|
||||
This method creates a secure token that can be sent to the user
|
||||
via email to reset their password. If an email service is configured,
|
||||
the email is sent automatically. Otherwise, the token is returned
|
||||
for manual handling.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
|
||||
Returns:
|
||||
The password reset token to be sent via email.
|
||||
|
||||
Raises:
|
||||
UserNotFoundError: If email is not registered.
|
||||
|
||||
Example:
|
||||
>>> token = auth_service.request_password_reset("user@example.com")
|
||||
>>> # Token is automatically sent via email if service is configured
|
||||
"""
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
# Create reset token
|
||||
reset_token = self.token_manager.create_password_reset_token()
|
||||
|
||||
# Store token in database
|
||||
token_data = TokenData(
|
||||
token=reset_token,
|
||||
token_type="password_reset",
|
||||
user_id=user.id,
|
||||
expires_at=self.token_manager.get_password_reset_token_expiration(),
|
||||
created_at=datetime.utcnow(),
|
||||
is_revoked=False
|
||||
)
|
||||
self.token_repository.save_token(token_data)
|
||||
|
||||
# Send email if service is configured
|
||||
if self.email_service:
|
||||
self.email_service.send_password_reset_email(email, reset_token)
|
||||
|
||||
return reset_token
|
||||
|
||||
def reset_password(self, token: str, new_password: str) -> bool:
|
||||
"""
|
||||
Reset a user's password using a reset token.
|
||||
|
||||
This method validates the reset token and updates the user's password.
|
||||
All existing refresh tokens for the user are revoked for security.
|
||||
|
||||
Args:
|
||||
token: Password reset token.
|
||||
new_password: New plain text password (will be hashed).
|
||||
|
||||
Returns:
|
||||
True if password was reset successfully.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If reset token is invalid.
|
||||
ExpiredTokenError: If reset token has expired.
|
||||
RevokedTokenError: If reset token has been used.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
|
||||
Example:
|
||||
>>> auth_service.reset_password(token, "NewSecurePass123!")
|
||||
True
|
||||
"""
|
||||
# Validate reset token
|
||||
token_data = self.token_repository.get_token(token, "password_reset")
|
||||
if not token_data:
|
||||
raise InvalidTokenError("Invalid password reset token")
|
||||
|
||||
if token_data.is_revoked:
|
||||
raise RevokedTokenError("Password reset token has already been used")
|
||||
|
||||
if token_data.expires_at < datetime.utcnow():
|
||||
raise ExpiredTokenError("Password reset token has expired")
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_data.user_id)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
# Hash new password
|
||||
hashed_password = self.password_manager.hash_password(new_password)
|
||||
|
||||
# Update user password
|
||||
updates = UserUpdate(password=hashed_password)
|
||||
self.user_repository.update_user(user.id, updates)
|
||||
|
||||
# Revoke the reset token
|
||||
self.token_repository.revoke_token(token)
|
||||
|
||||
# Revoke all user's refresh tokens for security
|
||||
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
|
||||
|
||||
return True
|
||||
|
||||
def request_email_verification(self, email: str) -> str:
|
||||
"""
|
||||
Generate an email verification token for a user.
|
||||
|
||||
This method creates a JWT token that can be sent to the user
|
||||
to verify their email address. If an email service is configured,
|
||||
the email is sent automatically. Otherwise, the token is returned
|
||||
for manual handling.
|
||||
|
||||
Args:
|
||||
email: User's email address.
|
||||
|
||||
Returns:
|
||||
The email verification token (JWT) to be sent via email.
|
||||
|
||||
Raises:
|
||||
UserNotFoundError: If email is not registered.
|
||||
|
||||
Example:
|
||||
>>> token = auth_service.request_email_verification("user@example.com")
|
||||
>>> # Token is automatically sent via email if service is configured
|
||||
"""
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
verification_token = self.token_manager.create_email_verification_token(email)
|
||||
|
||||
# Send email if service is configured
|
||||
if self.email_service:
|
||||
self.email_service.send_verification_email(email, verification_token)
|
||||
|
||||
return verification_token
|
||||
|
||||
def verify_email(self, token: str) -> bool:
|
||||
"""
|
||||
Verify a user's email address using a verification token.
|
||||
|
||||
This method decodes the JWT token, extracts the email, and marks
|
||||
the user's email as verified.
|
||||
|
||||
Args:
|
||||
token: Email verification token (JWT).
|
||||
|
||||
Returns:
|
||||
True if email was verified successfully.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If verification token is invalid.
|
||||
ExpiredTokenError: If verification token has expired.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
|
||||
Example:
|
||||
>>> auth_service.verify_email(token)
|
||||
True
|
||||
"""
|
||||
# Decode and validate token
|
||||
email = self.token_manager.decode_email_verification_token(token)
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_email(email)
|
||||
if not user:
|
||||
raise UserNotFoundError(f"No user found with email {email}")
|
||||
|
||||
# Update user's verified status
|
||||
updates = UserUpdate(is_verified=True)
|
||||
self.user_repository.update_user(user.id, updates)
|
||||
|
||||
return True
|
||||
|
||||
def get_current_user(self, access_token: str) -> UserInDB:
|
||||
"""
|
||||
Retrieve the current user from an access token.
|
||||
|
||||
This method decodes and validates the JWT access token and
|
||||
retrieves the corresponding user from the database.
|
||||
|
||||
Args:
|
||||
access_token: JWT access token.
|
||||
|
||||
Returns:
|
||||
The user associated with the token.
|
||||
|
||||
Raises:
|
||||
InvalidTokenError: If access token is invalid.
|
||||
ExpiredTokenError: If access token has expired.
|
||||
UserNotFoundError: If user no longer exists.
|
||||
AccountDisabledError: If user account is disabled.
|
||||
|
||||
Example:
|
||||
>>> user = auth_service.get_current_user(access_token)
|
||||
>>> print(user.email)
|
||||
"""
|
||||
# Decode and validate token
|
||||
token_payload = self.token_manager.decode_access_token(access_token)
|
||||
|
||||
# Get user
|
||||
user = self.user_repository.get_user_by_id(token_payload.sub)
|
||||
if not user:
|
||||
raise UserNotFoundError()
|
||||
|
||||
if not user.is_active:
|
||||
raise AccountDisabledError()
|
||||
|
||||
return user
|
||||
|
||||
def count_users(self) -> int:
|
||||
"""
|
||||
Counts the total number of users.
|
||||
|
||||
This method retrieves and returns the total count of users
|
||||
from the user repository. It provides functionality for
|
||||
fetching user count stored in the underlying repository of
|
||||
users.
|
||||
|
||||
:return: The total count of users.
|
||||
:rtype: int
|
||||
"""
|
||||
return self.user_repository.count_users()
|
||||
|
||||
def list_users(self, skip: int = 0, limit: int = 100):
|
||||
"""
|
||||
Lists users from the user repository with optional pagination.
|
||||
|
||||
This method retrieves a list of users, allowing optional pagination
|
||||
by specifying the number of records to skip and the maximum number
|
||||
of users to retrieve.
|
||||
|
||||
:param skip: The number of users to skip in the result set. Defaults to 0.
|
||||
:type skip: int
|
||||
:param limit: The maximum number of users to retrieve. Defaults to 100.
|
||||
:type limit: int
|
||||
:return: A list of users retrieved from the repository.
|
||||
:rtype: list
|
||||
"""
|
||||
return self.user_repository.list_users(skip, limit)
|
||||
|
||||
def update_user(self, user_id: str, updates: UserUpdate, refresh_token: Optional[str] = None) -> UserInDB:
|
||||
"""
|
||||
Update an existing user's information.
|
||||
|
||||
This method handles user profile updates with automatic security measures:
|
||||
- Validates email uniqueness if email is changed
|
||||
- Automatically sets is_verified=False if email is changed
|
||||
- Hashes password if provided
|
||||
- Revokes refresh tokens (except current one) if password is changed
|
||||
|
||||
Args:
|
||||
user_id: The unique user identifier.
|
||||
updates: Pydantic model containing fields to update.
|
||||
refresh_token: Optional current refresh token to preserve when password changes.
|
||||
|
||||
Returns:
|
||||
The updated user.
|
||||
|
||||
Raises:
|
||||
UserNotFoundError: If user does not exist.
|
||||
UserAlreadyExistsError: If new email is already used by another user.
|
||||
|
||||
Example:
|
||||
>>> updates = UserUpdate(email="newemail@example.com")
|
||||
>>> user = auth_service.update_user(user_id, updates)
|
||||
>>>
|
||||
>>> # Update password while preserving current session
|
||||
>>> updates = UserUpdate(password="NewSecurePass123!")
|
||||
>>> user = auth_service.update_user(user_id, updates, refresh_token=current_token)
|
||||
"""
|
||||
# Get current user to compare changes
|
||||
current_user = self.user_repository.get_user_by_id(user_id)
|
||||
if not current_user:
|
||||
raise UserNotFoundError(f"User with id {user_id} not found")
|
||||
|
||||
# Handle email change
|
||||
if updates.email is not None and updates.email != current_user.email:
|
||||
# Check if new email is already used by another user
|
||||
if self.user_repository.email_exists(updates.email):
|
||||
raise UserAlreadyExistsError(f"Email {updates.email} is already in use")
|
||||
|
||||
# Force email verification to false when email changes
|
||||
updates.is_verified = False
|
||||
|
||||
# Handle password change
|
||||
if updates.password is not None:
|
||||
# Hash the new password
|
||||
hashed_password = self.password_manager.hash_password(updates.password)
|
||||
updates.password = hashed_password
|
||||
|
||||
# Revoke all refresh tokens except the current one (if provided)
|
||||
self.token_repository.revoke_all_user_tokens(user_id, "refresh", except_token=refresh_token)
|
||||
|
||||
# Update user in database
|
||||
updated_user = self.user_repository.update_user(user_id, updates)
|
||||
|
||||
return updated_user
|
||||
@@ -12,31 +12,21 @@ class PasswordManager:
|
||||
"""
|
||||
Manager for password hashing and verification operations.
|
||||
|
||||
This class uses bcrypt for secure password hashing with configurable
|
||||
cost factor (rounds). Higher rounds provide better security but slower
|
||||
performance.
|
||||
|
||||
Attributes:
|
||||
rounds: Bcrypt cost factor (default: 12). Higher values increase
|
||||
security but also increase computation time.
|
||||
This class uses argon2 for secure password hashing
|
||||
"""
|
||||
|
||||
def __init__(self, rounds: int = 12):
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the password manager.
|
||||
|
||||
Args:
|
||||
rounds: Bcrypt cost factor (4-31). Default is 12 which provides
|
||||
a good balance between security and performance.
|
||||
"""
|
||||
if rounds < 4 or rounds > 31:
|
||||
raise ValueError("Bcrypt rounds must be between 4 and 31")
|
||||
|
||||
self.rounds = rounds
|
||||
self._context = CryptContext(
|
||||
schemes=["bcrypt"],
|
||||
schemes=["argon2"],
|
||||
deprecated="auto",
|
||||
bcrypt__rounds=self.rounds
|
||||
# argon2__time_cost=3, # number of iterations (increases CPU time)
|
||||
# argon2__memory_cost=65536, # memory usage in KiB (64 MiB)
|
||||
# argon2__parallelism=2, # number of parallel threads
|
||||
# argon2__salt_len=16 # length of the random salt in bytes
|
||||
)
|
||||
|
||||
def hash_password(self, password: str) -> str:
|
||||
4
src/myauth/emailing/__init__.py
Normal file
4
src/myauth/emailing/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .base import EmailService
|
||||
from .smtp import SMTPEmailService
|
||||
|
||||
__all__ = ["EmailService", "SMTPEmailService"]
|
||||
94
src/myauth/factory.py
Normal file
94
src/myauth/factory.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from typing import Optional
|
||||
|
||||
from .core import AuthService
|
||||
from .core.password import PasswordManager
|
||||
from .core.token import TokenManager
|
||||
from .emailing.base import EmailService
|
||||
|
||||
|
||||
def create_sqlite_auth_service(db_path: str,
|
||||
jwt_secret: str,
|
||||
email_service: Optional[EmailService],
|
||||
password_manager: PasswordManager = None,
|
||||
token_manager: TokenManager = None,
|
||||
):
|
||||
"""
|
||||
Creates and configures an authentication service using SQLite as the underlying
|
||||
data store.
|
||||
|
||||
This function is responsible for setting up the necessary repositories
|
||||
and managers required for the AuthService. It uses SQLite repositories
|
||||
for user and token data and initializes the required managers for password
|
||||
and token processing, as well as an optional email service.
|
||||
|
||||
:param db_path: Path to the SQLite database file used to persist authentication
|
||||
data.
|
||||
:type db_path: str
|
||||
:param jwt_secret: Secret key used to encode and decode JSON Web Tokens (JWTs).
|
||||
:type jwt_secret: str
|
||||
:param email_service: Optional email service instance for managing email-based
|
||||
communication.
|
||||
:type email_service: Optional[EmailService]
|
||||
:param password_manager: Optional custom PasswordManager instance responsible for
|
||||
password-related operations. Defaults to a new instance.
|
||||
:type password_manager: PasswordManager
|
||||
:param token_manager: Optional custom TokenManager instance responsible for token
|
||||
generation and verification. Defaults to a new instance
|
||||
configured with `jwt_secret`.
|
||||
:type token_manager: TokenManager
|
||||
:return: A fully configured instance of AuthService with the specified
|
||||
repositories and managers.
|
||||
:rtype: AuthService
|
||||
"""
|
||||
from .persistence.sqlite import SQLiteUserRepository
|
||||
from .persistence.sqlite import SQLiteTokenRepository
|
||||
user_repository = SQLiteUserRepository(db_path)
|
||||
token_repository = SQLiteTokenRepository(db_path)
|
||||
password_manager = password_manager or PasswordManager()
|
||||
token_manager = token_manager or TokenManager(jwt_secret)
|
||||
|
||||
auth_service = AuthService(
|
||||
user_repository=user_repository,
|
||||
token_repository=token_repository,
|
||||
password_manager=password_manager,
|
||||
token_manager=token_manager,
|
||||
email_service=email_service)
|
||||
|
||||
return auth_service
|
||||
|
||||
|
||||
def create_auth_app_for_sqlite(db_path: str,
|
||||
jwt_secret: str,
|
||||
create_admin_user: bool = True,
|
||||
email_service: Optional[EmailService] = None):
|
||||
"""
|
||||
Creates an application router designed to use with an SQLite database backend.
|
||||
This function initializes an authentication service using the provided database
|
||||
path and JWT secret, optionally integrating an email service for email-based
|
||||
authentication functionalities. The authentication service is then used to
|
||||
create an application router for handling authentication-related API routes.
|
||||
|
||||
:param db_path: Path to the SQLite database file.
|
||||
:param jwt_secret: A secret string used for signing and verifying JWT tokens.
|
||||
:param create_admin_user: Create an admin user if necessary.
|
||||
:param email_service: An optional email service instance for managing email-related
|
||||
communication and functionalities during authentication.
|
||||
:return: An application router configured for handling authentication API routes.
|
||||
"""
|
||||
auth_service = create_sqlite_auth_service(db_path, jwt_secret, email_service)
|
||||
if create_admin_user:
|
||||
auth_service.create_admin_if_needed()
|
||||
from .api.routes import create_auth_app
|
||||
return create_auth_app(auth_service)
|
||||
|
||||
|
||||
def create_auth_app_for_mongodb(mongodb_url="mongodb://localhost:27017",
|
||||
jwt_secret="THIS_NEEDS_TO_BE_CHANGED"):
|
||||
raise NotImplementedError("MongoDB support is not yet implemented.")
|
||||
|
||||
|
||||
def create_auth_app_for_postgresql(postgresql_url="mongodb://localhost:27017",
|
||||
username="admin",
|
||||
password="password",
|
||||
jwt_secret="THIS_NEEDS_TO_BE_CHANGED"):
|
||||
raise NotImplementedError("PostgreSQL support is not yet implemented.")
|
||||
@@ -7,7 +7,7 @@ reset operations including request and confirmation models.
|
||||
|
||||
from pydantic import BaseModel, EmailStr, field_validator
|
||||
|
||||
from my_auth.models.validators import validate_password_strength
|
||||
from ..models.validators import validate_password_strength
|
||||
|
||||
|
||||
class EmailVerificationRequest(BaseModel):
|
||||
@@ -10,7 +10,7 @@ from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, EmailStr, Field, field_validator
|
||||
|
||||
from my_auth.models.validators import validate_password_strength, validate_username_not_empty
|
||||
from ..models.validators import validate_password_strength, validate_username_not_empty
|
||||
|
||||
|
||||
class UserBase(BaseModel):
|
||||
@@ -33,6 +33,16 @@ class UserBase(BaseModel):
|
||||
user_settings: dict = Field(default_factory=dict)
|
||||
|
||||
|
||||
class UserCreateNoValidation(UserBase):
|
||||
"""
|
||||
Model for user creation (registration).
|
||||
|
||||
This model extends UserBase with a password field
|
||||
"""
|
||||
|
||||
password: str
|
||||
|
||||
|
||||
class UserCreate(UserBase):
|
||||
"""
|
||||
Model for user creation (registration).
|
||||
@@ -143,6 +153,67 @@ class UserUpdate(BaseModel):
|
||||
Raises:
|
||||
ValueError: If username is provided but empty or too long.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
return validate_username_not_empty(value)
|
||||
|
||||
|
||||
class UserUpdateMe(BaseModel):
|
||||
"""
|
||||
Model for user self-update (restricted fields).
|
||||
|
||||
This model allows users to update their own profile with restrictions:
|
||||
- Allowed fields: email, username, password, user_settings
|
||||
- Forbidden fields: roles, is_active, is_verified
|
||||
- Optional refresh_token to preserve current session when changing password
|
||||
|
||||
Attributes:
|
||||
email: Optional new email address.
|
||||
username: Optional new username.
|
||||
password: Optional new password (will be hashed and validated).
|
||||
user_settings: Optional new settings dict.
|
||||
refresh_token: Optional refresh token to preserve when changing password.
|
||||
"""
|
||||
email: Optional[EmailStr] = None
|
||||
username: Optional[str] = None
|
||||
password: Optional[str] = None
|
||||
user_settings: Optional[dict] = None
|
||||
refresh_token: Optional[str] = None
|
||||
|
||||
@field_validator('password')
|
||||
@classmethod
|
||||
def validate_password_strength(cls, value: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
Validate password meets security requirements if provided.
|
||||
|
||||
Args:
|
||||
value: The password to validate (can be None).
|
||||
|
||||
Returns:
|
||||
The validated password or None.
|
||||
|
||||
Raises:
|
||||
ValueError: If password is provided but does not meet security requirements.
|
||||
"""
|
||||
return validate_password_strength(value)
|
||||
|
||||
@field_validator('username')
|
||||
@classmethod
|
||||
def validate_username(cls, value: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
Validate username if provided.
|
||||
|
||||
Args:
|
||||
value: The username to validate (can be None).
|
||||
|
||||
Returns:
|
||||
The validated username or None.
|
||||
|
||||
Raises:
|
||||
ValueError: If username is provided but empty or too long.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
return validate_username_not_empty(value)
|
||||
|
||||
|
||||
6
src/myauth/persistence/__init__.py
Normal file
6
src/myauth/persistence/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from .sqlite import UserRepository, TokenRepository
|
||||
|
||||
__all__ = [
|
||||
"UserRepository",
|
||||
"TokenRepository",
|
||||
]
|
||||
@@ -8,6 +8,7 @@ PostgreSQL, custom engines, etc.).
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from ..models.token import TokenData
|
||||
from ..models.user import UserCreate, UserInDB, UserUpdate
|
||||
@@ -121,6 +122,31 @@ class UserRepository(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]:
|
||||
"""
|
||||
List users with pagination.
|
||||
|
||||
Args:
|
||||
skip (int): Number of users to skip (default: 0)
|
||||
limit (int): Maximum number of users to return (default: 100)
|
||||
|
||||
Returns:
|
||||
List[UserInDB]: List of users
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def count_users(self) -> int:
|
||||
"""
|
||||
Count total number of users.
|
||||
|
||||
Returns:
|
||||
int: Total number of users in system
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
class TokenRepository(ABC):
|
||||
"""
|
||||
@@ -178,16 +204,19 @@ class TokenRepository(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
|
||||
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
|
||||
"""
|
||||
Revoke all tokens of a specific type for a user.
|
||||
|
||||
This is useful for logout-all-devices functionality or when a user's
|
||||
password is changed (invalidating all refresh tokens).
|
||||
password is changed (invalidating all refresh tokens). Optionally,
|
||||
a specific token can be excluded from revocation to preserve the
|
||||
current session.
|
||||
|
||||
Args:
|
||||
user_id: The user whose tokens should be revoked.
|
||||
token_type: Type of tokens to revoke ("refresh" or "password_reset").
|
||||
except_token: Optional token string to exclude from revocation.
|
||||
|
||||
Returns:
|
||||
Number of tokens revoked.
|
||||
@@ -13,9 +13,9 @@ from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from .base import UserRepository, TokenRepository
|
||||
from ..models.user import UserCreate, UserInDB, UserUpdate
|
||||
from ..models.token import TokenData
|
||||
from ..exceptions import UserAlreadyExistsError, UserNotFoundError
|
||||
from ..models.token import TokenData
|
||||
from ..models.user import UserCreate, UserInDB, UserUpdate
|
||||
|
||||
|
||||
class SQLiteUserRepository(UserRepository):
|
||||
@@ -365,6 +365,51 @@ class SQLiteUserRepository(UserRepository):
|
||||
cursor.execute("SELECT 1 FROM users WHERE email = ? LIMIT 1", (email,))
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
def list_users(self, skip: int = 0, limit: int = 100) -> list[UserInDB]:
|
||||
"""
|
||||
List users with pagination.
|
||||
|
||||
Args:
|
||||
skip: Number of users to skip (default: 0).
|
||||
limit: Maximum number of users to return (default: 100).
|
||||
|
||||
Returns:
|
||||
List of users.
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT id,
|
||||
email,
|
||||
username,
|
||||
hashed_password,
|
||||
roles,
|
||||
user_settings,
|
||||
is_verified,
|
||||
is_active,
|
||||
created_at,
|
||||
updated_at
|
||||
FROM users
|
||||
ORDER BY created_at DESC LIMIT ?
|
||||
OFFSET ?
|
||||
""", (limit, skip))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
return [self._row_to_user(row) for row in rows]
|
||||
|
||||
def count_users(self) -> int:
|
||||
"""
|
||||
Count total number of users.
|
||||
|
||||
Returns:
|
||||
Total number of users in system.
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT COUNT(*) FROM users")
|
||||
result = cursor.fetchone()
|
||||
return result[0] if result else 0
|
||||
|
||||
|
||||
class SQLiteTokenRepository(TokenRepository):
|
||||
"""
|
||||
@@ -545,25 +590,44 @@ class SQLiteTokenRepository(TokenRepository):
|
||||
conn.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
def revoke_all_user_tokens(self, user_id: str, token_type: str) -> int:
|
||||
def revoke_all_user_tokens(self, user_id: str, token_type: str, except_token: Optional[str] = None) -> int:
|
||||
"""
|
||||
Revoke all tokens of a specific type for a user.
|
||||
|
||||
This is useful for logout-all-devices functionality or when a user's
|
||||
password is changed (invalidating all refresh tokens). Optionally,
|
||||
a specific token can be excluded from revocation to preserve the
|
||||
current session.
|
||||
|
||||
Args:
|
||||
user_id: The user whose tokens should be revoked.
|
||||
token_type: Type of tokens to revoke.
|
||||
token_type: Type of tokens to revoke ("refresh" or "password_reset").
|
||||
except_token: Optional token string to exclude from revocation.
|
||||
|
||||
Returns:
|
||||
Number of tokens revoked.
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
UPDATE tokens
|
||||
SET is_revoked = 1
|
||||
WHERE user_id = ?
|
||||
AND token_type = ?
|
||||
""", (user_id, token_type))
|
||||
|
||||
if except_token is not None:
|
||||
# Revoke all tokens except the specified one
|
||||
cursor.execute("""
|
||||
UPDATE tokens
|
||||
SET is_revoked = 1
|
||||
WHERE user_id = ?
|
||||
AND token_type = ?
|
||||
AND token != ?
|
||||
""", (user_id, token_type, except_token))
|
||||
else:
|
||||
# Revoke all tokens
|
||||
cursor.execute("""
|
||||
UPDATE tokens
|
||||
SET is_revoked = 1
|
||||
WHERE user_id = ?
|
||||
AND token_type = ?
|
||||
""", (user_id, token_type))
|
||||
|
||||
conn.commit()
|
||||
return cursor.rowcount
|
||||
|
||||
@@ -12,9 +12,9 @@ import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from my_auth.api.routes import create_auth_router
|
||||
from my_auth.core.auth import AuthService
|
||||
from my_auth.exceptions import (
|
||||
from myauth.api import create_auth_app
|
||||
from myauth.core.auth import AuthService
|
||||
from myauth.exceptions import (
|
||||
UserAlreadyExistsError,
|
||||
InvalidCredentialsError,
|
||||
UserNotFoundError,
|
||||
@@ -23,8 +23,8 @@ from my_auth.exceptions import (
|
||||
RevokedTokenError,
|
||||
AccountDisabledError
|
||||
)
|
||||
from my_auth.models.token import AccessTokenResponse
|
||||
from my_auth.models.user import UserInDB
|
||||
from myauth.models.token import AccessTokenResponse
|
||||
from myauth.models.user import UserInDB
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -51,8 +51,8 @@ def test_app(mock_auth_service):
|
||||
FastAPI application configured for testing.
|
||||
"""
|
||||
app = FastAPI()
|
||||
auth_router = create_auth_router(mock_auth_service)
|
||||
app.include_router(auth_router)
|
||||
auth_app = create_auth_app(mock_auth_service)
|
||||
app.mount("/auth", auth_app)
|
||||
return app
|
||||
|
||||
|
||||
@@ -513,3 +513,361 @@ def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_se
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_i_can_update_my_email(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update their own email.
|
||||
|
||||
Verifies that a PATCH request to /auth/me with a new email
|
||||
successfully updates the user's email address.
|
||||
"""
|
||||
updated_user = sample_user.model_copy(update={"email": "newemail@example.com"})
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={"email": "newemail@example.com"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "newemail@example.com"
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_i_can_update_my_username(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update their own username.
|
||||
|
||||
Verifies that a PATCH request to /auth/me with a new username
|
||||
successfully updates the user's username.
|
||||
"""
|
||||
updated_user = sample_user.model_copy(update={"username": "newusername"})
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={"username": "newusername"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "newusername"
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_i_can_update_my_password(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update their own password.
|
||||
|
||||
Verifies that a PATCH request to /auth/me with a new password
|
||||
successfully updates the password (which will be hashed by the service).
|
||||
"""
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = sample_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={"password": "NewSecurePass123!"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
# Verify password was included in the update
|
||||
call_args = mock_auth_service.update_user.call_args
|
||||
assert call_args[1]["updates"].password == "NewSecurePass123!"
|
||||
|
||||
|
||||
def test_i_can_update_my_password_and_preserve_session(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update password while preserving current session.
|
||||
|
||||
Verifies that when a refresh_token is provided in the request body,
|
||||
it is passed to the service to preserve the current session.
|
||||
"""
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = sample_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={
|
||||
"password": "NewSecurePass123!",
|
||||
"refresh_token": "current_refresh_token"
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
# Verify refresh_token was passed to preserve session
|
||||
call_args = mock_auth_service.update_user.call_args
|
||||
assert call_args[1]["refresh_token"] == "current_refresh_token"
|
||||
|
||||
|
||||
def test_i_can_update_my_user_settings(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update their own settings.
|
||||
|
||||
Verifies that a PATCH request to /auth/me with new user_settings
|
||||
successfully updates the user's custom settings.
|
||||
"""
|
||||
new_settings = {"theme": "dark", "language": "fr", "notifications": True}
|
||||
updated_user = sample_user.model_copy(update={"user_settings": new_settings})
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={"user_settings": new_settings}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["user_settings"] == new_settings
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_i_can_update_multiple_fields_on_my_profile(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test user can update multiple fields simultaneously.
|
||||
|
||||
Verifies that a PATCH request to /auth/me can update multiple
|
||||
fields (email, username, user_settings) at once.
|
||||
"""
|
||||
updated_user = sample_user.model_copy(update={
|
||||
"email": "multiemail@example.com",
|
||||
"username": "multiuser",
|
||||
"user_settings": {"theme": "light"}
|
||||
})
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={
|
||||
"email": "multiemail@example.com",
|
||||
"username": "multiuser",
|
||||
"user_settings": {"theme": "light"}
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "multiemail@example.com"
|
||||
assert data["username"] == "multiuser"
|
||||
assert data["user_settings"] == {"theme": "light"}
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_i_cannot_update_my_profile_without_authentication(client):
|
||||
"""
|
||||
Test updating profile fails without authentication.
|
||||
|
||||
Verifies that a PATCH request to /auth/me without a Bearer token
|
||||
returns 401 Unauthorized status.
|
||||
"""
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
json={"username": "shouldfail"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_i_cannot_update_my_email_to_existing_email(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test updating email fails if already exists.
|
||||
|
||||
Verifies that attempting to update email to an already registered
|
||||
email returns 409 Conflict status.
|
||||
"""
|
||||
mock_auth_service.get_current_user.return_value = sample_user
|
||||
mock_auth_service.update_user.side_effect = UserAlreadyExistsError(
|
||||
"Email already in use"
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer sample.access.token"},
|
||||
json={"email": "existing@example.com"}
|
||||
)
|
||||
|
||||
assert response.status_code == 409
|
||||
assert "already" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
def test_i_cannot_update_my_profile_with_invalid_token(client, mock_auth_service):
|
||||
"""
|
||||
Test updating profile fails with invalid token.
|
||||
|
||||
Verifies that attempting to access /auth/me with an invalid token
|
||||
returns 401 Unauthorized status.
|
||||
"""
|
||||
mock_auth_service.get_current_user.side_effect = InvalidTokenError(
|
||||
"Invalid access token"
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
"/auth/me",
|
||||
headers={"Authorization": "Bearer invalid.token"},
|
||||
json={"username": "shouldfail"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_admin_can_update_any_user(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test admin can update any user.
|
||||
|
||||
Verifies that an admin can successfully update another user's
|
||||
information via PATCH /auth/users/{user_id}.
|
||||
"""
|
||||
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
|
||||
target_user = sample_user.model_copy(update={"id": "target_user_id", "username": "targetuser"})
|
||||
updated_user = target_user.model_copy(update={"username": "updatedusername"})
|
||||
|
||||
mock_auth_service.get_current_user.return_value = admin_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/target_user_id",
|
||||
headers={"Authorization": "Bearer admin.access.token"},
|
||||
json={"username": "updatedusername"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "updatedusername"
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_admin_can_update_user_roles(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test admin can update user roles.
|
||||
|
||||
Verifies that an admin can change a user's roles, which is forbidden
|
||||
for regular users on the /auth/me endpoint.
|
||||
"""
|
||||
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
|
||||
target_user = sample_user.model_copy(update={"id": "target_user_id", "roles": ["user"]})
|
||||
updated_user = target_user.model_copy(update={"roles": ["admin", "moderator"]})
|
||||
|
||||
mock_auth_service.get_current_user.return_value = admin_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/target_user_id",
|
||||
headers={"Authorization": "Bearer admin.access.token"},
|
||||
json={"roles": ["admin", "moderator"]}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["roles"] == ["admin", "moderator"]
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
|
||||
|
||||
def test_admin_can_update_user_is_active(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test admin can update user active status.
|
||||
|
||||
Verifies that an admin can activate or deactivate a user account,
|
||||
which is forbidden for regular users on the /auth/me endpoint.
|
||||
"""
|
||||
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
|
||||
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_active": True})
|
||||
updated_user = target_user.model_copy(update={"is_active": False})
|
||||
|
||||
mock_auth_service.get_current_user.return_value = admin_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/target_user_id",
|
||||
headers={"Authorization": "Bearer admin.access.token"},
|
||||
json={"is_active": False}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Note: is_active is not in UserResponse, so we just verify the call was made
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
call_args = mock_auth_service.update_user.call_args
|
||||
assert call_args[1]["updates"].is_active is False
|
||||
|
||||
|
||||
def test_admin_can_update_user_is_verified(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test admin can update user verification status.
|
||||
|
||||
Verifies that an admin can change a user's email verification status,
|
||||
which is forbidden for regular users on the /auth/me endpoint.
|
||||
"""
|
||||
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
|
||||
target_user = sample_user.model_copy(update={"id": "target_user_id", "is_verified": False})
|
||||
updated_user = target_user.model_copy(update={"is_verified": True})
|
||||
|
||||
mock_auth_service.get_current_user.return_value = admin_user
|
||||
mock_auth_service.update_user.return_value = updated_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/target_user_id",
|
||||
headers={"Authorization": "Bearer admin.access.token"},
|
||||
json={"is_verified": True}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
# Note: is_verified is not in UserResponse, so we just verify the call was made
|
||||
mock_auth_service.update_user.assert_called_once()
|
||||
call_args = mock_auth_service.update_user.call_args
|
||||
assert call_args[1]["updates"].is_verified is True
|
||||
|
||||
|
||||
def test_non_admin_cannot_update_other_users(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test non-admin cannot update other users.
|
||||
|
||||
Verifies that a regular user (without admin role) cannot access
|
||||
the PATCH /auth/users/{user_id} endpoint and receives 403 Forbidden.
|
||||
"""
|
||||
regular_user = sample_user.model_copy(update={"roles": ["user"]})
|
||||
mock_auth_service.get_current_user.return_value = regular_user
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/other_user_id",
|
||||
headers={"Authorization": "Bearer user.access.token"},
|
||||
json={"username": "shouldfail"}
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert "admin" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
def test_admin_cannot_update_non_existent_user(client, mock_auth_service, sample_user):
|
||||
"""
|
||||
Test admin cannot update non-existent user.
|
||||
|
||||
Verifies that attempting to update a non-existent user returns
|
||||
404 Not Found status.
|
||||
"""
|
||||
admin_user = sample_user.model_copy(update={"roles": ["admin"]})
|
||||
mock_auth_service.get_current_user.return_value = admin_user
|
||||
mock_auth_service.update_user.side_effect = UserNotFoundError(
|
||||
"User not found"
|
||||
)
|
||||
|
||||
response = client.patch(
|
||||
"/auth/users/non_existent_id",
|
||||
headers={"Authorization": "Bearer admin.access.token"},
|
||||
json={"username": "shouldfail"}
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.json()["detail"].lower()
|
||||
|
||||
@@ -7,11 +7,11 @@ from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from my_auth.core.password import PasswordManager
|
||||
from my_auth.core.token import TokenManager
|
||||
from src.my_auth.core.auth import AuthService
|
||||
from src.my_auth.models.user import UserCreate, UserInDB
|
||||
from src.my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
|
||||
from myauth.core.password import PasswordManager
|
||||
from myauth.core.token import TokenManager
|
||||
from myauth.core.auth import AuthService
|
||||
from myauth.models.user import UserCreate, UserInDB
|
||||
from myauth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
# tests/core/test_auth_service.py
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import MagicMock, patch
|
||||
from datetime import datetime
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from src.my_auth.core.auth import AuthService
|
||||
from src.my_auth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
|
||||
from myauth.core.auth import AuthService
|
||||
from myauth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
|
||||
ExpiredTokenError, RevokedTokenError
|
||||
from src.my_auth.models.token import TokenData, TokenPayload
|
||||
from src.my_auth.models.user import UserCreate, UserUpdate
|
||||
from myauth.models.token import TokenPayload
|
||||
from myauth.models.user import UserCreate
|
||||
|
||||
|
||||
class TestAuthServiceRegisterLogin(object):
|
||||
@@ -78,6 +78,178 @@ class TestAuthServiceRegisterLogin(object):
|
||||
with pytest.raises(InvalidCredentialsError):
|
||||
auth_service.login("non.existent@example.com", "AnyPassword")
|
||||
|
||||
def test_create_admin_if_needed_success_with_custom_credentials(
|
||||
self,
|
||||
auth_service: AuthService
|
||||
):
|
||||
"""Success: Admin is created with custom credentials when no users exist."""
|
||||
|
||||
# Arrange
|
||||
custom_email = "custom.admin@example.com"
|
||||
custom_username = "custom_admin"
|
||||
custom_password = "CustomAdminPass123!"
|
||||
|
||||
# Act
|
||||
result = auth_service.create_admin_if_needed(
|
||||
admin_email=custom_email,
|
||||
admin_username=custom_username,
|
||||
admin_password=custom_password
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is True
|
||||
|
||||
# Verify admin user was created
|
||||
admin_user = auth_service.user_repository.get_user_by_email(custom_email)
|
||||
assert admin_user is not None
|
||||
assert admin_user.email == custom_email
|
||||
assert admin_user.username == custom_username
|
||||
assert "admin" in admin_user.roles
|
||||
|
||||
# Verify password was hashed
|
||||
auth_service.password_manager.hash_password.assert_called()
|
||||
|
||||
def test_create_admin_if_needed_success_with_default_credentials(
|
||||
self,
|
||||
auth_service: AuthService,
|
||||
monkeypatch
|
||||
):
|
||||
"""Success: Admin is created with default credentials from environment variables."""
|
||||
|
||||
# Arrange
|
||||
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
|
||||
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
|
||||
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
|
||||
|
||||
# Act
|
||||
result = auth_service.create_admin_if_needed()
|
||||
|
||||
# Assert
|
||||
assert result is True
|
||||
|
||||
# Verify admin user was created with env variables
|
||||
admin_user = auth_service.user_repository.get_user_by_email("env.admin@example.com")
|
||||
assert admin_user is not None
|
||||
assert admin_user.email == "env.admin@example.com"
|
||||
assert admin_user.username == "env_admin"
|
||||
assert "admin" in admin_user.roles
|
||||
|
||||
def test_create_admin_if_needed_success_with_hardcoded_defaults(
|
||||
self,
|
||||
auth_service: AuthService,
|
||||
monkeypatch
|
||||
):
|
||||
"""Success: Admin is created with hardcoded defaults when no env vars or params provided."""
|
||||
|
||||
# Arrange - Clear any existing env variables
|
||||
monkeypatch.delenv("AUTH_ADMIN_EMAIL", raising=False)
|
||||
monkeypatch.delenv("AUTH_ADMIN_USERNAME", raising=False)
|
||||
monkeypatch.delenv("AUTH_ADMIN_PASSWORD", raising=False)
|
||||
|
||||
# Act
|
||||
result = auth_service.create_admin_if_needed()
|
||||
|
||||
# Assert
|
||||
assert result is True
|
||||
|
||||
# Verify admin user was created with hardcoded defaults
|
||||
admin_user = auth_service.user_repository.get_user_by_email("admin@myauth.com")
|
||||
assert admin_user is not None
|
||||
assert admin_user.email == "admin@myauth.com"
|
||||
assert admin_user.username == "admin"
|
||||
assert "admin" in admin_user.roles
|
||||
|
||||
def test_create_admin_if_needed_no_creation_when_users_exist(
|
||||
self,
|
||||
auth_service: AuthService,
|
||||
test_user_data_create: UserCreate
|
||||
):
|
||||
"""Failure: Admin is not created when users already exist in the system."""
|
||||
|
||||
# Arrange - Create a regular user first
|
||||
auth_service.register(test_user_data_create)
|
||||
|
||||
# Act
|
||||
result = auth_service.create_admin_if_needed(
|
||||
admin_email="should.not.be.created@example.com",
|
||||
admin_username="should_not_exist",
|
||||
admin_password="ShouldNotExist123!"
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is False
|
||||
|
||||
# Verify admin user was NOT created
|
||||
admin_user = auth_service.user_repository.get_user_by_email(
|
||||
"should.not.be.created@example.com"
|
||||
)
|
||||
assert admin_user is None
|
||||
|
||||
# Verify only the original user exists
|
||||
assert auth_service.count_users() == 1
|
||||
|
||||
def test_create_admin_if_needed_parameters_override_env_variables(
|
||||
self,
|
||||
auth_service: AuthService,
|
||||
monkeypatch
|
||||
):
|
||||
"""Success: Parameters take precedence over environment variables."""
|
||||
|
||||
# Arrange
|
||||
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
|
||||
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
|
||||
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
|
||||
|
||||
param_email = "param.admin@example.com"
|
||||
param_username = "param_admin"
|
||||
param_password = "ParamAdminPass123!"
|
||||
|
||||
# Act
|
||||
result = auth_service.create_admin_if_needed(
|
||||
admin_email=param_email,
|
||||
admin_username=param_username,
|
||||
admin_password=param_password
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is True
|
||||
|
||||
# Verify parameters were used, not env variables
|
||||
admin_user = auth_service.user_repository.get_user_by_email(param_email)
|
||||
assert admin_user is not None
|
||||
assert admin_user.email == param_email
|
||||
assert admin_user.username == param_username
|
||||
|
||||
# Verify env admin was NOT created
|
||||
env_admin = auth_service.user_repository.get_user_by_email("env.admin@example.com")
|
||||
assert env_admin is None
|
||||
|
||||
def test_create_admin_if_needed_mixed_parameters_and_env(
|
||||
self,
|
||||
auth_service: AuthService,
|
||||
monkeypatch
|
||||
):
|
||||
"""Success: Partial parameters combine with environment variables."""
|
||||
|
||||
# Arrange
|
||||
monkeypatch.setenv("AUTH_ADMIN_EMAIL", "env.admin@example.com")
|
||||
monkeypatch.setenv("AUTH_ADMIN_USERNAME", "env_admin")
|
||||
monkeypatch.setenv("AUTH_ADMIN_PASSWORD", "EnvAdminPass123!")
|
||||
|
||||
# Act - Only provide email as parameter
|
||||
result = auth_service.create_admin_if_needed(
|
||||
admin_email="partial.admin@example.com"
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert result is True
|
||||
|
||||
# Verify email from parameter, username and password from env
|
||||
admin_user = auth_service.user_repository.get_user_by_email("partial.admin@example.com")
|
||||
assert admin_user is not None
|
||||
assert admin_user.email == "partial.admin@example.com"
|
||||
assert admin_user.username == "env_admin"
|
||||
|
||||
|
||||
class TestAuthServiceTokenManagement(object):
|
||||
"""Tests for token-related flows (Refresh, Logout, GetCurrentUser)."""
|
||||
@@ -172,6 +344,296 @@ class TestAuthServiceTokenManagement(object):
|
||||
auth_service.get_current_user("expired_access_jwt")
|
||||
|
||||
|
||||
class TestAuthServiceUserUpdate(object):
|
||||
"""Tests for user update operations."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
|
||||
"""Sets up a registered user for update tests."""
|
||||
|
||||
pm = auth_service.password_manager
|
||||
original_hash = pm.hash_password.return_value
|
||||
|
||||
# Temporarily set hash for setup
|
||||
pm.hash_password.return_value = "HASHED_PASS"
|
||||
user = auth_service.register(test_user_data_create)
|
||||
self.user = user
|
||||
self.original_email = user.email
|
||||
self.original_username = user.username
|
||||
|
||||
# Restore hash mock
|
||||
pm.hash_password.return_value = original_hash
|
||||
|
||||
def test_i_can_update_user_email(self, auth_service: AuthService):
|
||||
"""Success: Email can be updated and is_verified is automatically set to False."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
new_email = "updated.email@example.com"
|
||||
updates = UserUpdate(email=new_email)
|
||||
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.email == new_email
|
||||
assert updated_user.is_verified is False
|
||||
|
||||
def test_i_can_update_user_username(self, auth_service: AuthService):
|
||||
"""Success: Username can be updated."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
new_username = "UpdatedUsername"
|
||||
updates = UserUpdate(username=new_username)
|
||||
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.username == new_username
|
||||
|
||||
def test_i_can_update_user_password(self, auth_service: AuthService):
|
||||
"""Success: Password can be updated and is properly hashed."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
pm = auth_service.password_manager
|
||||
new_password = "NewSecurePass123!"
|
||||
|
||||
with patch.object(pm, 'hash_password', return_value="NEW_HASHED_PASSWORD") as mock_hash:
|
||||
updates = UserUpdate(password=new_password)
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
mock_hash.assert_called_once_with(new_password)
|
||||
assert updated_user.hashed_password == "NEW_HASHED_PASSWORD"
|
||||
|
||||
def test_i_can_update_user_password_and_all_refresh_tokens_are_revoked(
|
||||
self,
|
||||
auth_service: AuthService
|
||||
):
|
||||
"""Success: Updating password revokes all refresh tokens when no current token provided."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
# Setup: Create some refresh tokens for the user
|
||||
from myauth.models.token import TokenData
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
token1 = TokenData(
|
||||
token="refresh_token_1",
|
||||
token_type="refresh",
|
||||
user_id=self.user.id,
|
||||
expires_at=datetime.now() + timedelta(days=1),
|
||||
created_at=datetime.now(),
|
||||
is_revoked=False
|
||||
)
|
||||
token2 = TokenData(
|
||||
token="refresh_token_2",
|
||||
token_type="refresh",
|
||||
user_id=self.user.id,
|
||||
expires_at=datetime.now() + timedelta(days=1),
|
||||
created_at=datetime.now(),
|
||||
is_revoked=False
|
||||
)
|
||||
auth_service.token_repository.save_token(token1)
|
||||
auth_service.token_repository.save_token(token2)
|
||||
|
||||
# Execute: Update password without providing current token
|
||||
updates = UserUpdate(password="NewPassword123!")
|
||||
auth_service.update_user(self.user.id, updates)
|
||||
|
||||
# Verify: Both tokens are revoked
|
||||
token1_after = auth_service.token_repository.get_token("refresh_token_1", "refresh")
|
||||
token2_after = auth_service.token_repository.get_token("refresh_token_2", "refresh")
|
||||
|
||||
assert token1_after.is_revoked is True
|
||||
assert token2_after.is_revoked is True
|
||||
|
||||
def test_i_can_update_user_password_and_preserve_current_session(
|
||||
self,
|
||||
auth_service: AuthService
|
||||
):
|
||||
"""Success: Updating password preserves the current refresh token when provided."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
from myauth.models.token import TokenData
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# Setup: Create some refresh tokens
|
||||
current_token = TokenData(
|
||||
token="current_refresh_token",
|
||||
token_type="refresh",
|
||||
user_id=self.user.id,
|
||||
expires_at=datetime.now() + timedelta(days=1),
|
||||
created_at=datetime.now(),
|
||||
is_revoked=False
|
||||
)
|
||||
other_token = TokenData(
|
||||
token="other_refresh_token",
|
||||
token_type="refresh",
|
||||
user_id=self.user.id,
|
||||
expires_at=datetime.now() + timedelta(days=1),
|
||||
created_at=datetime.now(),
|
||||
is_revoked=False
|
||||
)
|
||||
auth_service.token_repository.save_token(current_token)
|
||||
auth_service.token_repository.save_token(other_token)
|
||||
|
||||
# Execute: Update password while providing current token
|
||||
updates = UserUpdate(password="NewPassword123!")
|
||||
auth_service.update_user(self.user.id, updates, refresh_token="current_refresh_token")
|
||||
|
||||
# Verify: Current token is preserved, other is revoked
|
||||
current_after = auth_service.token_repository.get_token("current_refresh_token", "refresh")
|
||||
other_after = auth_service.token_repository.get_token("other_refresh_token", "refresh")
|
||||
|
||||
assert current_after.is_revoked is False
|
||||
assert other_after.is_revoked is True
|
||||
|
||||
def test_i_can_update_multiple_fields_at_once(self, auth_service: AuthService):
|
||||
"""Success: Multiple fields can be updated simultaneously."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
updates = UserUpdate(
|
||||
username="MultiUpdateUser",
|
||||
roles=["admin", "member"],
|
||||
user_settings={"theme": "light", "language": "en"}
|
||||
)
|
||||
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.username == "MultiUpdateUser"
|
||||
assert updated_user.roles == ["admin", "member"]
|
||||
assert updated_user.user_settings == {"theme": "light", "language": "en"}
|
||||
|
||||
def test_i_can_update_user_roles(self, auth_service: AuthService):
|
||||
"""Success: User roles can be updated."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
new_roles = ["admin", "moderator"]
|
||||
updates = UserUpdate(roles=new_roles)
|
||||
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.roles == new_roles
|
||||
|
||||
def test_i_can_update_user_settings(self, auth_service: AuthService):
|
||||
"""Success: User settings can be updated."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
new_settings = {"theme": "light", "notifications": True, "language": "fr"}
|
||||
updates = UserUpdate(user_settings=new_settings)
|
||||
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.user_settings == new_settings
|
||||
|
||||
def test_i_can_update_is_active_status(self, auth_service: AuthService):
|
||||
"""Success: User active status can be updated."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
# Deactivate user
|
||||
updates = UserUpdate(is_active=False)
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.is_active is False
|
||||
|
||||
# Reactivate user
|
||||
updates = UserUpdate(is_active=True)
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
assert updated_user.is_active is True
|
||||
|
||||
def test_i_cannot_update_user_with_invalid_user_id(self, auth_service: AuthService):
|
||||
"""Failure: Updating a non-existent user raises UserNotFoundError."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
from myauth.exceptions import UserNotFoundError
|
||||
|
||||
updates = UserUpdate(username="ShouldFail")
|
||||
|
||||
with pytest.raises(UserNotFoundError):
|
||||
auth_service.update_user("non_existent_id", updates)
|
||||
|
||||
def test_i_cannot_update_email_to_existing_email(self, auth_service: AuthService):
|
||||
"""Failure: Updating email to an already registered email raises UserAlreadyExistsError."""
|
||||
|
||||
from myauth.models.user import UserCreate, UserUpdate
|
||||
|
||||
# Setup: Create another user with a different email
|
||||
other_user_data = UserCreate(
|
||||
email="other.user@example.com",
|
||||
username="OtherUser",
|
||||
password="OtherPass123!",
|
||||
roles=["member"]
|
||||
)
|
||||
auth_service.register(other_user_data)
|
||||
|
||||
# Execute: Try to update original user's email to the other user's email
|
||||
updates = UserUpdate(email="other.user@example.com")
|
||||
|
||||
with pytest.raises(UserAlreadyExistsError):
|
||||
auth_service.update_user(self.user.id, updates)
|
||||
|
||||
def test_i_cannot_update_email_to_same_email_without_triggering_verification_reset(
|
||||
self,
|
||||
auth_service: AuthService
|
||||
):
|
||||
"""Success: Updating email to the same email does not reset is_verified."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
# Setup: Ensure user is verified
|
||||
verify_updates = UserUpdate(is_verified=True)
|
||||
auth_service.update_user(self.user.id, verify_updates)
|
||||
|
||||
# Execute: Update with the same email
|
||||
updates = UserUpdate(email=self.original_email)
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
# Verify: is_verified should remain True
|
||||
assert updated_user.is_verified is True
|
||||
assert updated_user.email == self.original_email
|
||||
|
||||
def test_i_can_update_with_empty_updates(self, auth_service: AuthService):
|
||||
"""Success: Updating with empty UserUpdate does not cause errors."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
updates = UserUpdate()
|
||||
updated_user = auth_service.update_user(self.user.id, updates)
|
||||
|
||||
# Verify: User is returned and fields are unchanged
|
||||
assert updated_user.id == self.user.id
|
||||
assert updated_user.email == self.original_email
|
||||
assert updated_user.username == self.original_username
|
||||
|
||||
def test_email_verification_reset_only_when_email_actually_changes(
|
||||
self,
|
||||
auth_service: AuthService
|
||||
):
|
||||
"""Success: is_verified is reset to False only when email actually changes."""
|
||||
|
||||
from myauth.models.user import UserUpdate
|
||||
|
||||
# Setup: Set user as verified
|
||||
verify_updates = UserUpdate(is_verified=True)
|
||||
auth_service.update_user(self.user.id, verify_updates)
|
||||
verified_user = auth_service.user_repository.get_user_by_id(self.user.id)
|
||||
assert verified_user.is_verified is True
|
||||
|
||||
# Test 1: Update with same email - verification should remain
|
||||
same_email_updates = UserUpdate(email=self.original_email, username="SameEmailTest")
|
||||
updated_user = auth_service.update_user(self.user.id, same_email_updates)
|
||||
assert updated_user.is_verified is True
|
||||
|
||||
# Test 2: Update with different email - verification should reset
|
||||
different_email_updates = UserUpdate(email="completely.new@example.com")
|
||||
updated_user = auth_service.update_user(self.user.id, different_email_updates)
|
||||
assert updated_user.is_verified is False
|
||||
assert updated_user.email == "completely.new@example.com"
|
||||
|
||||
# class TestAuthServiceResetVerification(object):
|
||||
# """Tests for password reset and email verification flows."""
|
||||
#
|
||||
@@ -190,7 +652,7 @@ class TestAuthServiceTokenManagement(object):
|
||||
# # Restore hash mock
|
||||
# pm.hash_password.return_value = original_hash
|
||||
#
|
||||
# @patch('src.my_auth.core.email.send_email')
|
||||
# @patch('myauth.core.email.send_email')
|
||||
# def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
|
||||
# """Success: Requesting a password reset generates a token and sends an email."""
|
||||
#
|
||||
@@ -226,7 +688,7 @@ class TestAuthServiceTokenManagement(object):
|
||||
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
|
||||
# assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
|
||||
#
|
||||
# @patch('src.my_auth.core.email.send_email')
|
||||
# @patch('myauth.core.email.send_email')
|
||||
# def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
|
||||
# """Success: Requesting verification generates a token and sends an email."""
|
||||
#
|
||||
|
||||
25
tests/core/test_password.py
Normal file
25
tests/core/test_password.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import pytest
|
||||
|
||||
from myauth.core import PasswordManager
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def password_manager():
|
||||
return PasswordManager()
|
||||
|
||||
|
||||
def test_i_can_hash_password(password_manager):
|
||||
hashed_password = password_manager.hash_password("password")
|
||||
assert hashed_password is not None
|
||||
assert hashed_password != "password"
|
||||
|
||||
|
||||
def test_i_can_verify_password(password_manager):
|
||||
password = "password"
|
||||
hashed_password = password_manager.hash_password(password)
|
||||
assert password_manager.verify_password(password, hashed_password)
|
||||
|
||||
def test_i_cannot_verify_invalid_password(password_manager):
|
||||
password = "password"
|
||||
hashed_password = password_manager.hash_password(password)
|
||||
assert not password_manager.verify_password("invalid_password", hashed_password)
|
||||
@@ -6,9 +6,9 @@ from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
from jose import jwt
|
||||
|
||||
from src.my_auth.core.token import TokenManager
|
||||
from src.my_auth.exceptions import InvalidTokenError, ExpiredTokenError
|
||||
from src.my_auth.models.user import UserInDB # Assuming you have a fixture for this
|
||||
from myauth.core.token import TokenManager
|
||||
from myauth.exceptions import InvalidTokenError, ExpiredTokenError
|
||||
from myauth.models.user import UserInDB # Assuming you have a fixture for this
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -99,7 +99,7 @@ class TestTokenExpirationCalculations:
|
||||
"""Tests for token expiration date methods."""
|
||||
|
||||
# We patch datetime.now() to ensure stable calculations
|
||||
@patch('src.my_auth.core.token.datetime')
|
||||
@patch('myauth.core.token.datetime')
|
||||
def test_get_refresh_token_expiration(self, mock_datetime, token_manager: TokenManager):
|
||||
"""Should calculate refresh token expiration correctly."""
|
||||
|
||||
@@ -112,7 +112,7 @@ class TestTokenExpirationCalculations:
|
||||
|
||||
assert actual_exp == expected_exp
|
||||
|
||||
@patch('src.my_auth.core.token.datetime')
|
||||
@patch('myauth.core.token.datetime')
|
||||
def test_get_password_reset_token_expiration(self, mock_datetime, token_manager: TokenManager):
|
||||
"""Should calculate password reset token expiration correctly."""
|
||||
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
from uuid import uuid4
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from my_auth.models.token import TokenData
|
||||
from my_auth.models.user import UserCreate
|
||||
from my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
|
||||
from myauth.models.token import TokenData
|
||||
from myauth.models.user import UserCreate
|
||||
from myauth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from my_auth.models.token import TokenData
|
||||
from my_auth.persistence.sqlite import SQLiteTokenRepository
|
||||
from myauth.models.token import TokenData
|
||||
from myauth.persistence.sqlite import SQLiteTokenRepository
|
||||
|
||||
|
||||
def test_i_can_save_and_retrieve_token(token_repository: SQLiteTokenRepository,
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
# tests/persistence/test_sqlite_user.py
|
||||
|
||||
import pytest
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from my_auth.persistence.sqlite import SQLiteUserRepository
|
||||
from my_auth.models.user import UserCreate, UserUpdate
|
||||
from my_auth.exceptions import UserAlreadyExistsError, UserNotFoundError
|
||||
import pytest
|
||||
|
||||
from myauth.exceptions import UserAlreadyExistsError, UserNotFoundError
|
||||
from myauth.models.user import UserCreate, UserUpdate
|
||||
from myauth.persistence.sqlite import SQLiteUserRepository
|
||||
|
||||
|
||||
def test_i_can_create_and_retrieve_user_by_email(user_repository: SQLiteUserRepository,
|
||||
@@ -153,3 +153,138 @@ def test_i_cannot_retrieve_non_existent_user_by_email(user_repository: SQLiteUse
|
||||
|
||||
retrieved_user = user_repository.get_user_by_email("ghost@example.com")
|
||||
assert retrieved_user is None
|
||||
|
||||
|
||||
def test_i_can_list_users_with_pagination(user_repository: SQLiteUserRepository,
|
||||
test_user_hashed_password: str):
|
||||
"""Verifies that list_users returns paginated results correctly."""
|
||||
|
||||
# Create multiple users
|
||||
for i in range(5):
|
||||
user_data = UserCreate(
|
||||
email=f"user{i}@example.com",
|
||||
username=f"User{i}",
|
||||
password="#Password123",
|
||||
roles=["user"],
|
||||
user_settings={}
|
||||
)
|
||||
user_repository.create_user(user_data, test_user_hashed_password)
|
||||
|
||||
# Test: Get first 3 users
|
||||
users_page1 = user_repository.list_users(skip=0, limit=3)
|
||||
assert len(users_page1) == 3
|
||||
|
||||
# Test: Get next 2 users
|
||||
users_page2 = user_repository.list_users(skip=3, limit=3)
|
||||
assert len(users_page2) == 2
|
||||
|
||||
# Test: Verify no duplicates between pages
|
||||
page1_ids = {user.id for user in users_page1}
|
||||
page2_ids = {user.id for user in users_page2}
|
||||
assert len(page1_ids.intersection(page2_ids)) == 0
|
||||
|
||||
|
||||
def test_i_can_list_users_with_default_pagination(user_repository: SQLiteUserRepository,
|
||||
test_user_data_create: UserCreate,
|
||||
test_user_hashed_password: str):
|
||||
"""Verifies that list_users works with default parameters."""
|
||||
|
||||
# Create 2 users
|
||||
user_repository.create_user(test_user_data_create, test_user_hashed_password)
|
||||
|
||||
user_data2 = UserCreate(
|
||||
email="user2@example.com",
|
||||
username="User2",
|
||||
password="#Password123",
|
||||
roles=["user"],
|
||||
user_settings={}
|
||||
)
|
||||
user_repository.create_user(user_data2, test_user_hashed_password)
|
||||
|
||||
# Test: Default parameters (skip=0, limit=100)
|
||||
users = user_repository.list_users()
|
||||
assert len(users) == 2
|
||||
assert all(isinstance(user.created_at, datetime) for user in users)
|
||||
|
||||
|
||||
def test_i_get_empty_list_when_no_users_exist(user_repository: SQLiteUserRepository):
|
||||
"""Verifies that list_users returns an empty list when no users exist."""
|
||||
|
||||
users = user_repository.list_users()
|
||||
assert users == []
|
||||
assert isinstance(users, list)
|
||||
|
||||
|
||||
def test_i_can_skip_beyond_available_users(user_repository: SQLiteUserRepository,
|
||||
test_user_data_create: UserCreate,
|
||||
test_user_hashed_password: str):
|
||||
"""Verifies that skipping beyond available users returns an empty list."""
|
||||
|
||||
user_repository.create_user(test_user_data_create, test_user_hashed_password)
|
||||
|
||||
# Skip beyond the only user
|
||||
users = user_repository.list_users(skip=10, limit=10)
|
||||
assert users == []
|
||||
|
||||
|
||||
def test_i_can_count_users(user_repository: SQLiteUserRepository,
|
||||
test_user_hashed_password: str):
|
||||
"""Verifies that count_users returns the correct number of users."""
|
||||
|
||||
# Initial count should be 0
|
||||
assert user_repository.count_users() == 0
|
||||
|
||||
# Create first user
|
||||
user_data1 = UserCreate(
|
||||
email="user1@example.com",
|
||||
username="User1",
|
||||
password="#Password123",
|
||||
roles=["user"],
|
||||
user_settings={}
|
||||
)
|
||||
user_repository.create_user(user_data1, test_user_hashed_password)
|
||||
assert user_repository.count_users() == 1
|
||||
|
||||
# Create second user
|
||||
user_data2 = UserCreate(
|
||||
email="user2@example.com",
|
||||
username="User2",
|
||||
password="#Password123",
|
||||
roles=["user"],
|
||||
user_settings={}
|
||||
)
|
||||
user_repository.create_user(user_data2, test_user_hashed_password)
|
||||
assert user_repository.count_users() == 2
|
||||
|
||||
|
||||
def test_i_get_zero_count_when_no_users_exist(user_repository: SQLiteUserRepository):
|
||||
"""Verifies that count_users returns 0 when the database is empty."""
|
||||
|
||||
count = user_repository.count_users()
|
||||
assert count == 0
|
||||
assert isinstance(count, int)
|
||||
|
||||
|
||||
def test_list_users_returns_correct_user_structure(user_repository: SQLiteUserRepository,
|
||||
test_user_data_create: UserCreate,
|
||||
test_user_hashed_password: str):
|
||||
"""Verifies that list_users returns UserInDB objects with all fields."""
|
||||
|
||||
user_repository.create_user(test_user_data_create, test_user_hashed_password)
|
||||
|
||||
users = user_repository.list_users()
|
||||
|
||||
assert len(users) == 1
|
||||
user = users[0]
|
||||
|
||||
# Verify all fields are present and correct type
|
||||
assert user.id is not None
|
||||
assert user.email == test_user_data_create.email
|
||||
assert user.username == test_user_data_create.username
|
||||
assert user.hashed_password == test_user_hashed_password
|
||||
assert isinstance(user.roles, list)
|
||||
assert isinstance(user.user_settings, dict)
|
||||
assert isinstance(user.is_verified, bool)
|
||||
assert isinstance(user.is_active, bool)
|
||||
assert isinstance(user.created_at, datetime)
|
||||
assert isinstance(user.updated_at, datetime)
|
||||
|
||||
Reference in New Issue
Block a user