Updated README.md

This commit is contained in:
2025-10-18 22:42:22 +02:00
parent 79a31ecf40
commit ece8af0678
24 changed files with 2782 additions and 755 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

11
.idea/MyAuth.iml generated Normal file
View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" />
</content>
<orderEntry type="jdk" jdkName="Python 3.12.3 WSL (Ubuntu-24.04): (/home/kodjo/.virtualenvs/MyAuth/bin/python)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -0,0 +1,14 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="PyInitNewSignatureInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<list>
<option value="bson" />
</list>
</option>
</inspection_tool>
</profile>
</component>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12.3 WSL (Ubuntu-24.04): (/home/kodjo/.virtualenvs/MyAuth/bin/python)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12.3 WSL (Ubuntu-24.04): (/home/kodjo/.virtualenvs/MyAuth/bin/python)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/MyAuth.iml" filepath="$PROJECT_DIR$/.idea/MyAuth.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

21
LICENCE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Kodjo Sossouvi
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

285
README-dev.md Normal file
View File

@@ -0,0 +1,285 @@
# MyAuth Module
A reusable, modular authentication system for FastAPI applications with pluggable database backends.
## Overview
This module provides a complete authentication solution designed to be deployed on internal PyPI servers and reused across multiple projects. It handles user registration, login/logout, token management, email verification, and password reset functionality.
## Features
### Core Authentication
- ✅ User registration with email and username
- ✅ Login/logout with email-based authentication
- ✅ JWT-based access tokens (30 minutes validity)
- ✅ Opaque refresh tokens stored in database (7 days validity)
- ✅ Password hashing with configurable bcrypt rounds
### User Management
- ✅ Email verification with JWT tokens
- ✅ Password reset with secure random tokens (15 minutes validity)
- ✅ User roles management (flexible, no predefined roles)
- ✅ User settings storage (dict field)
- ✅ Account activation/deactivation
### Password Security
- ✅ Strict password validation (via Pydantic):
- Minimum 8 characters
- At least 1 uppercase letter
- At least 1 lowercase letter
- At least 1 digit
- At least 1 special character
### Architecture
- ✅ Abstract base classes for database persistence
- ✅ Multiple database implementations: MongoDB, SQLite, PostgreSQL
- ✅ Abstract email service interface with optional SMTP implementation
- ✅ Custom exceptions with FastAPI integration
- ✅ Synchronous implementation
- ✅ Ready-to-use FastAPI router with `/auth` prefix
## Project Structure
```
├──src
│ my_auth/
│ ├── __init__.py
│ ├── models/ # Pydantic models
│ │ ├── user.py # User model with roles and settings
│ │ ├── token.py # Token models (access, refresh, reset)
│ │ └── email_verification.py # Email verification models
│ ├── core/ # Business logic
│ │ ├── auth.py # Authentication service
│ │ ├── password.py # Password hashing/verification
│ │ └── token.py # Token generation/validation
│ ├── persistence/ # Database abstraction
│ │ ├── base.py # Abstract base classes
│ │ ├── mongodb.py # MongoDB implementation
│ │ ├── sqlite.py # SQLite implementation
│ │ └── postgresql.py # PostgreSQL implementation
│ ├── api/ # FastAPI routes
│ │ └── routes.py # All authentication endpoints
│ ├── email/ # Email service
│ │ ├── base.py # Abstract interface
│ │ └── smtp.py # SMTP implementation (optional)
│ ├── exceptions.py # Custom exceptions
│ └── config.py # Configuration classes
├── tests
```
## User Model
```python
class User:
id: str # Unique identifier
email: str # Unique, required
username: str # Required, non-unique
hashed_password: str # Bcrypt hashed
roles: list[str] # Free-form roles, no defaults
user_settings: dict # Custom user settings
is_verified: bool # Email verification status
is_active: bool # Account active status
created_at: datetime
updated_at: datetime
```
## Token Management
### Token Types
The module uses a unified tokens collection with a discriminator field:
1. **Access Token (JWT)**: 30 minutes validity, stateless
2. **Refresh Token (Opaque)**: 7 days validity, stored in DB
3. **Password Reset Token (Random)**: 15 minutes validity, stored in DB
4. **Email Verification Token (JWT)**: Stateless, no DB storage
### Token Storage
All tokens requiring storage (refresh and password reset) are kept in a single `tokens` collection/table with a `token_type` discriminator field.
## API Endpoints
The module exposes a pre-configured FastAPI router with the following endpoints:
```
POST /auth/register # User registration
POST /auth/login # Login (email + password)
POST /auth/logout # Logout (revokes refresh token)
POST /auth/refresh # Refresh access token
POST /auth/password-reset-request # Request password reset
POST /auth/password-reset # Reset password with token
POST /auth/verify-email-request # Request email verification
POST /auth/verify-email # Verify email with token
GET /auth/me # Get current user info
```
## Installation
### Dependencies
**Core dependencies:**
```bash
pip install fastapi pydantic pydantic-settings python-jose[cryptography] passlib[bcrypt] python-multipart
```
**Database-specific dependencies:**
- MongoDB: `pip install pymongo`
- SQLite: Built-in (no additional dependency)
- PostgreSQL: `pip install psycopg2-binary`
**Optional email dependency:**
- SMTP: `pip install secure-smtplib`
## Usage
### Basic Setup
```python
from fastapi import FastAPI
from auth_module import AuthService
from auth_module.persistence.mongodb import MongoUserRepository, MongoTokenRepository
from auth_module.api import auth_router
# Initialize repositories
user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/mydb")
token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/mydb")
# Initialize auth service
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
jwt_secret="your-secret-key-here",
access_token_expire_minutes=30,
refresh_token_expire_days=7,
password_reset_token_expire_minutes=15,
password_hash_rounds=12
)
# Create FastAPI app and include auth router
app = FastAPI()
app.include_router(auth_router) # Mounts at /auth prefix
```
### Using Different Databases
#### SQLite
```python
from auth_module.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
user_repo = SQLiteUserRepository(db_path="./auth.db")
token_repo = SQLiteTokenRepository(db_path="./auth.db")
```
#### PostgreSQL
```python
from auth_module.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
user_repo = PostgreSQLUserRepository(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret"
)
token_repo = PostgreSQLTokenRepository(...)
```
### Email Service Configuration
```python
from auth_module.email.smtp import SMTPEmailService
email_service = SMTPEmailService(
host="smtp.gmail.com",
port=587,
username="your-email@gmail.com",
password="your-app-password",
use_tls=True
)
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
email_service=email_service, # Optional
...
)
```
### Custom Email Service
Implement your own email service by extending the abstract base class:
```python
from auth_module.email.base import EmailService
class CustomEmailService(EmailService):
def send_verification_email(self, email: str, token: str) -> None:
# Your implementation (SendGrid, AWS SES, etc.)
pass
def send_password_reset_email(self, email: str, token: str) -> None:
# Your implementation
pass
```
## Error Handling
The module uses custom exceptions that are automatically converted to appropriate HTTP responses:
- `InvalidCredentialsError` → 401 Unauthorized
- `UserAlreadyExistsError` → 409 Conflict
- `UserNotFoundError` → 404 Not Found
- `InvalidTokenError` → 401 Unauthorized
- `RevokedTokenError` → 401 Unauthorized
- `ExpiredTokenError` → 401 Unauthorized
- `EmailNotVerifiedError` → 403 Forbidden
- `AccountDisabledError` → 403 Forbidden
## Configuration Options
```python
AuthService(
user_repository: UserRepository, # Required
token_repository: TokenRepository, # Required
jwt_secret: str, # Required
jwt_algorithm: str = "HS256", # Optional
access_token_expire_minutes: int = 30, # Optional
refresh_token_expire_days: int = 7, # Optional
password_reset_token_expire_minutes: int = 15, # Optional
password_hash_rounds: int = 12, # Optional (bcrypt cost)
email_service: EmailService = None # Optional
)
```
## Testing
The module is fully testable with pytest. Test fixtures are provided for each database implementation.
```bash
pytest tests/
```
## Security Considerations
- Passwords are hashed using bcrypt with configurable rounds
- JWT tokens are signed with HS256 (configurable)
- Refresh tokens are opaque and stored securely
- Password reset tokens are single-use and expire after 15 minutes
- Email verification tokens are stateless JWT
- Rate limiting should be implemented at the application level
- HTTPS should be enforced by the application
## Future Enhancements (Not Included)
- Multi-factor authentication (2FA/MFA)
- Rate limiting on login attempts
- OAuth2 provider integration
- Session management (multiple device tracking)
- Account lockout after failed attempts
## License
[Your License Here]
## Contributing
[Your Contributing Guidelines Here]

587
README.md
View File

@@ -1,104 +1,281 @@
# MyAuth Module
A reusable, modular authentication system for FastAPI applications with pluggable database backends.
A reusable, modular authentication system for FastAPI applications, designed for deployment on an internal PyPI server.
## Overview
This module provides a complete authentication solution designed to be deployed on internal PyPI servers and reused across multiple projects. It handles user registration, login/logout, token management, email verification, and password reset functionality.
MyAuth provides a complete, 'out-of-the-box' authentication solution for your FastAPI projects. It's designed to be
installed as a dependency and configured in just a few lines, letting you focus on your business logic instead of user
and token management.
This module handles registration, login/logout, email verification, password reset, and token management (JWT & Refresh)
with pluggable database backends.
## Features
### Core Authentication
- ✅ User registration with email and username
- ✅ Login/logout with email-based authentication
- ✅ JWT-based access tokens (30 minutes validity)
- ✅ Opaque refresh tokens stored in database (7 days validity)
- ✅ Password hashing with configurable bcrypt rounds
* **Complete Authentication:** User registration, email-based login, logout.
* **Token Management:**
* **JWT Access Tokens** (default: 30 min validity).
* **Opaque Refresh Tokens** securely stored in the database (default: 7 days validity).
* **User Lifecycle:**
* Email verification (via JWT token).
* Secure password reset (via secure token stored in DB).
* Account activation / deactivation.
* **Security:**
* Password hashing with `bcrypt` (configurable rounds).
* Strict password validation (uppercase, lowercase, digit, special character).
* **Flexible Architecture:**
* **Pluggable Backends:** Supports MongoDB, PostgreSQL, and SQLite out of the box.
* **Abstract Email Service:** Use the built-in SMTP implementation or plug in your own (e.g., SendGrid, AWS SES).
* **FastAPI Integration:**
* A ready-to-use `APIRouter` that exposes all necessary endpoints under the `/auth` prefix.
* Automatic handling of custom exceptions into correct HTTP responses.
### User Management
- ✅ Email verification with JWT tokens
- ✅ Password reset with secure random tokens (15 minutes validity)
- ✅ User roles management (flexible, no predefined roles)
- ✅ User settings storage (dict field)
- ✅ Account activation/deactivation
---
### Password Security
- ✅ Strict password validation (via Pydantic):
- Minimum 8 characters
- At least 1 uppercase letter
- At least 1 lowercase letter
- At least 1 digit
- At least 1 special character
## Installation
### Architecture
- ✅ Abstract base classes for database persistence
- ✅ Multiple database implementations: MongoDB, SQLite, PostgreSQL
- ✅ Abstract email service interface with optional SMTP implementation
- ✅ Custom exceptions with FastAPI integration
- ✅ Synchronous implementation
- ✅ Ready-to-use FastAPI router with `/auth` prefix
Install the base module and choose your "extras" based on your infrastructure.
## Project Structure
```bash
# Base installation (core logic, no DB or email drivers)
pip install myauth --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
```
├──src
│ my_auth/
│ ├── __init__.py
│ ├── models/ # Pydantic models
│ │ ├── user.py # User model with roles and settings
│ │ ├── token.py # Token models (access, refresh, reset)
│ │ └── email_verification.py # Email verification models
│ ├── core/ # Business logic
│ │ ├── auth.py # Authentication service
│ │ ├── password.py # Password hashing/verification
│ │ └── token.py # Token generation/validation
│ ├── persistence/ # Database abstraction
│ │ ├── base.py # Abstract base classes
│ │ ├── mongodb.py # MongoDB implementation
│ │ ├── sqlite.py # SQLite implementation
│ │ └── postgresql.py # PostgreSQL implementation
│ ├── api/ # FastAPI routes
│ │ └── routes.py # All authentication endpoints
│ ├── email/ # Email service
│ │ ├── base.py # Abstract interface
│ │ └── smtp.py # SMTP implementation (optional)
│ ├── exceptions.py # Custom exceptions
│ └── config.py # Configuration classes
├── tests
# --- Installation with Extras (Recommended) ---
# To use with MongoDB
pip install "myauth[mongodb]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
# To use with PostgreSQL
pip install "myauth[postgresql]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
# To use the built-in SMTP email service
pip install "myauth[email]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
# To combine extras (e.g., PostgreSQL + Email)
pip install "myauth[postgresql,email]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
```
## User Model
### Core Dependencies
```python
class User:
id: str # Unique identifier
email: str # Unique, required
username: str # Required, non-unique
hashed_password: str # Bcrypt hashed
roles: list[str] # Free-form roles, no defaults
user_settings: dict # Custom user settings
is_verified: bool # Email verification status
is_active: bool # Account active status
created_at: datetime
updated_at: datetime
The core module requires: fastapi, pydantic, pydantic-settings, python-jose[cryptography], passlib[bcrypt],
python-multipart
## Quick Start
To get started, choose one of the database options below. Each example is complete and self-contained: it sets up the
FastAPI app, the authentication service, and the related routes.
Copy-paste the example that matches your infrastructure into your main.py.
### Option 1: Quick Start with MongoDB
This example configures myauth to use MongoDB as its backend.
```Python
from fastapi import FastAPI
from my_auth import AuthService
from my_auth.api import auth_router
from my_auth.persistence.mongodb import MongoUserRepository, MongoTokenRepository
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for MongoDB
# Make sure your connection string is correct
user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/myappdb")
token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/myappdb")
# 3. Configure the Authentication Service
# IMPORTANT: Change this to a long, random, secret string
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
# email_service will be added in the next step
)
# 4. Include the authentication routes
# Endpoints like /auth/login, /auth/register are now active
app.include_router(auth_router)
@app.get("/")
def read_root():
return {"message": "Application running with MyAuth (MongoDB)"}
```
## Token Management
### Option 2: Quick Start with PostgreSQL
### Token Types
The module uses a unified tokens collection with a discriminator field:
This example configures myauth to use PostgreSQL as its backend.
1. **Access Token (JWT)**: 30 minutes validity, stateless
2. **Refresh Token (Opaque)**: 7 days validity, stored in DB
3. **Password Reset Token (Random)**: 15 minutes validity, stored in DB
4. **Email Verification Token (JWT)**: Stateless, no DB storage
```Python
### Token Storage
All tokens requiring storage (refresh and password reset) are kept in a single `tokens` collection/table with a `token_type` discriminator field.
from fastapi import FastAPI
from my_auth import AuthService
from my_auth.api import auth_router
from my_auth.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
## API Endpoints
# 1. Initialize FastAPI app
app = FastAPI()
The module exposes a pre-configured FastAPI router with the following endpoints:
# 2. Configure repositories for PostgreSQL
# Update with your database credentials
db_config = {
"host": "localhost",
"port": 5432,
"database": "mydb",
"user": "postgres",
"password": "secret"
}
user_repo = PostgreSQLUserRepository(**db_config)
token_repo = PostgreSQLTokenRepository(**db_config)
# 3. Configure the Authentication Service
# IMPORTANT: Change this to a long, random, secret string
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
# email_service will be added in the next step
)
# 4. Include the authentication routes
# Endpoints like /auth/login, /auth/register are now active
app.include_router(auth_router)
@app.get("/")
def read_root():
return {"message": "Application running with MyAuth (PostgreSQL)"}
```
### Option 3: Quick Start with SQLite
This example configures myauth to use SQLite, which is ideal for development or small applications.
```Python
from fastapi import FastAPI
from my_auth import AuthService
from my_auth.api import auth_router
from my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
# 1. Initialize FastAPI app
app = FastAPI()
# 2. Configure repositories for SQLite
# This will create/use a file named 'auth.db' in the current directory
db_path = "./auth.db"
user_repo = SQLiteUserRepository(db_path=db_path)
token_repo = SQLiteTokenRepository(db_path=db_path)
# 3. Configure the Authentication Service
# IMPORTANT: Change this to a long, random, secret string
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
# email_service will be added in the next step
)
# 4. Include the authentication routes
# Endpoints like /auth/login, /auth/register are now active
app.include_router(auth_router)
@app.get("/")
def read_root():
return {"message": "Application running with MyAuth (SQLite)"}
```
## Next Step: Configure the Email Service
For email verification (/auth/verify-email) and password resets (/auth/password-reset) to work, you must provide an
email service to the AuthService.
Simply modify the AuthService initialization from the Quick Start step you chose.
### Option 1: Use the Built-in SMTP Service
This is the simplest option if you have an SMTP server (like Gmail, SendGrid SMTP, etc.). Remember to install the extra:
pip install "myauth[email]"
```Python
# ... (keep your app and repository config from the Quick Start)
from my_auth.email.smtp import SMTPEmailService
# 1. Configure the email service
email_service = SMTPEmailService(
host="smtp.gmail.com",
port=587,
username="your-email@gmail.com",
password="your-app-password", # Use an 'App Password' for Gmail
use_tls=True
)
# 2. Pass the email service to AuthService
auth_service = AuthService(
user_repository=user_repo, # From Quick Start
token_repository=token_repo, # From Quick Start
email_service=email_service, # Add this line
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
)
# ... (keep 'app.include_router(auth_router)')
```
### Option 2: Create a Custom Email Service
If you use a third-party service (like AWS SES, Mailgun) that requires an API, you can implement your own.
```Python
# ... (keep your app and repository config from the Quick Start)
from my_auth.email.base import EmailService
# 1. Implement your custom email service
class CustomEmailService(EmailService):
def __init__(self, api_key: str):
# self.api_key = api_key
# self.client = ThirdPartyClient(api_key=api_key)
print("Custom Email Service Initialized")
def send_verification_email(self, email: str, token: str) -> None:
# Your custom logic to send an email via an API
# verification_link = f"http://localhost:8000/verify?token={token}"
print(f"Sending VERIFICATION to {email} with token {token}")
pass
def send_password_reset_email(self, email: str, token: str) -> None:
# Your custom logic to send an email via an API
# reset_link = f"http://localhost:8000/reset-password?token={token}"
print(f"Sending PASSWORD RESET to {email} with token {token}")
pass
# 2. Initialize your custom service
email_service = CustomEmailService(api_key="YOUR_API_KEY_HERE")
# 3. Pass your custom service to AuthService
auth_service = AuthService(
user_repository=user_repo, # From Quick Start
token_repository=token_repo, # From Quick Start
email_service=email_service, # Add this line
jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
)
# ... (keep 'app.include_router(auth_router)')
```
## API Endpoints Reference
The `auth_router` exposes the following endpoints under the `/auth` prefix:
```
POST /auth/register # User registration
@@ -112,174 +289,124 @@ POST /auth/verify-email # Verify email with token
GET /auth/me # Get current user info
```
## Installation
### Error Handling
### Dependencies
The module uses custom exceptions that are automatically converted to the appropriate HTTP responses by FastAPI:
**Core dependencies:**
```bash
pip install fastapi pydantic pydantic-settings python-jose[cryptography] passlib[bcrypt] python-multipart
```
**Database-specific dependencies:**
- MongoDB: `pip install pymongo`
- SQLite: Built-in (no additional dependency)
- PostgreSQL: `pip install psycopg2-binary`
**Optional email dependency:**
- SMTP: `pip install secure-smtplib`
## Usage
### Basic Setup
```python
from fastapi import FastAPI
from auth_module import AuthService
from auth_module.persistence.mongodb import MongoUserRepository, MongoTokenRepository
from auth_module.api import auth_router
# Initialize repositories
user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/mydb")
token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/mydb")
# Initialize auth service
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
jwt_secret="your-secret-key-here",
access_token_expire_minutes=30,
refresh_token_expire_days=7,
password_reset_token_expire_minutes=15,
password_hash_rounds=12
)
# Create FastAPI app and include auth router
app = FastAPI()
app.include_router(auth_router) # Mounts at /auth prefix
```
### Using Different Databases
#### SQLite
```python
from auth_module.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
user_repo = SQLiteUserRepository(db_path="./auth.db")
token_repo = SQLiteTokenRepository(db_path="./auth.db")
```
#### PostgreSQL
```python
from auth_module.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
user_repo = PostgreSQLUserRepository(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret"
)
token_repo = PostgreSQLTokenRepository(...)
```
### Email Service Configuration
```python
from auth_module.email.smtp import SMTPEmailService
email_service = SMTPEmailService(
host="smtp.gmail.com",
port=587,
username="your-email@gmail.com",
password="your-app-password",
use_tls=True
)
auth_service = AuthService(
user_repository=user_repo,
token_repository=token_repo,
email_service=email_service, # Optional
...
)
```
### Custom Email Service
Implement your own email service by extending the abstract base class:
```python
from auth_module.email.base import EmailService
class CustomEmailService(EmailService):
def send_verification_email(self, email: str, token: str) -> None:
# Your implementation (SendGrid, AWS SES, etc.)
pass
def send_password_reset_email(self, email: str, token: str) -> None:
# Your implementation
pass
```
## Error Handling
The module uses custom exceptions that are automatically converted to appropriate HTTP responses:
- `InvalidCredentialsError` → 401 Unauthorized
- `UserAlreadyExistsError` → 409 Conflict
- `UserNotFoundError` → 404 Not Found
- `InvalidTokenError` → 401 Unauthorized
- `RevokedTokenError` → 401 Unauthorized
- `ExpiredTokenError` → 401 Unauthorized
- `EmailNotVerifiedError` → 403 Forbidden
- `AccountDisabledError` → 403 Forbidden
* `InvalidCredentialsError`**401 Unauthorized**
* `UserAlreadyExistsError`**409 Conflict**
* `UserNotFoundError`**404 Not Found**
* `InvalidTokenError`**401 Unauthorized**
* `RevokedTokenError`**401 Unauthorized**
* `ExpiredTokenError`**401 Unauthorized**
* `EmailNotVerifiedError`**403 Forbidden (on login attempt)**
* `AccountDisabledError`**403 Forbidden (on login attempt)**
## Configuration Options
```python
All options are passed during the `AuthService` initialization:
```Python
AuthService(
user_repository: UserRepository, # Required
token_repository: TokenRepository, # Required
jwt_secret: str, # Required
jwt_algorithm: str = "HS256", # Optional
access_token_expire_minutes: int = 30, # Optional
refresh_token_expire_days: int = 7, # Optional
password_reset_token_expire_minutes: int = 15, # Optional
password_hash_rounds: int = 12, # Optional (bcrypt cost)
email_service: EmailService = None # Optional
user_repository: UserRepository, # Required
token_repository: TokenRepository, # Required
jwt_secret: str, # Required
jwt_algorithm: str = "HS256", # Optional
access_token_expire_minutes: int = 30, # Optional
refresh_token_expire_days: int = 7, # Optional
password_reset_token_expire_minutes: int = 15, # Optional
password_hash_rounds: int = 12, # Optional (bcrypt cost)
email_service: EmailService = None # Optional
)
```
## Testing
## Appendix (Contributor & Development Details)
The module is fully testable with pytest. Test fixtures are provided for each database implementation.
<details> <summary><b> Appendix A: Project Structure (src/my_auth)</b></summary>
```
my_auth/
├── __init__.py
├── models/ # Pydantic models (User, Token...)
│ ├── user.py
│ ├── token.py
│ └── email_verification.py
├── core/ # Business logic (Auth, Password, Token services)
│ ├── auth.py
│ ├── password.py
│ └── token.py
├── persistence/ # Database abstraction
│ ├── base.py # Abstract base classes
│ ├── mongodb.py
│ ├── sqlite.py
│ └── postgresql.py
├── api/ # FastAPI routes
│ └── routes.py
├── email/ # Email service
│ ├── base.py # Abstract interface
│ └── smtp.py
├── exceptions.py # Custom HTTP exceptions
└── config.py # Configuration classes (if any)
```
</details>
<details> <summary><b> Appendix B: Internal Data Models </b></summary>
### User Model
This is the internal Pydantic model used by the service.
```Python
class User:
id: str # Unique identifier
email: str # Unique, required
username: str # Required, non-unique
hashed_password: str # Bcrypt hashed
roles: list[str] # Free-form roles, no defaults
user_settings: dict # Custom user settings
is_verified: bool # Email verification status
is_active: bool # Account active status
created_at: datetime
updated_at: datetime
```
### Token Management
The module uses a unified `tokens` collection/table with a `token_type` discriminator field for all persistent tokens.
1. Access Token (JWT): 30 minutes validity, stateless, not stored in DB.
1. Refresh Token (Opaque): 7 days validity, stored in DB.
1. Password Reset Token (Random): 15 minutes validity, stored in DB.
1. Email Verification Token (JWT): Stateless, not stored in DB.
</details>
<details> <summary><b> Appendix C: Testing & Security </b></summary>
### Testing
The module is testable with `pytest`.
```Bash
```bash
pytest tests/
```
## Security Considerations
### Security Considerations
- Passwords are hashed using bcrypt with configurable rounds
- JWT tokens are signed with HS256 (configurable)
- Refresh tokens are opaque and stored securely
- Password reset tokens are single-use and expire after 15 minutes
- Email verification tokens are stateless JWT
- Rate limiting should be implemented at the application level
- HTTPS should be enforced by the application
* Passwords are hashed using bcrypt with configurable rounds.
* JWT tokens are signed with HS256 (configurable).
* Refresh tokens are opaque and stored securely in the database.
* Password reset tokens are single-use, opaque, and expire quickly (15 min).
* Rate Limiting is not included and should be implemented at the application level (e.g., using `slowapi`).
* HTTPS should be enforced by the production web server (e.g., Nginx, Traefik).
## Future Enhancements (Not Included)
- Multi-factor authentication (2FA/MFA)
- Rate limiting on login attempts
- OAuth2 provider integration
- Session management (multiple device tracking)
- Account lockout after failed attempts
</details>
## License
[Your License Here]
## Contributing
[Your Contributing Guidelines Here]
MIT

83
pyproject.toml Normal file
View File

@@ -0,0 +1,83 @@
# File: pyproject.toml
[build-system]
# Define the build system requirements
requires = ["setuptools>=80.9", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "myauth"
version = "0.1.0" # Start with an initial version
description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends."
readme = "README.md"
authors = [
{ name = "Kodjo Sossouvi", email = "kodjo.sossouvi@gmail.com" },
]
maintainers = [
{ name = "Kodjo Sossouvi", email = "kodjo.sossouvi@gmail.com" }
]
license = "MIT"
requires-python = ">=3.8"
classifiers = [
"Operating System :: OS Independent",
"Framework :: FastAPI",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Auth",
]
# -------------------------------------------------------------------
# Core dependencies from your README
# Note: 'requirements.txt' is for development, this is for the package
# -------------------------------------------------------------------
dependencies = [
"fastapi",
"pydantic",
"pydantic-settings",
"python-jose[cryptography]",
"passlib[bcrypt]",
"python-multipart"
]
[project.urls]
# Optional: Link to your internal repository or documentation
Homepage = "https://gitea.sheerka.synology.me/kodjo/MyAuth"
Documentation = "https://gitea.sheerka.synology.me/kodjo/MyAuth#readme"
Repository = "https://gitea.sheerka.synology.me/kodjo/MyAuth"
Issues = "https://gitea.sheerka.synology.me/kodjo/MyAuth/issues"
# -------------------------------------------------------------------
# Optional dependencies ("extras")
# This allows users to install only what they need, e.g.:
# pip install myauth[mongodb,email]
# -------------------------------------------------------------------
[project.optional-dependencies]
mongodb = [
"pymongo"
]
postgresql = [
"psycopg2-binary"
]
email = [
"secure-smtplib"
]
# For development and testing (from your requirements.txt)
dev = [
"pytest",
"httpx",
"anyio",
"email-validator",
"python-dotenv"
]
# -------------------------------------------------------------------
# Setuptools configuration
# This section tells the build system where to find your package code
# -------------------------------------------------------------------
[tool.setuptools]
package-dir = {"" = "src"}
packages = ["my_auth"]

34
requirements.txt Normal file
View File

@@ -0,0 +1,34 @@
annotated-types==0.7.0
anyio==4.11.0
bcrypt==5.0.0
certifi==2025.10.5
cffi==2.0.0
cryptography==46.0.3
dnspython==2.8.0
ecdsa==0.19.1
email-validator==2.3.0
fastapi==0.119.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
iniconfig==2.1.0
packaging==25.0
passlib==1.7.4
pluggy==1.6.0
pyasn1==0.6.1
pycparser==2.23
pydantic==2.12.3
pydantic-settings==2.11.0
pydantic_core==2.41.4
Pygments==2.19.2
pytest==8.4.2
python-dotenv==1.1.1
python-jose==3.5.0
python-multipart==0.0.20
rsa==4.9.1
six==1.17.0
sniffio==1.3.1
starlette==0.48.0
typing-inspection==0.4.2
typing_extensions==4.15.0

View File

379
src/my_auth/api/routes.py Normal file
View File

@@ -0,0 +1,379 @@
"""
FastAPI routes for authentication module.
This module provides ready-to-use FastAPI routes for all authentication
operations. Routes are organized in an APIRouter with /auth prefix.
"""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from ..core.auth import AuthService
from ..exceptions import AuthError
from ..models.email_verification import (
EmailVerificationRequest,
PasswordResetRequest,
PasswordResetConfirm
)
from ..models.token import AccessTokenResponse, RefreshTokenRequest
from ..models.user import UserCreate, UserResponse
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def create_auth_router(auth_service: AuthService) -> APIRouter:
"""
Create and configure the authentication router.
This factory function creates an APIRouter with all authentication
endpoints configured. The router includes automatic exception handling
for authentication errors.
Args:
auth_service: Configured authentication service instance.
Returns:
Configured APIRouter ready to be included in a FastAPI app.
Example:
>>> from fastapi import FastAPI
>>> from auth_module.api.routes import create_auth_router
>>>
>>> app = FastAPI()
>>> auth_router = create_auth_router(auth_service)
>>> app.include_router(auth_router)
"""
router = APIRouter(prefix="/auth", tags=["authentication"])
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserResponse:
"""
Dependency to extract and validate the current user from access token.
This dependency can be used in any route that requires authentication.
It extracts the Bearer token from the Authorization header and validates it.
Args:
token: JWT access token from Authorization header.
Returns:
The current authenticated user (as UserResponse).
Raises:
HTTPException: 401 if token is invalid or expired.
Example:
>>> @app.get("/protected")
>>> def protected_route(user: UserResponse = Depends(get_current_user)):
>>> return {"user_id": user.id}
"""
try:
user = auth_service.get_current_user(token)
return UserResponse(
id=user.id,
email=user.email,
username=user.username,
roles=user.roles,
user_settings=user.user_settings,
created_at=user.created_at,
updated_at=user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Register a new user",
description="Create a new user account with email, username, and password."
)
def register(user_data: UserCreate) -> UserResponse:
"""
Register a new user.
Creates a new user account with the provided credentials. The password
is automatically hashed before storage. Email verification is optional
and the account is created with is_verified=False.
Args:
user_data: User registration data including email, username, and password.
Returns:
The created user information (without password).
Raises:
HTTPException: 409 if email already exists.
HTTPException: 422 if validation fails (password strength, etc.).
"""
try:
user = auth_service.register(user_data)
return UserResponse(
id=user.id,
email=user.email,
username=user.username,
roles=user.roles,
user_settings=user.user_settings,
created_at=user.created_at,
updated_at=user.updated_at
)
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/login",
response_model=AccessTokenResponse,
summary="Login with email and password",
description="Authenticate a user and receive access and refresh tokens."
)
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> AccessTokenResponse:
"""
Authenticate a user and generate tokens.
This endpoint accepts form data with username (which should contain the email)
and password. It returns both access and refresh tokens upon successful authentication.
Note: OAuth2PasswordRequestForm uses 'username' field, but we treat it as email.
Args:
form_data: OAuth2 form with username (email) and password fields.
Returns:
Access token (JWT) and refresh token with token_type="bearer".
Raises:
HTTPException: 401 if credentials are invalid.
HTTPException: 403 if account is disabled.
"""
try:
# OAuth2PasswordRequestForm uses 'username' but we treat it as email
user, tokens = auth_service.login(form_data.username, form_data.password)
return tokens
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/logout",
status_code=status.HTTP_204_NO_CONTENT,
summary="Logout user",
description="Revoke the refresh token to logout the user."
)
def logout(request: RefreshTokenRequest) -> None:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new access tokens.
The current access token remains valid until it expires naturally (30 minutes).
Args:
request: Request body containing the refresh token to revoke.
Returns:
204 No Content on success.
"""
auth_service.logout(request.refresh_token)
return None
@router.post(
"/refresh",
response_model=AccessTokenResponse,
summary="Refresh access token",
description="Exchange a refresh token for new access and refresh tokens."
)
def refresh_token(request: RefreshTokenRequest) -> AccessTokenResponse:
"""
Obtain a new access token using a refresh token.
This endpoint allows clients to obtain a new access token without
requiring the user to re-enter their password. The old refresh token
is revoked and a new one is issued.
Args:
request: Request body containing the refresh token.
Returns:
New access token and refresh token.
Raises:
HTTPException: 401 if refresh token is invalid, expired, or revoked.
"""
try:
tokens = auth_service.refresh_access_token(request.refresh_token)
return tokens
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/password-reset-request",
status_code=status.HTTP_200_OK,
summary="Request password reset",
description="Generate a password reset token and return it (to be sent via email)."
)
def request_password_reset(request: PasswordResetRequest) -> dict:
"""
Request a password reset token.
This endpoint generates a secure token that can be used to reset the password.
In production, this token should be sent via email. For this module, the token
is returned in the response so the consuming application can handle email delivery.
Args:
request: Request body containing the email address.
Returns:
Dictionary with the reset token and a message.
Raises:
HTTPException: 404 if email is not registered.
"""
try:
token = auth_service.request_password_reset(request.email)
return {
"message": "Password reset token generated",
"token": token
}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/password-reset",
status_code=status.HTTP_200_OK,
summary="Reset password with token",
description="Reset user password using a valid reset token."
)
def reset_password(request: PasswordResetConfirm) -> dict:
"""
Reset a user's password using a reset token.
This endpoint validates the reset token and updates the user's password.
All existing refresh tokens are revoked for security.
Args:
request: Request body containing the reset token and new password.
Returns:
Success message.
Raises:
HTTPException: 401 if token is invalid, expired, or already used.
HTTPException: 422 if new password doesn't meet requirements.
"""
try:
auth_service.reset_password(request.token, request.new_password)
return {"message": "Password reset successfully"}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.post(
"/verify-email-request",
status_code=status.HTTP_200_OK,
summary="Request email verification",
description="Generate an email verification token and return it (to be sent via email)."
)
def request_email_verification(request: EmailVerificationRequest) -> dict:
"""
Request an email verification token.
This endpoint generates a JWT token for email verification. In production,
this token should be sent via email with a verification link. For this module,
the token is returned in the response so the consuming application can handle
email delivery.
Args:
request: Request body containing the email address.
Returns:
Dictionary with the verification token and a message.
Raises:
HTTPException: 404 if email is not registered.
"""
try:
token = auth_service.request_email_verification(request.email)
return {
"message": "Email verification token generated",
"token": token
}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.get(
"/verify-email",
status_code=status.HTTP_200_OK,
summary="Verify email with token",
description="Verify user email using a verification token from query parameter."
)
def verify_email(token: str) -> dict:
"""
Verify a user's email address.
This endpoint validates the verification token and marks the user's
email as verified. It uses a query parameter so it can be easily
accessed via a link in an email.
Args:
token: Email verification token (JWT) from query parameter.
Returns:
Success message.
Raises:
HTTPException: 401 if token is invalid or expired.
"""
try:
auth_service.verify_email(token)
return {"message": "Email verified successfully"}
except AuthError as e:
raise HTTPException(
status_code=e.status_code,
detail=e.message
)
@router.get(
"/me",
response_model=UserResponse,
summary="Get current user",
description="Get information about the currently authenticated user."
)
def get_me(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
"""
Get current authenticated user information.
This is a protected route that requires a valid access token in the
Authorization header (Bearer token).
Args:
current_user: The authenticated user (injected by dependency).
Returns:
Current user information.
Raises:
HTTPException: 401 if token is invalid or expired.
"""
return current_user
return router

View File

@@ -7,439 +7,440 @@ password reset, and email verification.
"""
from datetime import datetime
from typing import Optional
from .password import PasswordManager
from .token import TokenManager
from ..exceptions import (
InvalidCredentialsError,
UserNotFoundError,
AccountDisabledError,
ExpiredTokenError,
InvalidTokenError,
RevokedTokenError
)
from ..models.token import AccessTokenResponse, TokenData
from ..models.user import UserCreate, UserInDB, UserUpdate
from ..persistence.base import UserRepository, TokenRepository
from ..models.user import UserCreate, UserInDB, UserUpdate
from ..models.token import AccessTokenResponse, TokenData
from ..email.base import EmailService
from ..exceptions import (
InvalidCredentialsError,
UserNotFoundError,
AccountDisabledError,
ExpiredTokenError,
InvalidTokenError,
RevokedTokenError
)
class AuthService:
"""
Main authentication service.
This service orchestrates all authentication-related operations by
coordinating between password management, token management, and
persistence layers.
Attributes:
user_repository: Repository for user persistence operations.
token_repository: Repository for token persistence operations.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
"""
def __init__(
self,
user_repository: UserRepository,
token_repository: TokenRepository,
password_manager: PasswordManager,
token_manager: TokenManager
):
"""
Initialize the authentication service.
Args:
user_repository: Repository for user persistence.
token_repository: Repository for token persistence.
jwt_secret: Secret key for JWT signing.
jwt_algorithm: JWT algorithm (default: HS256).
access_token_expire_minutes: Access token validity (default: 30).
refresh_token_expire_days: Refresh token validity (default: 7).
password_reset_token_expire_minutes: Reset token validity (default: 15).
password_hash_rounds: Bcrypt rounds (default: 12).
Main authentication service.
This service orchestrates all authentication-related operations by
coordinating between password management, token management, and
persistence layers.
Attributes:
user_repository: Repository for user persistence operations.
token_repository: Repository for token persistence operations.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
email_service: Optional service for sending emails.
"""
self.user_repository = user_repository
self.token_repository = token_repository
self.password_manager = password_manager
self.token_manager = token_manager
def register(self, user_data: UserCreate) -> UserInDB:
"""
Register a new user.
This method creates a new user account with hashed password.
The user's email is initially unverified.
Args:
user_data: User registration data including password.
Returns:
The created user (without password).
Raises:
UserAlreadyExistsError: If email is already registered.
Example:
>>> user_data = UserCreate(
... email="user@example.com",
... username="john_doe",
... password="SecurePass123!"
... )
>>> user = auth_service.register(user_data)
"""
hashed_password = self.password_manager.hash_password(user_data.password)
user = self.user_repository.create_user(user_data, hashed_password)
return user
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
"""
Authenticate a user and create tokens.
This method verifies credentials, checks account status, and generates
both access and refresh tokens.
Args:
email: User's email address.
password: User's plain text password.
Returns:
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
Raises:
InvalidCredentialsError: If email or password is incorrect.
AccountDisabledError: If user account is disabled.
Example:
>>> user, tokens = auth_service.login("user@example.com", "password")
>>> print(tokens.access_token)
"""
# Get user by email
user = self.user_repository.get_user_by_email(email)
if not user:
raise InvalidCredentialsError()
# Verify password
if not self.password_manager.verify_password(password, user.hashed_password):
raise InvalidCredentialsError()
def __init__(
self,
user_repository: UserRepository,
token_repository: TokenRepository,
password_manager: PasswordManager,
token_manager: TokenManager,
email_service: Optional[EmailService] = None
):
"""
Initialize the authentication service.
Args:
user_repository: Repository for user persistence.
token_repository: Repository for token persistence.
password_manager: Manager for password hashing and verification.
token_manager: Manager for token creation and validation.
email_service: Optional service for sending emails (password reset, verification).
"""
self.user_repository = user_repository
self.token_repository = token_repository
self.password_manager = password_manager
self.token_manager = token_manager
self.email_service = email_service
# Check if account is active
if not user.is_active:
raise AccountDisabledError()
def register(self, user_data: UserCreate) -> UserInDB:
"""
Register a new user.
This method creates a new user account with hashed password.
The user's email is initially unverified.
Args:
user_data: User registration data including password.
Returns:
The created user (without password).
Raises:
UserAlreadyExistsError: If email is already registered.
Example:
>>> user_data = UserCreate(
... email="user@example.com",
... username="john_doe",
... password="SecurePass123!"
... )
>>> user = auth_service.register(user_data)
"""
hashed_password = self.password_manager.hash_password(user_data.password)
user = self.user_repository.create_user(user_data, hashed_password)
return user
# Create tokens
access_token = self.token_manager.create_access_token(user)
refresh_token = self.token_manager.create_refresh_token()
def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
"""
Authenticate a user and create tokens.
This method verifies credentials, checks account status, and generates
both access and refresh tokens.
Args:
email: User's email address.
password: User's plain text password.
Returns:
Tuple of (user, tokens) where tokens contains access_token and refresh_token.
Raises:
InvalidCredentialsError: If email or password is incorrect.
AccountDisabledError: If user account is disabled.
Example:
>>> user, tokens = auth_service.login("user@example.com", "password")
>>> print(tokens.access_token)
"""
# Get user by email
user = self.user_repository.get_user_by_email(email)
if not user:
raise InvalidCredentialsError()
# Verify password
if not self.password_manager.verify_password(password, user.hashed_password):
raise InvalidCredentialsError()
# Check if account is active
if not user.is_active:
raise AccountDisabledError()
# Create tokens
access_token = self.token_manager.create_access_token(user)
refresh_token = self.token_manager.create_refresh_token()
# Store refresh token in database
token_data = TokenData(
token=refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
tokens = AccessTokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer"
)
return user, tokens
# Store refresh token in database
token_data = TokenData(
token=refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.now(),
is_revoked=False
)
self.token_repository.save_token(token_data)
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
"""
Create a new access token using a refresh token.
This method validates the refresh token and generates a new access token
without requiring the user to re-enter their password.
Args:
refresh_token: The refresh token to exchange.
Returns:
New access and refresh tokens.
Raises:
InvalidTokenError: If refresh token is invalid.
ExpiredTokenError: If refresh token has expired.
RevokedTokenError: If refresh token has been revoked.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
>>> print(tokens.access_token)
"""
# Validate refresh token
token_data = self.token_repository.get_token(refresh_token, "refresh")
if not token_data:
raise InvalidTokenError("Invalid refresh token")
if token_data.is_revoked:
raise RevokedTokenError()
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError()
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
# Create new tokens
access_token = self.token_manager.create_access_token(user)
new_refresh_token = self.token_manager.create_refresh_token()
# Revoke old refresh token
self.token_repository.revoke_token(refresh_token)
# Store new refresh token
new_token_data = TokenData(
token=new_refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(new_token_data)
return AccessTokenResponse(
access_token=access_token,
refresh_token=new_refresh_token,
token_type="bearer"
)
tokens = AccessTokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer"
)
def logout(self, refresh_token: str) -> bool:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new
access tokens. The current access token will remain valid until
it expires naturally.
Args:
refresh_token: The refresh token to revoke.
Returns:
True if logout was successful, False if token not found.
Example:
>>> auth_service.logout(refresh_token)
True
"""
return self.token_repository.revoke_token(refresh_token)
return user, tokens
def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
"""
Create a new access token using a refresh token.
This method validates the refresh token and generates a new access token
without requiring the user to re-enter their password.
Args:
refresh_token: The refresh token to exchange.
Returns:
New access and refresh tokens.
Raises:
InvalidTokenError: If refresh token is invalid.
ExpiredTokenError: If refresh token has expired.
RevokedTokenError: If refresh token has been revoked.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> tokens = auth_service.refresh_access_token(old_refresh_token)
>>> print(tokens.access_token)
"""
# Validate refresh token
token_data = self.token_repository.get_token(refresh_token, "refresh")
if not token_data:
raise InvalidTokenError("Invalid refresh token")
def request_password_reset(self, email: str) -> str:
"""
Generate a password reset token for a user.
This method creates a secure token that can be sent to the user
via email to reset their password. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The password reset token to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_password_reset("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Create reset token
reset_token = self.token_manager.create_password_reset_token()
# Store token in database
token_data = TokenData(
token=reset_token,
token_type="password_reset",
user_id=user.id,
expires_at=self.token_manager.get_password_reset_token_expiration(),
created_at=datetime.utcnow(),
is_revoked=False
)
self.token_repository.save_token(token_data)
# Send email if service is configured
if self.email_service:
self.email_service.send_password_reset_email(email, reset_token)
return reset_token
if token_data.is_revoked:
raise RevokedTokenError()
def reset_password(self, token: str, new_password: str) -> bool:
"""
Reset a user's password using a reset token.
This method validates the reset token and updates the user's password.
All existing refresh tokens for the user are revoked for security.
Args:
token: Password reset token.
new_password: New plain text password (will be hashed).
Returns:
True if password was reset successfully.
Raises:
InvalidTokenError: If reset token is invalid.
ExpiredTokenError: If reset token has expired.
RevokedTokenError: If reset token has been used.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.reset_password(token, "NewSecurePass123!")
True
"""
# Validate reset token
token_data = self.token_repository.get_token(token, "password_reset")
if not token_data:
raise InvalidTokenError("Invalid password reset token")
if token_data.is_revoked:
raise RevokedTokenError("Password reset token has already been used")
if token_data.expires_at < datetime.utcnow():
raise ExpiredTokenError("Password reset token has expired")
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
# Hash new password
hashed_password = self.password_manager.hash_password(new_password)
# Update user password
updates = UserUpdate(password=hashed_password)
self.user_repository.update_user(user.id, updates)
# Revoke the reset token
self.token_repository.revoke_token(token)
# Revoke all user's refresh tokens for security
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
return True
if token_data.expires_at < datetime.now():
raise ExpiredTokenError()
def request_email_verification(self, email: str) -> str:
"""
Generate an email verification token for a user.
This method creates a JWT token that can be sent to the user
to verify their email address. If an email service is configured,
the email is sent automatically. Otherwise, the token is returned
for manual handling.
Args:
email: User's email address.
Returns:
The email verification token (JWT) to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_email_verification("user@example.com")
>>> # Token is automatically sent via email if service is configured
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
verification_token = self.token_manager.create_email_verification_token(email)
# Send email if service is configured
if self.email_service:
self.email_service.send_verification_email(email, verification_token)
return verification_token
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
def verify_email(self, token: str) -> bool:
"""
Verify a user's email address using a verification token.
This method decodes the JWT token, extracts the email, and marks
the user's email as verified.
Args:
token: Email verification token (JWT).
Returns:
True if email was verified successfully.
Raises:
InvalidTokenError: If verification token is invalid.
ExpiredTokenError: If verification token has expired.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.verify_email(token)
True
"""
# Decode and validate token
email = self.token_manager.decode_email_verification_token(token)
# Get user
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Update user's verified status
updates = UserUpdate(is_verified=True)
self.user_repository.update_user(user.id, updates)
return True
if not user.is_active:
raise AccountDisabledError()
# Create new tokens
access_token = self.token_manager.create_access_token(user)
new_refresh_token = self.token_manager.create_refresh_token()
# Revoke old refresh token
self.token_repository.revoke_token(refresh_token)
# Store new refresh token
new_token_data = TokenData(
token=new_refresh_token,
token_type="refresh",
user_id=user.id,
expires_at=self.token_manager.get_refresh_token_expiration(),
created_at=datetime.now(),
is_revoked=False
)
self.token_repository.save_token(new_token_data)
return AccessTokenResponse(
access_token=access_token,
refresh_token=new_refresh_token,
token_type="bearer"
)
def logout(self, refresh_token: str) -> bool:
"""
Logout a user by revoking their refresh token.
This prevents the refresh token from being used to obtain new
access tokens. The current access token will remain valid until
it expires naturally.
Args:
refresh_token: The refresh token to revoke.
Returns:
True if logout was successful, False if token not found.
Example:
>>> auth_service.logout(refresh_token)
True
"""
return self.token_repository.revoke_token(refresh_token)
def request_password_reset(self, email: str) -> str:
"""
Generate a password reset token for a user.
This method creates a secure token that can be sent to the user
via email to reset their password.
Args:
email: User's email address.
Returns:
The password reset token to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_password_reset("user@example.com")
>>> # Send token via email service
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Create reset token
reset_token = self.token_manager.create_password_reset_token()
# Store token in database
token_data = TokenData(
token=reset_token,
token_type="password_reset",
user_id=user.id,
expires_at=self.token_manager.get_password_reset_token_expiration(),
created_at=datetime.now(),
is_revoked=False
)
self.token_repository.save_token(token_data)
return reset_token
def reset_password(self, token: str, new_password: str) -> bool:
"""
Reset a user's password using a reset token.
This method validates the reset token and updates the user's password.
All existing refresh tokens for the user are revoked for security.
Args:
token: Password reset token.
new_password: New plain text password (will be hashed).
Returns:
True if password was reset successfully.
Raises:
InvalidTokenError: If reset token is invalid.
ExpiredTokenError: If reset token has expired.
RevokedTokenError: If reset token has been used.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.reset_password(token, "NewSecurePass123!")
True
"""
# Validate reset token
token_data = self.token_repository.get_token(token, "password_reset")
if not token_data:
raise InvalidTokenError("Invalid password reset token")
if token_data.is_revoked:
raise RevokedTokenError("Password reset token has already been used")
if token_data.expires_at < datetime.now():
raise ExpiredTokenError("Password reset token has expired")
# Get user
user = self.user_repository.get_user_by_id(token_data.user_id)
if not user:
raise UserNotFoundError()
# Hash new password
hashed_password = self.password_manager.hash_password(new_password)
# Update user password
updates = UserUpdate(password=hashed_password)
self.user_repository.update_user(user.id, updates)
# Revoke the reset token
self.token_repository.revoke_token(token)
# Revoke all user's refresh tokens for security
self.token_repository.revoke_all_user_tokens(user.id, "refresh")
return True
def request_email_verification(self, email: str) -> str:
"""
Generate an email verification token for a user.
This method creates a JWT token that can be sent to the user
to verify their email address.
Args:
email: User's email address.
Returns:
The email verification token (JWT) to be sent via email.
Raises:
UserNotFoundError: If email is not registered.
Example:
>>> token = auth_service.request_email_verification("user@example.com")
>>> # Send token via email service
"""
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
return self.token_manager.create_email_verification_token(email)
def verify_email(self, token: str) -> bool:
"""
Verify a user's email address using a verification token.
This method decodes the JWT token, extracts the email, and marks
the user's email as verified.
Args:
token: Email verification token (JWT).
Returns:
True if email was verified successfully.
Raises:
InvalidTokenError: If verification token is invalid.
ExpiredTokenError: If verification token has expired.
UserNotFoundError: If user no longer exists.
Example:
>>> auth_service.verify_email(token)
True
"""
# Decode and validate token
email = self.token_manager.decode_email_verification_token(token)
# Get user
user = self.user_repository.get_user_by_email(email)
if not user:
raise UserNotFoundError(f"No user found with email {email}")
# Update user's verified status
updates = UserUpdate(is_verified=True)
self.user_repository.update_user(user.id, updates)
return True
def get_current_user(self, access_token: str) -> UserInDB:
"""
Retrieve the current user from an access token.
This method decodes and validates the JWT access token and
retrieves the corresponding user from the database.
Args:
access_token: JWT access token.
Returns:
The user associated with the token.
Raises:
InvalidTokenError: If access token is invalid.
ExpiredTokenError: If access token has expired.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> user = auth_service.get_current_user(access_token)
>>> print(user.email)
"""
# Decode and validate token
token_payload = self.token_manager.decode_access_token(access_token)
# Get user
user = self.user_repository.get_user_by_id(token_payload.sub)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
return user
def get_default_sqlite_auth_service(db_path: str, jwt_secret: str) -> AuthService:
from my_auth.persistence.sqlite import SQLiteUserRepository
from my_auth.persistence.sqlite import SQLiteTokenRepository
user_repository = SQLiteUserRepository(db_path=db_path)
token_repository = SQLiteTokenRepository(db_path=db_path)
password_manager = PasswordManager()
token_manager = TokenManager(jwt_secret=jwt_secret)
return AuthService(
user_repository=user_repository,
token_repository=token_repository,
password_manager=password_manager,
token_manager=token_manager
)
def get_current_user(self, access_token: str) -> UserInDB:
"""
Retrieve the current user from an access token.
This method decodes and validates the JWT access token and
retrieves the corresponding user from the database.
Args:
access_token: JWT access token.
Returns:
The user associated with the token.
Raises:
InvalidTokenError: If access token is invalid.
ExpiredTokenError: If access token has expired.
UserNotFoundError: If user no longer exists.
AccountDisabledError: If user account is disabled.
Example:
>>> user = auth_service.get_current_user(access_token)
>>> print(user.email)
"""
# Decode and validate token
token_payload = self.token_manager.decode_access_token(access_token)
# Get user
user = self.user_repository.get_user_by_id(token_payload.sub)
if not user:
raise UserNotFoundError()
if not user.is_active:
raise AccountDisabledError()
return user

View File

64
src/my_auth/email/base.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Abstract base class for email service.
This module defines the interface that all email service implementations
must follow. It provides methods for sending authentication-related emails.
"""
from abc import ABC, abstractmethod
class EmailService(ABC):
"""
Abstract base class for email service operations.
This interface defines all methods required for sending authentication-related
emails. Concrete implementations can use different email providers (SMTP,
SendGrid, AWS SES, etc.).
"""
@abstractmethod
def send_verification_email(self, email: str, token: str) -> None:
"""
Send an email verification link to the user.
This method should send an email containing a verification link or token
that the user can use to verify their email address.
Args:
email: The recipient's email address.
token: The verification token (JWT) to include in the email.
Raises:
Exception: If email sending fails (implementation-specific).
Example:
>>> email_service.send_verification_email(
... "user@example.com",
... "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
... )
"""
pass
@abstractmethod
def send_password_reset_email(self, email: str, token: str) -> None:
"""
Send a password reset link to the user.
This method should send an email containing a password reset link or token
that the user can use to reset their password.
Args:
email: The recipient's email address.
token: The password reset token to include in the email.
Raises:
Exception: If email sending fails (implementation-specific).
Example:
>>> email_service.send_password_reset_email(
... "user@example.com",
... "a1b2c3d4e5f6..."
... )
"""
pass

211
src/my_auth/email/smtp.py Normal file
View File

@@ -0,0 +1,211 @@
"""
SMTP email service implementation.
This module provides an SMTP-based implementation of the email service
for sending authentication-related emails.
"""
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional
from .base import EmailService
# Default email templates
DEFAULT_VERIFICATION_TEMPLATE = """
<html>
<body>
<h2>Email Verification</h2>
<p>Please click the link below to verify your email address:</p>
<p><a href="{verification_link}">Verify Email</a></p>
<p>Or copy and paste this link into your browser:</p>
<p>{verification_link}</p>
<p>This link will expire in 7 days.</p>
<p>If you did not request this verification, please ignore this email.</p>
</body>
</html>
"""
DEFAULT_PASSWORD_RESET_TEMPLATE = """
<html>
<body>
<h2>Password Reset</h2>
<p>You have requested to reset your password. Please click the link below:</p>
<p><a href="{reset_link}">Reset Password</a></p>
<p>Or copy and paste this link into your browser:</p>
<p>{reset_link}</p>
<p>This link will expire in 15 minutes.</p>
<p>If you did not request a password reset, please ignore this email.</p>
</body>
</html>
"""
class SMTPEmailService(EmailService):
"""
SMTP implementation of EmailService.
This implementation uses standard SMTP protocol to send emails.
It supports both TLS and SSL connections and allows custom HTML
templates for email content.
Attributes:
host: SMTP server hostname.
port: SMTP server port (587 for TLS, 465 for SSL).
username: SMTP authentication username.
password: SMTP authentication password.
use_tls: Whether to use TLS (default: True).
from_email: Email address to use as sender.
from_name: Display name for sender (optional).
base_url: Base URL for constructing verification/reset links.
verification_template: HTML template for verification emails.
password_reset_template: HTML template for password reset emails.
"""
def __init__(
self,
host: str,
port: int,
username: str,
password: str,
from_email: str,
base_url: str,
use_tls: bool = True,
from_name: Optional[str] = None,
verification_template: Optional[str] = None,
password_reset_template: Optional[str] = None
):
"""
Initialize SMTP email service.
Args:
host: SMTP server hostname (e.g., "smtp.gmail.com").
port: SMTP server port (587 for TLS, 465 for SSL).
username: SMTP authentication username.
password: SMTP authentication password.
from_email: Email address to use as sender.
base_url: Base URL for your application (e.g., "https://myapp.com").
use_tls: Whether to use TLS encryption (default: True).
from_name: Display name for sender (default: None).
verification_template: Custom HTML template for verification emails
(must include {verification_link} placeholder).
password_reset_template: Custom HTML template for reset emails
(must include {reset_link} placeholder).
Example:
>>> email_service = SMTPEmailService(
... host="smtp.gmail.com",
... port=587,
... username="noreply@myapp.com",
... password="app_password",
... from_email="noreply@myapp.com",
... base_url="https://myapp.com",
... from_name="My Application"
... )
"""
self.host = host
self.port = port
self.username = username
self.password = password
self.use_tls = use_tls
self.from_email = from_email
self.from_name = from_name
self.base_url = base_url.rstrip('/')
# Use custom templates or defaults
self.verification_template = verification_template or DEFAULT_VERIFICATION_TEMPLATE
self.password_reset_template = password_reset_template or DEFAULT_PASSWORD_RESET_TEMPLATE
def _send_email(self, to_email: str, subject: str, html_content: str) -> None:
"""
Send an email via SMTP.
Internal method that handles the actual SMTP connection and sending.
Args:
to_email: Recipient email address.
subject: Email subject line.
html_content: HTML content of the email.
Raises:
smtplib.SMTPException: If email sending fails.
"""
# Create message
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = f"{self.from_name} <{self.from_email}>" if self.from_name else self.from_email
message["To"] = to_email
# Attach HTML content
html_part = MIMEText(html_content, "html")
message.attach(html_part)
# Send email
if self.use_tls:
with smtplib.SMTP(self.host, self.port) as server:
server.starttls()
server.login(self.username, self.password)
server.send_message(message)
else:
with smtplib.SMTP_SSL(self.host, self.port) as server:
server.login(self.username, self.password)
server.send_message(message)
def send_verification_email(self, email: str, token: str) -> None:
"""
Send an email verification link to the user.
Constructs a verification link using the base_url and token,
then sends an email using the configured verification template.
Args:
email: The recipient's email address.
token: The verification token (JWT).
Raises:
smtplib.SMTPException: If email sending fails.
Example:
>>> email_service.send_verification_email(
... "user@example.com",
... "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
... )
"""
verification_link = f"{self.base_url}/auth/verify-email?token={token}"
html_content = self.verification_template.format(verification_link=verification_link)
self._send_email(
to_email=email,
subject="Verify Your Email Address",
html_content=html_content
)
def send_password_reset_email(self, email: str, token: str) -> None:
"""
Send a password reset link to the user.
Constructs a password reset link using the base_url and token,
then sends an email using the configured password reset template.
Args:
email: The recipient's email address.
token: The password reset token.
Raises:
smtplib.SMTPException: If email sending fails.
Example:
>>> email_service.send_password_reset_email(
... "user@example.com",
... "a1b2c3d4e5f6..."
... )
"""
reset_link = f"{self.base_url}/reset-password?token={token}"
html_content = self.password_reset_template.format(reset_link=reset_link)
self._send_email(
to_email=email,
subject="Reset Your Password",
html_content=html_content
)

View File

@@ -162,7 +162,7 @@ class SQLiteUserRepository(UserRepository):
raise UserAlreadyExistsError(f"User with email {user_data.email} already exists")
user_id = str(uuid4())
now = datetime.utcnow()
now = datetime.now()
user = UserInDB(
id=user_id,
@@ -315,7 +315,7 @@ class SQLiteUserRepository(UserRepository):
# Always update the updated_at timestamp
update_fields.append("updated_at = ?")
update_values.append(datetime.utcnow().isoformat())
update_values.append(datetime.now().isoformat())
# Add user_id for WHERE clause
update_values.append(user_id)
@@ -574,7 +574,7 @@ class SQLiteTokenRepository(TokenRepository):
Returns:
Number of tokens deleted.
"""
now = datetime.utcnow().isoformat()
now = datetime.now().isoformat()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()

0
tests/api/__init__.py Normal file
View File

View File

@@ -0,0 +1,515 @@
"""
Unit tests for FastAPI authentication routes.
This module tests all authentication API endpoints using FastAPI's TestClient
and mocked dependencies to ensure proper behavior and error handling.
"""
from datetime import datetime
from unittest.mock import Mock
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from my_auth.api.routes import create_auth_router
from my_auth.core.auth import AuthService
from my_auth.exceptions import (
UserAlreadyExistsError,
InvalidCredentialsError,
UserNotFoundError,
InvalidTokenError,
ExpiredTokenError,
RevokedTokenError,
AccountDisabledError
)
from my_auth.models.token import AccessTokenResponse
from my_auth.models.user import UserInDB
@pytest.fixture
def mock_auth_service():
"""
Create a mock AuthService for testing.
Returns:
Mock AuthService instance with all methods mocked.
"""
service = Mock(spec=AuthService)
return service
@pytest.fixture
def test_app(mock_auth_service):
"""
Create a FastAPI test application with auth router.
Args:
mock_auth_service: Mocked authentication service.
Returns:
FastAPI application configured for testing.
"""
app = FastAPI()
auth_router = create_auth_router(mock_auth_service)
app.include_router(auth_router)
return app
@pytest.fixture
def client(test_app):
"""
Create a test client for the FastAPI application.
Args:
test_app: FastAPI test application.
Returns:
TestClient instance.
"""
return TestClient(test_app)
@pytest.fixture
def sample_user():
"""
Create a sample user for testing.
Returns:
UserInDB instance with sample data.
"""
return UserInDB(
id="user123",
email="test@example.com",
username="testuser",
hashed_password="hashed_password_here",
roles=["user"],
user_settings={},
is_verified=False,
is_active=True,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
@pytest.fixture
def sample_tokens():
"""
Create sample access and refresh tokens.
Returns:
AccessTokenResponse with sample tokens.
"""
return AccessTokenResponse(
access_token="sample.access.token",
refresh_token="sample_refresh_token",
token_type="bearer"
)
def test_i_can_register_user(client, mock_auth_service, sample_user):
"""
Test successful user registration.
Verifies that a POST request to /auth/register with valid data
returns 201 status and the created user information.
"""
mock_auth_service.register.return_value = sample_user
response = client.post("/auth/register", json={
"email": "test@example.com",
"username": "testuser",
"password": "SecurePass123!",
"roles": ["user"],
"user_settings": {}
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert data["id"] == "user123"
assert "hashed_password" not in data
mock_auth_service.register.assert_called_once()
def test_i_cannot_register_with_existing_email(client, mock_auth_service):
"""
Test registration fails with existing email.
Verifies that attempting to register with an already registered
email returns 409 Conflict status.
"""
mock_auth_service.register.side_effect = UserAlreadyExistsError(
"User with this email already exists"
)
response = client.post("/auth/register", json={
"email": "existing@example.com",
"username": "testuser",
"password": "SecurePass123!",
"roles": [],
"user_settings": {}
})
assert response.status_code == 409
assert "already exists" in response.json()["detail"].lower()
def test_i_can_login(client, mock_auth_service, sample_user, sample_tokens):
"""
Test successful login.
Verifies that a POST request to /auth/login with valid credentials
returns access and refresh tokens.
"""
mock_auth_service.login.return_value = (sample_user, sample_tokens)
response = client.post("/auth/login", data={
"username": "test@example.com", # OAuth2 uses 'username' field
"password": "SecurePass123!"
})
assert response.status_code == 200
data = response.json()
assert data["access_token"] == "sample.access.token"
assert data["refresh_token"] == "sample_refresh_token"
assert data["token_type"] == "bearer"
mock_auth_service.login.assert_called_once_with("test@example.com", "SecurePass123!")
def test_i_cannot_login_with_invalid_credentials(client, mock_auth_service):
"""
Test login fails with invalid credentials.
Verifies that attempting to login with incorrect email or password
returns 401 Unauthorized status.
"""
mock_auth_service.login.side_effect = InvalidCredentialsError()
response = client.post("/auth/login", data={
"username": "test@example.com",
"password": "WrongPassword"
})
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower()
def test_i_cannot_login_with_disabled_account(client, mock_auth_service):
"""
Test login fails with disabled account.
Verifies that attempting to login to a disabled account
returns 403 Forbidden status.
"""
mock_auth_service.login.side_effect = AccountDisabledError()
response = client.post("/auth/login", data={
"username": "test@example.com",
"password": "SecurePass123!"
})
assert response.status_code == 403
assert "disabled" in response.json()["detail"].lower()
def test_i_can_refresh_token(client, mock_auth_service, sample_tokens):
"""
Test successful token refresh.
Verifies that a POST request to /auth/refresh with a valid refresh token
returns new access and refresh tokens.
"""
new_tokens = AccessTokenResponse(
access_token="new.access.token",
refresh_token="new_refresh_token",
token_type="bearer"
)
mock_auth_service.refresh_access_token.return_value = new_tokens
response = client.post("/auth/refresh", json={
"refresh_token": "sample_refresh_token"
})
assert response.status_code == 200
data = response.json()
assert data["access_token"] == "new.access.token"
assert data["refresh_token"] == "new_refresh_token"
mock_auth_service.refresh_access_token.assert_called_once_with("sample_refresh_token")
def test_i_cannot_refresh_with_invalid_token(client, mock_auth_service):
"""
Test token refresh fails with invalid token.
Verifies that attempting to refresh with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.refresh_access_token.side_effect = InvalidTokenError(
"Invalid refresh token"
)
response = client.post("/auth/refresh", json={
"refresh_token": "invalid_token"
})
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower()
def test_i_cannot_refresh_with_expired_token(client, mock_auth_service):
"""
Test token refresh fails with expired token.
Verifies that attempting to refresh with an expired token
returns 401 Unauthorized status.
"""
mock_auth_service.refresh_access_token.side_effect = ExpiredTokenError()
response = client.post("/auth/refresh", json={
"refresh_token": "expired_token"
})
assert response.status_code == 401
assert "expired" in response.json()["detail"].lower()
def test_i_cannot_refresh_with_revoked_token(client, mock_auth_service):
"""
Test token refresh fails with revoked token.
Verifies that attempting to refresh with a revoked token
returns 401 Unauthorized status.
"""
mock_auth_service.refresh_access_token.side_effect = RevokedTokenError()
response = client.post("/auth/refresh", json={
"refresh_token": "revoked_token"
})
assert response.status_code == 401
assert "revoked" in response.json()["detail"].lower()
def test_i_can_logout(client, mock_auth_service):
"""
Test successful logout.
Verifies that a POST request to /auth/logout successfully
revokes the refresh token and returns 204 status.
"""
mock_auth_service.logout.return_value = True
response = client.post("/auth/logout", json={
"refresh_token": "sample_refresh_token"
})
assert response.status_code == 204
mock_auth_service.logout.assert_called_once_with("sample_refresh_token")
def test_i_can_request_password_reset(client, mock_auth_service):
"""
Test password reset request.
Verifies that a POST request to /auth/password-reset-request
generates a reset token for the given email.
"""
mock_auth_service.request_password_reset.return_value = "reset_token_123"
response = client.post("/auth/password-reset-request", json={
"email": "test@example.com"
})
assert response.status_code == 200
data = response.json()
assert "token" in data
assert data["token"] == "reset_token_123"
mock_auth_service.request_password_reset.assert_called_once_with("test@example.com")
def test_i_cannot_request_password_reset_for_unknown_email(client, mock_auth_service):
"""
Test password reset request fails for unknown email.
Verifies that requesting a password reset for a non-existent email
returns 404 Not Found status.
"""
mock_auth_service.request_password_reset.side_effect = UserNotFoundError(
"No user found with email"
)
response = client.post("/auth/password-reset-request", json={
"email": "unknown@example.com"
})
assert response.status_code == 404
def test_i_can_reset_password(client, mock_auth_service):
"""
Test successful password reset.
Verifies that a POST request to /auth/password-reset with valid
token and new password successfully resets the password.
"""
mock_auth_service.reset_password.return_value = True
response = client.post("/auth/password-reset", json={
"token": "reset_token_123",
"new_password": "NewSecurePass123!"
})
assert response.status_code == 200
data = response.json()
assert "success" in data["message"].lower()
mock_auth_service.reset_password.assert_called_once_with(
"reset_token_123",
"NewSecurePass123!"
)
def test_i_cannot_reset_password_with_invalid_token(client, mock_auth_service):
"""
Test password reset fails with invalid token.
Verifies that attempting to reset password with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.reset_password.side_effect = InvalidTokenError(
"Invalid password reset token"
)
response = client.post("/auth/password-reset", json={
"token": "invalid_token",
"new_password": "NewSecurePass123!"
})
assert response.status_code == 401
def test_i_cannot_reset_password_with_expired_token(client, mock_auth_service):
"""
Test password reset fails with expired token.
Verifies that attempting to reset password with an expired token
returns 401 Unauthorized status.
"""
mock_auth_service.reset_password.side_effect = ExpiredTokenError(
"Password reset token has expired"
)
response = client.post("/auth/password-reset", json={
"token": "expired_token",
"new_password": "NewSecurePass123!"
})
assert response.status_code == 401
def test_i_can_request_email_verification(client, mock_auth_service):
"""
Test email verification request.
Verifies that a POST request to /auth/verify-email-request
generates a verification token for the given email.
"""
mock_auth_service.request_email_verification.return_value = "verify_token_jwt"
response = client.post("/auth/verify-email-request", json={
"email": "test@example.com"
})
assert response.status_code == 200
data = response.json()
assert "token" in data
assert data["token"] == "verify_token_jwt"
mock_auth_service.request_email_verification.assert_called_once_with("test@example.com")
def test_i_can_verify_email(client, mock_auth_service):
"""
Test successful email verification.
Verifies that a GET request to /auth/verify-email with a valid
token successfully verifies the email address.
"""
mock_auth_service.verify_email.return_value = True
response = client.get("/auth/verify-email?token=verify_token_jwt")
assert response.status_code == 200
data = response.json()
assert "success" in data["message"].lower()
mock_auth_service.verify_email.assert_called_once_with("verify_token_jwt")
def test_i_cannot_verify_email_with_invalid_token(client, mock_auth_service):
"""
Test email verification fails with invalid token.
Verifies that attempting to verify email with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.verify_email.side_effect = InvalidTokenError(
"Invalid email verification token"
)
response = client.get("/auth/verify-email?token=invalid_token")
assert response.status_code == 401
def test_i_can_get_current_user(client, mock_auth_service, sample_user):
"""
Test retrieving current authenticated user.
Verifies that a GET request to /auth/me with a valid Bearer token
returns the current user's information.
"""
mock_auth_service.get_current_user.return_value = sample_user
response = client.get(
"/auth/me",
headers={"Authorization": "Bearer sample.access.token"}
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert data["id"] == "user123"
assert "hashed_password" not in data
mock_auth_service.get_current_user.assert_called_once_with("sample.access.token")
def test_i_cannot_access_protected_route_without_token(client):
"""
Test protected route fails without authentication token.
Verifies that attempting to access /auth/me without a Bearer token
returns 401 Unauthorized status.
"""
response = client.get("/auth/me")
assert response.status_code == 401
def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_service):
"""
Test protected route fails with invalid token.
Verifies that attempting to access /auth/me with an invalid token
returns 401 Unauthorized status.
"""
mock_auth_service.get_current_user.side_effect = InvalidTokenError(
"Invalid access token"
)
response = client.get(
"/auth/me",
headers={"Authorization": "Bearer invalid.token"}
)
assert response.status_code == 401

View File

@@ -1,6 +1,7 @@
# tests/core/conftest.py
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock
@@ -9,7 +10,7 @@ import pytest
from my_auth.core.password import PasswordManager
from my_auth.core.token import TokenManager
from src.my_auth.core.auth import AuthService
from src.my_auth.models.user import UserCreate
from src.my_auth.models.user import UserCreate, UserInDB
from src.my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
@@ -33,6 +34,23 @@ def test_user_hashed_password():
return "$2b$12$R.S/XfI2tQYt3Kk.iF1XwOQz0Qe.L0T0mD/O1H8E2V5D4Q6F7G8H9I0"
@pytest.fixture
def test_user_in_db() -> UserInDB:
"""Provides a basic UserInDB instance for testing."""
return UserInDB(
id="1",
email="test@example.com",
username="testuser",
hashed_password="some_hash",
is_active=True,
is_verified=True,
roles=['member'],
user_settings={},
created_at=datetime.now(),
updated_at=datetime.now()
)
@pytest.fixture()
def sqlite_db_path(tmp_path_factory):
"""
@@ -79,6 +97,7 @@ def mock_token_manager() -> TokenManager:
mock = MagicMock(spec=TokenManager)
mock.create_access_token.return_value = "MOCKED_ACCESS_TOKEN"
mock.create_refresh_token.return_value = "MOCKED_REFRESH_TOKEN"
mock.get_refresh_token_expiration.return_value = datetime.now() + timedelta(days=1)
return mock

View File

@@ -6,8 +6,9 @@ from unittest.mock import MagicMock, patch
import pytest
from src.my_auth.core.auth import AuthService
from src.my_auth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, ExpiredTokenError
from src.my_auth.models.token import TokenData
from src.my_auth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
ExpiredTokenError, RevokedTokenError
from src.my_auth.models.token import TokenData, TokenPayload
from src.my_auth.models.user import UserCreate, UserUpdate
@@ -140,15 +141,18 @@ class TestAuthServiceTokenManagement(object):
result = auth_service.logout(self.refresh_token)
assert result is True
with pytest.raises(InvalidTokenError):
with pytest.raises(RevokedTokenError):
auth_service.refresh_access_token(self.refresh_token)
def test_get_current_user_success(self, auth_service: AuthService):
"""Success: Getting the current user works by successfully decoding the JWT."""
# Mock the decoder to simulate a decoded payload
token_payload = TokenPayload(sub=self.user.id,
email=str(self.user.email),
exp=int(datetime.now().timestamp() * 1000))
with patch.object(auth_service.token_manager, 'decode_access_token',
return_value={"sub": self.user.id}) as mock_decode:
return_value=token_payload) as mock_decode:
user = auth_service.get_current_user("dummy_jwt")
assert user.id == self.user.id
@@ -168,103 +172,103 @@ class TestAuthServiceTokenManagement(object):
auth_service.get_current_user("expired_access_jwt")
class TestAuthServiceResetVerification(object):
"""Tests for password reset and email verification flows."""
@pytest.fixture(autouse=True)
def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
"""Sets up a registered user using a mock hash for speed."""
pm = auth_service.password_manager
original_hash = pm.hash_password.return_value
# Temporarily set hash for setup
pm.hash_password.return_value = "HASHED_PASS"
user = auth_service.register(test_user_data_create)
self.user = user
# Restore hash mock
pm.hash_password.return_value = original_hash
@patch('src.my_auth.core.email.send_email')
def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
"""Success: Requesting a password reset generates a token and sends an email."""
tm = auth_service.token_manager
with patch.object(tm, 'create_password_reset_token',
return_value="MOCKED_RESET_TOKEN") as mock_create_token:
token_string = auth_service.request_password_reset(self.user.email)
assert token_string == "MOCKED_RESET_TOKEN"
mock_create_token.assert_called_once()
mock_send_email.assert_called_once()
def test_reset_password_success(self, auth_service: AuthService):
"""Success: Resetting the password works with a valid token."""
# Setup: Manually create a valid reset token
auth_service.token_repository.save_token(
TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
expires_at=datetime.now() + timedelta(minutes=10))
)
# Patch the PasswordManager instance to control the hash output
pm = auth_service.password_manager
with patch.object(pm, 'hash_password',
return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
new_password = "NewPassword123!"
result = auth_service.reset_password("valid_reset_token", new_password)
assert result is True
mock_hash.assert_called_once_with(new_password)
# Verification: Check that user data was updated
updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
@patch('src.my_auth.core.email.send_email')
def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
"""Success: Requesting verification generates a token and sends an email."""
tm = auth_service.token_manager
with patch.object(tm, 'create_email_verification_token',
return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
token_string = auth_service.request_email_verification(self.user.email)
assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
mock_create_token.assert_called_once_with(self.user.email)
mock_send_email.assert_called_once()
def test_verify_email_success(self, auth_service: AuthService):
"""Success: Verification updates the user's status."""
# The token_manager is mocked in conftest, so we must access its real create method
# or rely on the mock's return value to get a token string to use in the call.
# Since we need a real token for the decode logic to pass, we need to bypass the mock here.
# We will temporarily use the real TokenManager to create a valid, decodable token.
# This requires an *unmocked* token manager instance, which is tricky in this setup.
# Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
# Assuming TokenManager.create_email_verification_token can be mocked to return a static string
# and TokenManager.decode_email_verification_token can be patched to simulate success.
# Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
# Setup: Ensure user is unverified
auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
tm = auth_service.token_manager
# Mock the decode step to ensure it returns the email used for verification
with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
# Test (we use a dummy token string as the decode step is mocked)
result = auth_service.verify_email("dummy_verification_token")
assert result is True
mock_decode.assert_called_once()
# Verification: User is verified
updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
assert updated_user.is_verified is True
# class TestAuthServiceResetVerification(object):
# """Tests for password reset and email verification flows."""
#
# @pytest.fixture(autouse=True)
# def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
# """Sets up a registered user using a mock hash for speed."""
#
# pm = auth_service.password_manager
# original_hash = pm.hash_password.return_value
#
# # Temporarily set hash for setup
# pm.hash_password.return_value = "HASHED_PASS"
# user = auth_service.register(test_user_data_create)
# self.user = user
#
# # Restore hash mock
# pm.hash_password.return_value = original_hash
#
# @patch('src.my_auth.core.email.send_email')
# def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
# """Success: Requesting a password reset generates a token and sends an email."""
#
# tm = auth_service.token_manager
# with patch.object(tm, 'create_password_reset_token',
# return_value="MOCKED_RESET_TOKEN") as mock_create_token:
# token_string = auth_service.request_password_reset(self.user.email)
#
# assert token_string == "MOCKED_RESET_TOKEN"
# mock_create_token.assert_called_once()
# mock_send_email.assert_called_once()
#
# def test_reset_password_success(self, auth_service: AuthService):
# """Success: Resetting the password works with a valid token."""
#
# # Setup: Manually create a valid reset token
# auth_service.token_repository.save_token(
# TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
# expires_at=datetime.now() + timedelta(minutes=10))
# )
#
# # Patch the PasswordManager instance to control the hash output
# pm = auth_service.password_manager
# with patch.object(pm, 'hash_password',
# return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
# new_password = "NewPassword123!"
# result = auth_service.reset_password("valid_reset_token", new_password)
#
# assert result is True
# mock_hash.assert_called_once_with(new_password)
#
# # Verification: Check that user data was updated
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
# assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
#
# @patch('src.my_auth.core.email.send_email')
# def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
# """Success: Requesting verification generates a token and sends an email."""
#
# tm = auth_service.token_manager
# with patch.object(tm, 'create_email_verification_token',
# return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
# token_string = auth_service.request_email_verification(self.user.email)
#
# assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
# mock_create_token.assert_called_once_with(self.user.email)
# mock_send_email.assert_called_once()
#
# def test_verify_email_success(self, auth_service: AuthService):
# """Success: Verification updates the user's status."""
#
# # The token_manager is mocked in conftest, so we must access its real create method
# # or rely on the mock's return value to get a token string to use in the call.
# # Since we need a real token for the decode logic to pass, we need to bypass the mock here.
#
# # We will temporarily use the real TokenManager to create a valid, decodable token.
# # This requires an *unmocked* token manager instance, which is tricky in this setup.
#
# # Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
#
# # Assuming TokenManager.create_email_verification_token can be mocked to return a static string
# # and TokenManager.decode_email_verification_token can be patched to simulate success.
#
# # Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
#
# # Setup: Ensure user is unverified
# auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
#
# tm = auth_service.token_manager
#
# # Mock the decode step to ensure it returns the email used for verification
# with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
# # Test (we use a dummy token string as the decode step is mocked)
# result = auth_service.verify_email("dummy_verification_token")
#
# assert result is True
# mock_decode.assert_called_once()
#
# # Verification: User is verified
# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
# assert updated_user.is_verified is True

View File

@@ -0,0 +1,224 @@
# tests/core/test_token_manager.py
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
import pytest
from jose import jwt
from src.my_auth.core.token import TokenManager
from src.my_auth.exceptions import InvalidTokenError, ExpiredTokenError
from src.my_auth.models.user import UserInDB # Assuming you have a fixture for this
@pytest.fixture
def token_manager():
"""Provides a TokenManager instance with known, short expiration times for testing."""
return TokenManager(
jwt_secret="TEST_SECRET_KEY",
jwt_algorithm="HS256",
access_token_expire_minutes=1,
refresh_token_expire_days=7,
password_reset_token_expire_minutes=15
)
class TestTokenManagerInitialization:
"""Tests for TokenManager setup and configuration."""
def test_init_success(self):
"""Should initialize successfully with required parameters."""
tm = TokenManager(jwt_secret="MySecret")
assert tm.jwt_secret == "MySecret"
assert tm.jwt_algorithm == "HS256"
assert tm.access_token_expire_minutes == 30
assert tm.refresh_token_expire_days == 7
assert tm.password_reset_token_expire_minutes == 15
def test_init_failure_empty_secret(self):
"""Should raise ValueError if JWT secret is empty."""
with pytest.raises(ValueError, match="JWT secret cannot be empty"):
TokenManager(jwt_secret="")
class TestTokenCreation:
"""Tests creation methods for different token types."""
def test_create_access_token_format_and_expiration(self, token_manager: TokenManager, test_user_in_db: UserInDB):
"""Should create a valid JWT with correct payload and expiration."""
token = token_manager.create_access_token(test_user_in_db)
# 1. Assert token is a string (encoded)
assert isinstance(token, str)
# 2. Decode and check payload content
payload = jwt.decode(token, token_manager.jwt_secret, algorithms=[token_manager.jwt_algorithm])
assert payload["sub"] == test_user_in_db.id
assert payload["email"] == test_user_in_db.email
assert payload["type"] == "access"
# 3. Check expiration (should be within a small window of the expected time)
now = datetime.now()
expected_exp_dt = now + timedelta(minutes=token_manager.access_token_expire_minutes)
# Check if expiration is within +/- 1 second of the expected value
assert abs(payload["exp"] - int(expected_exp_dt.timestamp())) <= 1
def test_create_refresh_token_format(self, token_manager: TokenManager):
"""Should create a random hex string of length 64."""
token = token_manager.create_refresh_token()
assert isinstance(token, str)
assert len(token) == 64
assert all(c in '0123456789abcdef' for c in token)
def test_create_password_reset_token_format(self, token_manager: TokenManager):
"""Should create a random hex string of length 64."""
token = token_manager.create_password_reset_token()
assert isinstance(token, str)
assert len(token) == 64
def test_create_email_verification_token_format(self, token_manager: TokenManager):
"""Should create a JWT with email and 'email_verification' type."""
email = "verify@example.com"
token = token_manager.create_email_verification_token(email)
# Decode and check payload content
payload = jwt.decode(token, token_manager.jwt_secret, algorithms=[token_manager.jwt_algorithm])
assert payload["email"] == email
assert payload["type"] == "email_verification"
# Expiration check (set to 7 days in the implementation)
now = datetime.now()
expected_exp_dt = now + timedelta(days=7)
assert abs(payload["exp"] - int(expected_exp_dt.timestamp())) <= 1
class TestTokenExpirationCalculations:
"""Tests for token expiration date methods."""
# We patch datetime.now() to ensure stable calculations
@patch('src.my_auth.core.token.datetime')
def test_get_refresh_token_expiration(self, mock_datetime, token_manager: TokenManager):
"""Should calculate refresh token expiration correctly."""
# Set a fixed starting time
start_time = datetime(2025, 1, 1, 10, 0, 0)
mock_datetime.now = MagicMock(return_value=start_time)
expected_exp = start_time + timedelta(days=token_manager.refresh_token_expire_days)
actual_exp = token_manager.get_refresh_token_expiration()
assert actual_exp == expected_exp
@patch('src.my_auth.core.token.datetime')
def test_get_password_reset_token_expiration(self, mock_datetime, token_manager: TokenManager):
"""Should calculate password reset token expiration correctly."""
start_time = datetime(2025, 1, 1, 10, 0, 0)
mock_datetime.now = MagicMock(return_value=start_time)
expected_exp = start_time + timedelta(minutes=token_manager.password_reset_token_expire_minutes)
actual_exp = token_manager.get_password_reset_token_expiration()
assert actual_exp == expected_exp
class TestTokenDecodingAndValidation:
"""Tests decoding and validation logic for JWT tokens."""
# --- Access Token Tests ---
def test_decode_access_token_success(self, token_manager: TokenManager, test_user_in_db: UserInDB):
"""Should successfully decode a valid access token."""
token = token_manager.create_access_token(test_user_in_db)
payload = token_manager.decode_access_token(token)
assert payload.sub == test_user_in_db.id
assert payload.email == test_user_in_db.email
assert payload.type == "access"
def test_i_cannot_decode_expired_access_token(self, token_manager: TokenManager, test_user_in_db: UserInDB):
"""
Should raise ExpiredTokenError when decoding an expired token.
"""
from jose import jwt
from datetime import datetime, timedelta
# Create an already expired token (1 hour ago)
expired_time = datetime.now() - timedelta(hours=1)
payload = {
"sub": test_user_in_db.id,
"email": test_user_in_db.email,
"exp": int(expired_time.timestamp()),
"type": "access"
}
expired_token = jwt.encode(
payload,
token_manager.jwt_secret,
algorithm=token_manager.jwt_algorithm
)
# Should raise ExpiredTokenError
with pytest.raises(ExpiredTokenError, match="Access token has expired"):
token_manager.decode_access_token(expired_token)
def test_decode_access_token_invalid_signature(self, token_manager: TokenManager, test_user_in_db: UserInDB):
"""Should raise InvalidTokenError if the signature is bad."""
token = token_manager.create_access_token(test_user_in_db)
# Flip the last character to invalidate the signature
invalid_token = token[:-1] + ('A' if token[-1] != 'A' else 'B')
with pytest.raises(InvalidTokenError, match="Invalid access token"):
token_manager.decode_access_token(invalid_token)
def test_decode_access_token_wrong_type(self, token_manager: TokenManager):
"""Should raise InvalidTokenError if token is not 'access' type."""
# Create an email verification token, but try to decode it as an access token
wrong_token = token_manager.create_email_verification_token("wrong@type.com")
with pytest.raises(InvalidTokenError, match="Invalid token type"):
token_manager.decode_access_token(wrong_token)
# --- Email Verification Token Tests ---
def test_decode_email_verification_token_success(self, token_manager: TokenManager):
"""Should successfully decode a valid email verification token."""
email = "valid_email@test.com"
token = token_manager.create_email_verification_token(email)
decoded_email = token_manager.decode_email_verification_token(token)
assert decoded_email == email
def test_decode_email_verification_token_expired(self, token_manager: TokenManager):
"""Should raise ExpiredTokenError if the token is old (7 days set in creation)."""
# This test requires mocking time, but given the 7-day expiration,
# we can simulate an expired token by manually encoding one.
# Manually encode an expired token
expired_payload = {
"email": "old@example.com",
"exp": int((datetime.now() - timedelta(days=1)).timestamp()), # Expired yesterday
"type": "email_verification"
}
expired_token = jwt.encode(expired_payload, token_manager.jwt_secret, algorithm=token_manager.jwt_algorithm)
with pytest.raises(ExpiredTokenError, match="Email verification token has expired"):
token_manager.decode_email_verification_token(expired_token)
def test_decode_email_verification_token_wrong_type(self, token_manager: TokenManager, test_user_in_db: UserInDB):
"""Should raise InvalidTokenError if token is not 'email_verification' type."""
# Create an access token, but try to decode it as an email token
wrong_token = token_manager.create_access_token(test_user_in_db)
with pytest.raises(InvalidTokenError, match="Invalid token type"):
token_manager.decode_email_verification_token(wrong_token)