diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..1c2fda5
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/MyAuth.iml b/.idea/MyAuth.iml
new file mode 100644
index 0000000..77f5ba4
--- /dev/null
+++ b/.idea/MyAuth.iml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..1658d1d
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..c9d4a6e
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..2084270
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..9661ac7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/LICENCE b/LICENCE
new file mode 100644
index 0000000..9333491
--- /dev/null
+++ b/LICENCE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2025 Kodjo Sossouvi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/README-dev.md b/README-dev.md
new file mode 100644
index 0000000..7ece33f
--- /dev/null
+++ b/README-dev.md
@@ -0,0 +1,285 @@
+# MyAuth Module
+
+A reusable, modular authentication system for FastAPI applications with pluggable database backends.
+
+## Overview
+
+This module provides a complete authentication solution designed to be deployed on internal PyPI servers and reused across multiple projects. It handles user registration, login/logout, token management, email verification, and password reset functionality.
+
+## Features
+
+### Core Authentication
+- ✅ User registration with email and username
+- ✅ Login/logout with email-based authentication
+- ✅ JWT-based access tokens (30 minutes validity)
+- ✅ Opaque refresh tokens stored in database (7 days validity)
+- ✅ Password hashing with configurable bcrypt rounds
+
+### User Management
+- ✅ Email verification with JWT tokens
+- ✅ Password reset with secure random tokens (15 minutes validity)
+- ✅ User roles management (flexible, no predefined roles)
+- ✅ User settings storage (dict field)
+- ✅ Account activation/deactivation
+
+### Password Security
+- ✅ Strict password validation (via Pydantic):
+ - Minimum 8 characters
+ - At least 1 uppercase letter
+ - At least 1 lowercase letter
+ - At least 1 digit
+ - At least 1 special character
+
+### Architecture
+- ✅ Abstract base classes for database persistence
+- ✅ Multiple database implementations: MongoDB, SQLite, PostgreSQL
+- ✅ Abstract email service interface with optional SMTP implementation
+- ✅ Custom exceptions with FastAPI integration
+- ✅ Synchronous implementation
+- ✅ Ready-to-use FastAPI router with `/auth` prefix
+
+## Project Structure
+
+```
+├──src
+│ my_auth/
+│ ├── __init__.py
+│ ├── models/ # Pydantic models
+│ │ ├── user.py # User model with roles and settings
+│ │ ├── token.py # Token models (access, refresh, reset)
+│ │ └── email_verification.py # Email verification models
+│ ├── core/ # Business logic
+│ │ ├── auth.py # Authentication service
+│ │ ├── password.py # Password hashing/verification
+│ │ └── token.py # Token generation/validation
+│ ├── persistence/ # Database abstraction
+│ │ ├── base.py # Abstract base classes
+│ │ ├── mongodb.py # MongoDB implementation
+│ │ ├── sqlite.py # SQLite implementation
+│ │ └── postgresql.py # PostgreSQL implementation
+│ ├── api/ # FastAPI routes
+│ │ └── routes.py # All authentication endpoints
+│ ├── email/ # Email service
+│ │ ├── base.py # Abstract interface
+│ │ └── smtp.py # SMTP implementation (optional)
+│ ├── exceptions.py # Custom exceptions
+│ └── config.py # Configuration classes
+├── tests
+```
+
+## User Model
+
+```python
+class User:
+ id: str # Unique identifier
+ email: str # Unique, required
+ username: str # Required, non-unique
+ hashed_password: str # Bcrypt hashed
+ roles: list[str] # Free-form roles, no defaults
+ user_settings: dict # Custom user settings
+ is_verified: bool # Email verification status
+ is_active: bool # Account active status
+ created_at: datetime
+ updated_at: datetime
+```
+
+## Token Management
+
+### Token Types
+The module uses a unified tokens collection with a discriminator field:
+
+1. **Access Token (JWT)**: 30 minutes validity, stateless
+2. **Refresh Token (Opaque)**: 7 days validity, stored in DB
+3. **Password Reset Token (Random)**: 15 minutes validity, stored in DB
+4. **Email Verification Token (JWT)**: Stateless, no DB storage
+
+### Token Storage
+All tokens requiring storage (refresh and password reset) are kept in a single `tokens` collection/table with a `token_type` discriminator field.
+
+## API Endpoints
+
+The module exposes a pre-configured FastAPI router with the following endpoints:
+
+```
+POST /auth/register # User registration
+POST /auth/login # Login (email + password)
+POST /auth/logout # Logout (revokes refresh token)
+POST /auth/refresh # Refresh access token
+POST /auth/password-reset-request # Request password reset
+POST /auth/password-reset # Reset password with token
+POST /auth/verify-email-request # Request email verification
+POST /auth/verify-email # Verify email with token
+GET /auth/me # Get current user info
+```
+
+## Installation
+
+### Dependencies
+
+**Core dependencies:**
+```bash
+pip install fastapi pydantic pydantic-settings python-jose[cryptography] passlib[bcrypt] python-multipart
+```
+
+**Database-specific dependencies:**
+- MongoDB: `pip install pymongo`
+- SQLite: Built-in (no additional dependency)
+- PostgreSQL: `pip install psycopg2-binary`
+
+**Optional email dependency:**
+- SMTP: `pip install secure-smtplib`
+
+## Usage
+
+### Basic Setup
+
+```python
+from fastapi import FastAPI
+from auth_module import AuthService
+from auth_module.persistence.mongodb import MongoUserRepository, MongoTokenRepository
+from auth_module.api import auth_router
+
+# Initialize repositories
+user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/mydb")
+token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/mydb")
+
+# Initialize auth service
+auth_service = AuthService(
+ user_repository=user_repo,
+ token_repository=token_repo,
+ jwt_secret="your-secret-key-here",
+ access_token_expire_minutes=30,
+ refresh_token_expire_days=7,
+ password_reset_token_expire_minutes=15,
+ password_hash_rounds=12
+)
+
+# Create FastAPI app and include auth router
+app = FastAPI()
+app.include_router(auth_router) # Mounts at /auth prefix
+```
+
+### Using Different Databases
+
+#### SQLite
+```python
+from auth_module.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
+
+user_repo = SQLiteUserRepository(db_path="./auth.db")
+token_repo = SQLiteTokenRepository(db_path="./auth.db")
+```
+
+#### PostgreSQL
+```python
+from auth_module.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
+
+user_repo = PostgreSQLUserRepository(
+ host="localhost",
+ port=5432,
+ database="mydb",
+ user="postgres",
+ password="secret"
+)
+token_repo = PostgreSQLTokenRepository(...)
+```
+
+### Email Service Configuration
+
+```python
+from auth_module.email.smtp import SMTPEmailService
+
+email_service = SMTPEmailService(
+ host="smtp.gmail.com",
+ port=587,
+ username="your-email@gmail.com",
+ password="your-app-password",
+ use_tls=True
+)
+
+auth_service = AuthService(
+ user_repository=user_repo,
+ token_repository=token_repo,
+ email_service=email_service, # Optional
+ ...
+)
+```
+
+### Custom Email Service
+
+Implement your own email service by extending the abstract base class:
+
+```python
+from auth_module.email.base import EmailService
+
+class CustomEmailService(EmailService):
+ def send_verification_email(self, email: str, token: str) -> None:
+ # Your implementation (SendGrid, AWS SES, etc.)
+ pass
+
+ def send_password_reset_email(self, email: str, token: str) -> None:
+ # Your implementation
+ pass
+```
+
+## Error Handling
+
+The module uses custom exceptions that are automatically converted to appropriate HTTP responses:
+
+- `InvalidCredentialsError` → 401 Unauthorized
+- `UserAlreadyExistsError` → 409 Conflict
+- `UserNotFoundError` → 404 Not Found
+- `InvalidTokenError` → 401 Unauthorized
+- `RevokedTokenError` → 401 Unauthorized
+- `ExpiredTokenError` → 401 Unauthorized
+- `EmailNotVerifiedError` → 403 Forbidden
+- `AccountDisabledError` → 403 Forbidden
+
+## Configuration Options
+
+```python
+AuthService(
+ user_repository: UserRepository, # Required
+ token_repository: TokenRepository, # Required
+ jwt_secret: str, # Required
+ jwt_algorithm: str = "HS256", # Optional
+ access_token_expire_minutes: int = 30, # Optional
+ refresh_token_expire_days: int = 7, # Optional
+ password_reset_token_expire_minutes: int = 15, # Optional
+ password_hash_rounds: int = 12, # Optional (bcrypt cost)
+ email_service: EmailService = None # Optional
+)
+```
+
+## Testing
+
+The module is fully testable with pytest. Test fixtures are provided for each database implementation.
+
+```bash
+pytest tests/
+```
+
+## Security Considerations
+
+- Passwords are hashed using bcrypt with configurable rounds
+- JWT tokens are signed with HS256 (configurable)
+- Refresh tokens are opaque and stored securely
+- Password reset tokens are single-use and expire after 15 minutes
+- Email verification tokens are stateless JWT
+- Rate limiting should be implemented at the application level
+- HTTPS should be enforced by the application
+
+## Future Enhancements (Not Included)
+
+- Multi-factor authentication (2FA/MFA)
+- Rate limiting on login attempts
+- OAuth2 provider integration
+- Session management (multiple device tracking)
+- Account lockout after failed attempts
+
+## License
+
+[Your License Here]
+
+## Contributing
+
+[Your Contributing Guidelines Here]
\ No newline at end of file
diff --git a/README.md b/README.md
index 7ece33f..28577e7 100644
--- a/README.md
+++ b/README.md
@@ -1,104 +1,281 @@
# MyAuth Module
-A reusable, modular authentication system for FastAPI applications with pluggable database backends.
+A reusable, modular authentication system for FastAPI applications, designed for deployment on an internal PyPI server.
## Overview
-This module provides a complete authentication solution designed to be deployed on internal PyPI servers and reused across multiple projects. It handles user registration, login/logout, token management, email verification, and password reset functionality.
+MyAuth provides a complete, 'out-of-the-box' authentication solution for your FastAPI projects. It's designed to be
+installed as a dependency and configured in just a few lines, letting you focus on your business logic instead of user
+and token management.
+
+This module handles registration, login/logout, email verification, password reset, and token management (JWT & Refresh)
+with pluggable database backends.
## Features
-### Core Authentication
-- ✅ User registration with email and username
-- ✅ Login/logout with email-based authentication
-- ✅ JWT-based access tokens (30 minutes validity)
-- ✅ Opaque refresh tokens stored in database (7 days validity)
-- ✅ Password hashing with configurable bcrypt rounds
+* **Complete Authentication:** User registration, email-based login, logout.
+* **Token Management:**
+ * **JWT Access Tokens** (default: 30 min validity).
+ * **Opaque Refresh Tokens** securely stored in the database (default: 7 days validity).
+* **User Lifecycle:**
+ * Email verification (via JWT token).
+ * Secure password reset (via secure token stored in DB).
+ * Account activation / deactivation.
+* **Security:**
+ * Password hashing with `bcrypt` (configurable rounds).
+ * Strict password validation (uppercase, lowercase, digit, special character).
+* **Flexible Architecture:**
+ * **Pluggable Backends:** Supports MongoDB, PostgreSQL, and SQLite out of the box.
+ * **Abstract Email Service:** Use the built-in SMTP implementation or plug in your own (e.g., SendGrid, AWS SES).
+* **FastAPI Integration:**
+ * A ready-to-use `APIRouter` that exposes all necessary endpoints under the `/auth` prefix.
+ * Automatic handling of custom exceptions into correct HTTP responses.
-### User Management
-- ✅ Email verification with JWT tokens
-- ✅ Password reset with secure random tokens (15 minutes validity)
-- ✅ User roles management (flexible, no predefined roles)
-- ✅ User settings storage (dict field)
-- ✅ Account activation/deactivation
+---
-### Password Security
-- ✅ Strict password validation (via Pydantic):
- - Minimum 8 characters
- - At least 1 uppercase letter
- - At least 1 lowercase letter
- - At least 1 digit
- - At least 1 special character
+## Installation
-### Architecture
-- ✅ Abstract base classes for database persistence
-- ✅ Multiple database implementations: MongoDB, SQLite, PostgreSQL
-- ✅ Abstract email service interface with optional SMTP implementation
-- ✅ Custom exceptions with FastAPI integration
-- ✅ Synchronous implementation
-- ✅ Ready-to-use FastAPI router with `/auth` prefix
+Install the base module and choose your "extras" based on your infrastructure.
-## Project Structure
+```bash
+# Base installation (core logic, no DB or email drivers)
+pip install myauth --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
-```
-├──src
-│ my_auth/
-│ ├── __init__.py
-│ ├── models/ # Pydantic models
-│ │ ├── user.py # User model with roles and settings
-│ │ ├── token.py # Token models (access, refresh, reset)
-│ │ └── email_verification.py # Email verification models
-│ ├── core/ # Business logic
-│ │ ├── auth.py # Authentication service
-│ │ ├── password.py # Password hashing/verification
-│ │ └── token.py # Token generation/validation
-│ ├── persistence/ # Database abstraction
-│ │ ├── base.py # Abstract base classes
-│ │ ├── mongodb.py # MongoDB implementation
-│ │ ├── sqlite.py # SQLite implementation
-│ │ └── postgresql.py # PostgreSQL implementation
-│ ├── api/ # FastAPI routes
-│ │ └── routes.py # All authentication endpoints
-│ ├── email/ # Email service
-│ │ ├── base.py # Abstract interface
-│ │ └── smtp.py # SMTP implementation (optional)
-│ ├── exceptions.py # Custom exceptions
-│ └── config.py # Configuration classes
-├── tests
+# --- Installation with Extras (Recommended) ---
+
+# To use with MongoDB
+pip install "myauth[mongodb]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
+
+# To use with PostgreSQL
+pip install "myauth[postgresql]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
+
+# To use the built-in SMTP email service
+pip install "myauth[email]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
+
+# To combine extras (e.g., PostgreSQL + Email)
+pip install "myauth[postgresql,email]" --index-url [http://your-pypiserver.com/](http://your-pypiserver.com/)
```
-## User Model
+### Core Dependencies
-```python
-class User:
- id: str # Unique identifier
- email: str # Unique, required
- username: str # Required, non-unique
- hashed_password: str # Bcrypt hashed
- roles: list[str] # Free-form roles, no defaults
- user_settings: dict # Custom user settings
- is_verified: bool # Email verification status
- is_active: bool # Account active status
- created_at: datetime
- updated_at: datetime
+The core module requires: fastapi, pydantic, pydantic-settings, python-jose[cryptography], passlib[bcrypt],
+python-multipart
+
+## Quick Start
+
+To get started, choose one of the database options below. Each example is complete and self-contained: it sets up the
+FastAPI app, the authentication service, and the related routes.
+
+Copy-paste the example that matches your infrastructure into your main.py.
+
+### Option 1: Quick Start with MongoDB
+
+This example configures myauth to use MongoDB as its backend.
+
+```Python
+
+from fastapi import FastAPI
+from my_auth import AuthService
+from my_auth.api import auth_router
+from my_auth.persistence.mongodb import MongoUserRepository, MongoTokenRepository
+
+# 1. Initialize FastAPI app
+app = FastAPI()
+
+# 2. Configure repositories for MongoDB
+# Make sure your connection string is correct
+user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/myappdb")
+token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/myappdb")
+
+# 3. Configure the Authentication Service
+# IMPORTANT: Change this to a long, random, secret string
+auth_service = AuthService(
+ user_repository=user_repo,
+ token_repository=token_repo,
+ jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
+ # email_service will be added in the next step
+)
+
+# 4. Include the authentication routes
+# Endpoints like /auth/login, /auth/register are now active
+app.include_router(auth_router)
+
+
+@app.get("/")
+def read_root():
+ return {"message": "Application running with MyAuth (MongoDB)"}
```
-## Token Management
+### Option 2: Quick Start with PostgreSQL
-### Token Types
-The module uses a unified tokens collection with a discriminator field:
+This example configures myauth to use PostgreSQL as its backend.
-1. **Access Token (JWT)**: 30 minutes validity, stateless
-2. **Refresh Token (Opaque)**: 7 days validity, stored in DB
-3. **Password Reset Token (Random)**: 15 minutes validity, stored in DB
-4. **Email Verification Token (JWT)**: Stateless, no DB storage
+```Python
-### Token Storage
-All tokens requiring storage (refresh and password reset) are kept in a single `tokens` collection/table with a `token_type` discriminator field.
+from fastapi import FastAPI
+from my_auth import AuthService
+from my_auth.api import auth_router
+from my_auth.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
-## API Endpoints
+# 1. Initialize FastAPI app
+app = FastAPI()
-The module exposes a pre-configured FastAPI router with the following endpoints:
+# 2. Configure repositories for PostgreSQL
+# Update with your database credentials
+db_config = {
+ "host": "localhost",
+ "port": 5432,
+ "database": "mydb",
+ "user": "postgres",
+ "password": "secret"
+}
+user_repo = PostgreSQLUserRepository(**db_config)
+token_repo = PostgreSQLTokenRepository(**db_config)
+
+# 3. Configure the Authentication Service
+# IMPORTANT: Change this to a long, random, secret string
+auth_service = AuthService(
+ user_repository=user_repo,
+ token_repository=token_repo,
+ jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
+ # email_service will be added in the next step
+)
+
+# 4. Include the authentication routes
+# Endpoints like /auth/login, /auth/register are now active
+app.include_router(auth_router)
+
+
+@app.get("/")
+def read_root():
+ return {"message": "Application running with MyAuth (PostgreSQL)"}
+```
+
+### Option 3: Quick Start with SQLite
+
+This example configures myauth to use SQLite, which is ideal for development or small applications.
+
+```Python
+
+from fastapi import FastAPI
+from my_auth import AuthService
+from my_auth.api import auth_router
+from my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
+
+# 1. Initialize FastAPI app
+app = FastAPI()
+
+# 2. Configure repositories for SQLite
+# This will create/use a file named 'auth.db' in the current directory
+db_path = "./auth.db"
+user_repo = SQLiteUserRepository(db_path=db_path)
+token_repo = SQLiteTokenRepository(db_path=db_path)
+
+# 3. Configure the Authentication Service
+# IMPORTANT: Change this to a long, random, secret string
+auth_service = AuthService(
+ user_repository=user_repo,
+ token_repository=token_repo,
+ jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
+ # email_service will be added in the next step
+)
+
+# 4. Include the authentication routes
+# Endpoints like /auth/login, /auth/register are now active
+app.include_router(auth_router)
+
+
+@app.get("/")
+def read_root():
+ return {"message": "Application running with MyAuth (SQLite)"}
+```
+
+## Next Step: Configure the Email Service
+
+For email verification (/auth/verify-email) and password resets (/auth/password-reset) to work, you must provide an
+email service to the AuthService.
+
+Simply modify the AuthService initialization from the Quick Start step you chose.
+
+### Option 1: Use the Built-in SMTP Service
+
+This is the simplest option if you have an SMTP server (like Gmail, SendGrid SMTP, etc.). Remember to install the extra:
+pip install "myauth[email]"
+
+```Python
+
+# ... (keep your app and repository config from the Quick Start)
+
+from my_auth.email.smtp import SMTPEmailService
+
+# 1. Configure the email service
+email_service = SMTPEmailService(
+ host="smtp.gmail.com",
+ port=587,
+ username="your-email@gmail.com",
+ password="your-app-password", # Use an 'App Password' for Gmail
+ use_tls=True
+)
+
+# 2. Pass the email service to AuthService
+auth_service = AuthService(
+ user_repository=user_repo, # From Quick Start
+ token_repository=token_repo, # From Quick Start
+ email_service=email_service, # Add this line
+ jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
+)
+
+# ... (keep 'app.include_router(auth_router)')
+```
+
+### Option 2: Create a Custom Email Service
+
+If you use a third-party service (like AWS SES, Mailgun) that requires an API, you can implement your own.
+
+```Python
+
+# ... (keep your app and repository config from the Quick Start)
+
+from my_auth.email.base import EmailService
+
+
+# 1. Implement your custom email service
+class CustomEmailService(EmailService):
+ def __init__(self, api_key: str):
+ # self.api_key = api_key
+ # self.client = ThirdPartyClient(api_key=api_key)
+ print("Custom Email Service Initialized")
+
+ def send_verification_email(self, email: str, token: str) -> None:
+ # Your custom logic to send an email via an API
+ # verification_link = f"http://localhost:8000/verify?token={token}"
+ print(f"Sending VERIFICATION to {email} with token {token}")
+ pass
+
+ def send_password_reset_email(self, email: str, token: str) -> None:
+ # Your custom logic to send an email via an API
+ # reset_link = f"http://localhost:8000/reset-password?token={token}"
+ print(f"Sending PASSWORD RESET to {email} with token {token}")
+ pass
+
+
+# 2. Initialize your custom service
+email_service = CustomEmailService(api_key="YOUR_API_KEY_HERE")
+
+# 3. Pass your custom service to AuthService
+auth_service = AuthService(
+ user_repository=user_repo, # From Quick Start
+ token_repository=token_repo, # From Quick Start
+ email_service=email_service, # Add this line
+ jwt_secret="YOUR_SUPER_LONG_AND_SECURE_JWT_SECRET_HERE"
+)
+
+# ... (keep 'app.include_router(auth_router)')
+```
+
+## API Endpoints Reference
+
+The `auth_router` exposes the following endpoints under the `/auth` prefix:
```
POST /auth/register # User registration
@@ -112,174 +289,124 @@ POST /auth/verify-email # Verify email with token
GET /auth/me # Get current user info
```
-## Installation
+### Error Handling
-### Dependencies
+The module uses custom exceptions that are automatically converted to the appropriate HTTP responses by FastAPI:
-**Core dependencies:**
-```bash
-pip install fastapi pydantic pydantic-settings python-jose[cryptography] passlib[bcrypt] python-multipart
-```
-
-**Database-specific dependencies:**
-- MongoDB: `pip install pymongo`
-- SQLite: Built-in (no additional dependency)
-- PostgreSQL: `pip install psycopg2-binary`
-
-**Optional email dependency:**
-- SMTP: `pip install secure-smtplib`
-
-## Usage
-
-### Basic Setup
-
-```python
-from fastapi import FastAPI
-from auth_module import AuthService
-from auth_module.persistence.mongodb import MongoUserRepository, MongoTokenRepository
-from auth_module.api import auth_router
-
-# Initialize repositories
-user_repo = MongoUserRepository(connection_string="mongodb://localhost:27017/mydb")
-token_repo = MongoTokenRepository(connection_string="mongodb://localhost:27017/mydb")
-
-# Initialize auth service
-auth_service = AuthService(
- user_repository=user_repo,
- token_repository=token_repo,
- jwt_secret="your-secret-key-here",
- access_token_expire_minutes=30,
- refresh_token_expire_days=7,
- password_reset_token_expire_minutes=15,
- password_hash_rounds=12
-)
-
-# Create FastAPI app and include auth router
-app = FastAPI()
-app.include_router(auth_router) # Mounts at /auth prefix
-```
-
-### Using Different Databases
-
-#### SQLite
-```python
-from auth_module.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
-
-user_repo = SQLiteUserRepository(db_path="./auth.db")
-token_repo = SQLiteTokenRepository(db_path="./auth.db")
-```
-
-#### PostgreSQL
-```python
-from auth_module.persistence.postgresql import PostgreSQLUserRepository, PostgreSQLTokenRepository
-
-user_repo = PostgreSQLUserRepository(
- host="localhost",
- port=5432,
- database="mydb",
- user="postgres",
- password="secret"
-)
-token_repo = PostgreSQLTokenRepository(...)
-```
-
-### Email Service Configuration
-
-```python
-from auth_module.email.smtp import SMTPEmailService
-
-email_service = SMTPEmailService(
- host="smtp.gmail.com",
- port=587,
- username="your-email@gmail.com",
- password="your-app-password",
- use_tls=True
-)
-
-auth_service = AuthService(
- user_repository=user_repo,
- token_repository=token_repo,
- email_service=email_service, # Optional
- ...
-)
-```
-
-### Custom Email Service
-
-Implement your own email service by extending the abstract base class:
-
-```python
-from auth_module.email.base import EmailService
-
-class CustomEmailService(EmailService):
- def send_verification_email(self, email: str, token: str) -> None:
- # Your implementation (SendGrid, AWS SES, etc.)
- pass
-
- def send_password_reset_email(self, email: str, token: str) -> None:
- # Your implementation
- pass
-```
-
-## Error Handling
-
-The module uses custom exceptions that are automatically converted to appropriate HTTP responses:
-
-- `InvalidCredentialsError` → 401 Unauthorized
-- `UserAlreadyExistsError` → 409 Conflict
-- `UserNotFoundError` → 404 Not Found
-- `InvalidTokenError` → 401 Unauthorized
-- `RevokedTokenError` → 401 Unauthorized
-- `ExpiredTokenError` → 401 Unauthorized
-- `EmailNotVerifiedError` → 403 Forbidden
-- `AccountDisabledError` → 403 Forbidden
+* `InvalidCredentialsError` → **401 Unauthorized**
+* `UserAlreadyExistsError` → **409 Conflict**
+* `UserNotFoundError` → **404 Not Found**
+* `InvalidTokenError` → **401 Unauthorized**
+* `RevokedTokenError` → **401 Unauthorized**
+* `ExpiredTokenError` → **401 Unauthorized**
+* `EmailNotVerifiedError` → **403 Forbidden (on login attempt)**
+* `AccountDisabledError` → **403 Forbidden (on login attempt)**
## Configuration Options
-```python
+All options are passed during the `AuthService` initialization:
+
+```Python
+
AuthService(
- user_repository: UserRepository, # Required
- token_repository: TokenRepository, # Required
- jwt_secret: str, # Required
- jwt_algorithm: str = "HS256", # Optional
- access_token_expire_minutes: int = 30, # Optional
- refresh_token_expire_days: int = 7, # Optional
- password_reset_token_expire_minutes: int = 15, # Optional
- password_hash_rounds: int = 12, # Optional (bcrypt cost)
- email_service: EmailService = None # Optional
+ user_repository: UserRepository, # Required
+token_repository: TokenRepository, # Required
+jwt_secret: str, # Required
+jwt_algorithm: str = "HS256", # Optional
+access_token_expire_minutes: int = 30, # Optional
+refresh_token_expire_days: int = 7, # Optional
+password_reset_token_expire_minutes: int = 15, # Optional
+password_hash_rounds: int = 12, # Optional (bcrypt cost)
+email_service: EmailService = None # Optional
)
```
-## Testing
+## Appendix (Contributor & Development Details)
-The module is fully testable with pytest. Test fixtures are provided for each database implementation.
+ Appendix A: Project Structure (src/my_auth)
+
+```
+my_auth/
+├── __init__.py
+├── models/ # Pydantic models (User, Token...)
+│ ├── user.py
+│ ├── token.py
+│ └── email_verification.py
+├── core/ # Business logic (Auth, Password, Token services)
+│ ├── auth.py
+│ ├── password.py
+│ └── token.py
+├── persistence/ # Database abstraction
+│ ├── base.py # Abstract base classes
+│ ├── mongodb.py
+│ ├── sqlite.py
+│ └── postgresql.py
+├── api/ # FastAPI routes
+│ └── routes.py
+├── email/ # Email service
+│ ├── base.py # Abstract interface
+│ └── smtp.py
+├── exceptions.py # Custom HTTP exceptions
+└── config.py # Configuration classes (if any)
+```
+
+
+
+ Appendix B: Internal Data Models
+
+### User Model
+
+This is the internal Pydantic model used by the service.
+
+```Python
+
+class User:
+ id: str # Unique identifier
+ email: str # Unique, required
+ username: str # Required, non-unique
+ hashed_password: str # Bcrypt hashed
+ roles: list[str] # Free-form roles, no defaults
+ user_settings: dict # Custom user settings
+ is_verified: bool # Email verification status
+ is_active: bool # Account active status
+ created_at: datetime
+ updated_at: datetime
+```
+
+### Token Management
+
+The module uses a unified `tokens` collection/table with a `token_type` discriminator field for all persistent tokens.
+
+1. Access Token (JWT): 30 minutes validity, stateless, not stored in DB.
+1. Refresh Token (Opaque): 7 days validity, stored in DB.
+1. Password Reset Token (Random): 15 minutes validity, stored in DB.
+1. Email Verification Token (JWT): Stateless, not stored in DB.
+
+
+
+
+ Appendix C: Testing & Security
+
+### Testing
+
+The module is testable with `pytest`.
+
+```Bash
-```bash
pytest tests/
```
-## Security Considerations
+### Security Considerations
-- Passwords are hashed using bcrypt with configurable rounds
-- JWT tokens are signed with HS256 (configurable)
-- Refresh tokens are opaque and stored securely
-- Password reset tokens are single-use and expire after 15 minutes
-- Email verification tokens are stateless JWT
-- Rate limiting should be implemented at the application level
-- HTTPS should be enforced by the application
+* Passwords are hashed using bcrypt with configurable rounds.
+* JWT tokens are signed with HS256 (configurable).
+* Refresh tokens are opaque and stored securely in the database.
+* Password reset tokens are single-use, opaque, and expire quickly (15 min).
+* Rate Limiting is not included and should be implemented at the application level (e.g., using `slowapi`).
+* HTTPS should be enforced by the production web server (e.g., Nginx, Traefik).
-## Future Enhancements (Not Included)
-
-- Multi-factor authentication (2FA/MFA)
-- Rate limiting on login attempts
-- OAuth2 provider integration
-- Session management (multiple device tracking)
-- Account lockout after failed attempts
+
## License
-[Your License Here]
-
-## Contributing
-
-[Your Contributing Guidelines Here]
\ No newline at end of file
+MIT
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9ef2c7f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,83 @@
+# File: pyproject.toml
+
+[build-system]
+# Define the build system requirements
+requires = ["setuptools>=80.9", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "myauth"
+version = "0.1.0" # Start with an initial version
+description = "A reusable, modular authentication system for FastAPI applications with pluggable database backends."
+readme = "README.md"
+authors = [
+ { name = "Kodjo Sossouvi", email = "kodjo.sossouvi@gmail.com" },
+]
+maintainers = [
+ { name = "Kodjo Sossouvi", email = "kodjo.sossouvi@gmail.com" }
+]
+license = "MIT"
+requires-python = ">=3.8"
+classifiers = [
+ "Operating System :: OS Independent",
+ "Framework :: FastAPI",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Topic :: Auth",
+]
+
+# -------------------------------------------------------------------
+# Core dependencies from your README
+# Note: 'requirements.txt' is for development, this is for the package
+# -------------------------------------------------------------------
+dependencies = [
+ "fastapi",
+ "pydantic",
+ "pydantic-settings",
+ "python-jose[cryptography]",
+ "passlib[bcrypt]",
+ "python-multipart"
+]
+
+[project.urls]
+# Optional: Link to your internal repository or documentation
+Homepage = "https://gitea.sheerka.synology.me/kodjo/MyAuth"
+Documentation = "https://gitea.sheerka.synology.me/kodjo/MyAuth#readme"
+Repository = "https://gitea.sheerka.synology.me/kodjo/MyAuth"
+Issues = "https://gitea.sheerka.synology.me/kodjo/MyAuth/issues"
+
+# -------------------------------------------------------------------
+# Optional dependencies ("extras")
+# This allows users to install only what they need, e.g.:
+# pip install myauth[mongodb,email]
+# -------------------------------------------------------------------
+[project.optional-dependencies]
+mongodb = [
+ "pymongo"
+]
+postgresql = [
+ "psycopg2-binary"
+]
+email = [
+ "secure-smtplib"
+]
+# For development and testing (from your requirements.txt)
+dev = [
+ "pytest",
+ "httpx",
+ "anyio",
+ "email-validator",
+ "python-dotenv"
+]
+
+# -------------------------------------------------------------------
+# Setuptools configuration
+# This section tells the build system where to find your package code
+# -------------------------------------------------------------------
+[tool.setuptools]
+package-dir = {"" = "src"}
+packages = ["my_auth"]
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c41b156
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,34 @@
+annotated-types==0.7.0
+anyio==4.11.0
+bcrypt==5.0.0
+certifi==2025.10.5
+cffi==2.0.0
+cryptography==46.0.3
+dnspython==2.8.0
+ecdsa==0.19.1
+email-validator==2.3.0
+fastapi==0.119.0
+h11==0.16.0
+httpcore==1.0.9
+httpx==0.28.1
+idna==3.11
+iniconfig==2.1.0
+packaging==25.0
+passlib==1.7.4
+pluggy==1.6.0
+pyasn1==0.6.1
+pycparser==2.23
+pydantic==2.12.3
+pydantic-settings==2.11.0
+pydantic_core==2.41.4
+Pygments==2.19.2
+pytest==8.4.2
+python-dotenv==1.1.1
+python-jose==3.5.0
+python-multipart==0.0.20
+rsa==4.9.1
+six==1.17.0
+sniffio==1.3.1
+starlette==0.48.0
+typing-inspection==0.4.2
+typing_extensions==4.15.0
diff --git a/src/my_auth/api/__init__.py b/src/my_auth/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/my_auth/api/routes.py b/src/my_auth/api/routes.py
new file mode 100644
index 0000000..292eeae
--- /dev/null
+++ b/src/my_auth/api/routes.py
@@ -0,0 +1,379 @@
+"""
+FastAPI routes for authentication module.
+
+This module provides ready-to-use FastAPI routes for all authentication
+operations. Routes are organized in an APIRouter with /auth prefix.
+"""
+
+from typing import Annotated
+
+from fastapi import APIRouter, Depends, HTTPException, status
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+
+from ..core.auth import AuthService
+from ..exceptions import AuthError
+from ..models.email_verification import (
+ EmailVerificationRequest,
+ PasswordResetRequest,
+ PasswordResetConfirm
+)
+from ..models.token import AccessTokenResponse, RefreshTokenRequest
+from ..models.user import UserCreate, UserResponse
+
+# OAuth2 scheme for token authentication
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
+
+
+def create_auth_router(auth_service: AuthService) -> APIRouter:
+ """
+ Create and configure the authentication router.
+
+ This factory function creates an APIRouter with all authentication
+ endpoints configured. The router includes automatic exception handling
+ for authentication errors.
+
+ Args:
+ auth_service: Configured authentication service instance.
+
+ Returns:
+ Configured APIRouter ready to be included in a FastAPI app.
+
+ Example:
+ >>> from fastapi import FastAPI
+ >>> from auth_module.api.routes import create_auth_router
+ >>>
+ >>> app = FastAPI()
+ >>> auth_router = create_auth_router(auth_service)
+ >>> app.include_router(auth_router)
+ """
+ router = APIRouter(prefix="/auth", tags=["authentication"])
+
+ def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserResponse:
+ """
+ Dependency to extract and validate the current user from access token.
+
+ This dependency can be used in any route that requires authentication.
+ It extracts the Bearer token from the Authorization header and validates it.
+
+ Args:
+ token: JWT access token from Authorization header.
+
+ Returns:
+ The current authenticated user (as UserResponse).
+
+ Raises:
+ HTTPException: 401 if token is invalid or expired.
+
+ Example:
+ >>> @app.get("/protected")
+ >>> def protected_route(user: UserResponse = Depends(get_current_user)):
+ >>> return {"user_id": user.id}
+ """
+ try:
+ user = auth_service.get_current_user(token)
+ return UserResponse(
+ id=user.id,
+ email=user.email,
+ username=user.username,
+ roles=user.roles,
+ user_settings=user.user_settings,
+ created_at=user.created_at,
+ updated_at=user.updated_at
+ )
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/register",
+ response_model=UserResponse,
+ status_code=status.HTTP_201_CREATED,
+ summary="Register a new user",
+ description="Create a new user account with email, username, and password."
+ )
+ def register(user_data: UserCreate) -> UserResponse:
+ """
+ Register a new user.
+
+ Creates a new user account with the provided credentials. The password
+ is automatically hashed before storage. Email verification is optional
+ and the account is created with is_verified=False.
+
+ Args:
+ user_data: User registration data including email, username, and password.
+
+ Returns:
+ The created user information (without password).
+
+ Raises:
+ HTTPException: 409 if email already exists.
+ HTTPException: 422 if validation fails (password strength, etc.).
+ """
+ try:
+ user = auth_service.register(user_data)
+ return UserResponse(
+ id=user.id,
+ email=user.email,
+ username=user.username,
+ roles=user.roles,
+ user_settings=user.user_settings,
+ created_at=user.created_at,
+ updated_at=user.updated_at
+ )
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/login",
+ response_model=AccessTokenResponse,
+ summary="Login with email and password",
+ description="Authenticate a user and receive access and refresh tokens."
+ )
+ def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> AccessTokenResponse:
+ """
+ Authenticate a user and generate tokens.
+
+ This endpoint accepts form data with username (which should contain the email)
+ and password. It returns both access and refresh tokens upon successful authentication.
+
+ Note: OAuth2PasswordRequestForm uses 'username' field, but we treat it as email.
+
+ Args:
+ form_data: OAuth2 form with username (email) and password fields.
+
+ Returns:
+ Access token (JWT) and refresh token with token_type="bearer".
+
+ Raises:
+ HTTPException: 401 if credentials are invalid.
+ HTTPException: 403 if account is disabled.
+ """
+ try:
+ # OAuth2PasswordRequestForm uses 'username' but we treat it as email
+ user, tokens = auth_service.login(form_data.username, form_data.password)
+ return tokens
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/logout",
+ status_code=status.HTTP_204_NO_CONTENT,
+ summary="Logout user",
+ description="Revoke the refresh token to logout the user."
+ )
+ def logout(request: RefreshTokenRequest) -> None:
+ """
+ Logout a user by revoking their refresh token.
+
+ This prevents the refresh token from being used to obtain new access tokens.
+ The current access token remains valid until it expires naturally (30 minutes).
+
+ Args:
+ request: Request body containing the refresh token to revoke.
+
+ Returns:
+ 204 No Content on success.
+ """
+ auth_service.logout(request.refresh_token)
+ return None
+
+ @router.post(
+ "/refresh",
+ response_model=AccessTokenResponse,
+ summary="Refresh access token",
+ description="Exchange a refresh token for new access and refresh tokens."
+ )
+ def refresh_token(request: RefreshTokenRequest) -> AccessTokenResponse:
+ """
+ Obtain a new access token using a refresh token.
+
+ This endpoint allows clients to obtain a new access token without
+ requiring the user to re-enter their password. The old refresh token
+ is revoked and a new one is issued.
+
+ Args:
+ request: Request body containing the refresh token.
+
+ Returns:
+ New access token and refresh token.
+
+ Raises:
+ HTTPException: 401 if refresh token is invalid, expired, or revoked.
+ """
+ try:
+ tokens = auth_service.refresh_access_token(request.refresh_token)
+ return tokens
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/password-reset-request",
+ status_code=status.HTTP_200_OK,
+ summary="Request password reset",
+ description="Generate a password reset token and return it (to be sent via email)."
+ )
+ def request_password_reset(request: PasswordResetRequest) -> dict:
+ """
+ Request a password reset token.
+
+ This endpoint generates a secure token that can be used to reset the password.
+ In production, this token should be sent via email. For this module, the token
+ is returned in the response so the consuming application can handle email delivery.
+
+ Args:
+ request: Request body containing the email address.
+
+ Returns:
+ Dictionary with the reset token and a message.
+
+ Raises:
+ HTTPException: 404 if email is not registered.
+ """
+ try:
+ token = auth_service.request_password_reset(request.email)
+ return {
+ "message": "Password reset token generated",
+ "token": token
+ }
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/password-reset",
+ status_code=status.HTTP_200_OK,
+ summary="Reset password with token",
+ description="Reset user password using a valid reset token."
+ )
+ def reset_password(request: PasswordResetConfirm) -> dict:
+ """
+ Reset a user's password using a reset token.
+
+ This endpoint validates the reset token and updates the user's password.
+ All existing refresh tokens are revoked for security.
+
+ Args:
+ request: Request body containing the reset token and new password.
+
+ Returns:
+ Success message.
+
+ Raises:
+ HTTPException: 401 if token is invalid, expired, or already used.
+ HTTPException: 422 if new password doesn't meet requirements.
+ """
+ try:
+ auth_service.reset_password(request.token, request.new_password)
+ return {"message": "Password reset successfully"}
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.post(
+ "/verify-email-request",
+ status_code=status.HTTP_200_OK,
+ summary="Request email verification",
+ description="Generate an email verification token and return it (to be sent via email)."
+ )
+ def request_email_verification(request: EmailVerificationRequest) -> dict:
+ """
+ Request an email verification token.
+
+ This endpoint generates a JWT token for email verification. In production,
+ this token should be sent via email with a verification link. For this module,
+ the token is returned in the response so the consuming application can handle
+ email delivery.
+
+ Args:
+ request: Request body containing the email address.
+
+ Returns:
+ Dictionary with the verification token and a message.
+
+ Raises:
+ HTTPException: 404 if email is not registered.
+ """
+ try:
+ token = auth_service.request_email_verification(request.email)
+ return {
+ "message": "Email verification token generated",
+ "token": token
+ }
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.get(
+ "/verify-email",
+ status_code=status.HTTP_200_OK,
+ summary="Verify email with token",
+ description="Verify user email using a verification token from query parameter."
+ )
+ def verify_email(token: str) -> dict:
+ """
+ Verify a user's email address.
+
+ This endpoint validates the verification token and marks the user's
+ email as verified. It uses a query parameter so it can be easily
+ accessed via a link in an email.
+
+ Args:
+ token: Email verification token (JWT) from query parameter.
+
+ Returns:
+ Success message.
+
+ Raises:
+ HTTPException: 401 if token is invalid or expired.
+ """
+ try:
+ auth_service.verify_email(token)
+ return {"message": "Email verified successfully"}
+ except AuthError as e:
+ raise HTTPException(
+ status_code=e.status_code,
+ detail=e.message
+ )
+
+ @router.get(
+ "/me",
+ response_model=UserResponse,
+ summary="Get current user",
+ description="Get information about the currently authenticated user."
+ )
+ def get_me(current_user: Annotated[UserResponse, Depends(get_current_user)]) -> UserResponse:
+ """
+ Get current authenticated user information.
+
+ This is a protected route that requires a valid access token in the
+ Authorization header (Bearer token).
+
+ Args:
+ current_user: The authenticated user (injected by dependency).
+
+ Returns:
+ Current user information.
+
+ Raises:
+ HTTPException: 401 if token is invalid or expired.
+ """
+ return current_user
+
+ return router
diff --git a/src/my_auth/core/auth.py b/src/my_auth/core/auth.py
index 86b2fd0..48fe85a 100644
--- a/src/my_auth/core/auth.py
+++ b/src/my_auth/core/auth.py
@@ -7,439 +7,440 @@ password reset, and email verification.
"""
from datetime import datetime
+from typing import Optional
from .password import PasswordManager
from .token import TokenManager
-from ..exceptions import (
- InvalidCredentialsError,
- UserNotFoundError,
- AccountDisabledError,
- ExpiredTokenError,
- InvalidTokenError,
- RevokedTokenError
-)
-from ..models.token import AccessTokenResponse, TokenData
-from ..models.user import UserCreate, UserInDB, UserUpdate
from ..persistence.base import UserRepository, TokenRepository
+from ..models.user import UserCreate, UserInDB, UserUpdate
+from ..models.token import AccessTokenResponse, TokenData
+from ..email.base import EmailService
+from ..exceptions import (
+ InvalidCredentialsError,
+ UserNotFoundError,
+ AccountDisabledError,
+ ExpiredTokenError,
+ InvalidTokenError,
+ RevokedTokenError
+)
class AuthService:
- """
- Main authentication service.
-
- This service orchestrates all authentication-related operations by
- coordinating between password management, token management, and
- persistence layers.
-
- Attributes:
- user_repository: Repository for user persistence operations.
- token_repository: Repository for token persistence operations.
- password_manager: Manager for password hashing and verification.
- token_manager: Manager for token creation and validation.
- """
-
- def __init__(
- self,
- user_repository: UserRepository,
- token_repository: TokenRepository,
- password_manager: PasswordManager,
- token_manager: TokenManager
- ):
"""
- Initialize the authentication service.
-
- Args:
- user_repository: Repository for user persistence.
- token_repository: Repository for token persistence.
- jwt_secret: Secret key for JWT signing.
- jwt_algorithm: JWT algorithm (default: HS256).
- access_token_expire_minutes: Access token validity (default: 30).
- refresh_token_expire_days: Refresh token validity (default: 7).
- password_reset_token_expire_minutes: Reset token validity (default: 15).
- password_hash_rounds: Bcrypt rounds (default: 12).
+ Main authentication service.
+
+ This service orchestrates all authentication-related operations by
+ coordinating between password management, token management, and
+ persistence layers.
+
+ Attributes:
+ user_repository: Repository for user persistence operations.
+ token_repository: Repository for token persistence operations.
+ password_manager: Manager for password hashing and verification.
+ token_manager: Manager for token creation and validation.
+ email_service: Optional service for sending emails.
"""
- self.user_repository = user_repository
- self.token_repository = token_repository
- self.password_manager = password_manager
- self.token_manager = token_manager
-
- def register(self, user_data: UserCreate) -> UserInDB:
- """
- Register a new user.
-
- This method creates a new user account with hashed password.
- The user's email is initially unverified.
-
- Args:
- user_data: User registration data including password.
-
- Returns:
- The created user (without password).
-
- Raises:
- UserAlreadyExistsError: If email is already registered.
-
- Example:
- >>> user_data = UserCreate(
- ... email="user@example.com",
- ... username="john_doe",
- ... password="SecurePass123!"
- ... )
- >>> user = auth_service.register(user_data)
- """
- hashed_password = self.password_manager.hash_password(user_data.password)
- user = self.user_repository.create_user(user_data, hashed_password)
- return user
-
- def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
- """
- Authenticate a user and create tokens.
-
- This method verifies credentials, checks account status, and generates
- both access and refresh tokens.
-
- Args:
- email: User's email address.
- password: User's plain text password.
-
- Returns:
- Tuple of (user, tokens) where tokens contains access_token and refresh_token.
-
- Raises:
- InvalidCredentialsError: If email or password is incorrect.
- AccountDisabledError: If user account is disabled.
-
- Example:
- >>> user, tokens = auth_service.login("user@example.com", "password")
- >>> print(tokens.access_token)
- """
- # Get user by email
- user = self.user_repository.get_user_by_email(email)
- if not user:
- raise InvalidCredentialsError()
- # Verify password
- if not self.password_manager.verify_password(password, user.hashed_password):
- raise InvalidCredentialsError()
+ def __init__(
+ self,
+ user_repository: UserRepository,
+ token_repository: TokenRepository,
+ password_manager: PasswordManager,
+ token_manager: TokenManager,
+ email_service: Optional[EmailService] = None
+ ):
+ """
+ Initialize the authentication service.
+
+ Args:
+ user_repository: Repository for user persistence.
+ token_repository: Repository for token persistence.
+ password_manager: Manager for password hashing and verification.
+ token_manager: Manager for token creation and validation.
+ email_service: Optional service for sending emails (password reset, verification).
+ """
+ self.user_repository = user_repository
+ self.token_repository = token_repository
+ self.password_manager = password_manager
+ self.token_manager = token_manager
+ self.email_service = email_service
- # Check if account is active
- if not user.is_active:
- raise AccountDisabledError()
+ def register(self, user_data: UserCreate) -> UserInDB:
+ """
+ Register a new user.
+
+ This method creates a new user account with hashed password.
+ The user's email is initially unverified.
+
+ Args:
+ user_data: User registration data including password.
+
+ Returns:
+ The created user (without password).
+
+ Raises:
+ UserAlreadyExistsError: If email is already registered.
+
+ Example:
+ >>> user_data = UserCreate(
+ ... email="user@example.com",
+ ... username="john_doe",
+ ... password="SecurePass123!"
+ ... )
+ >>> user = auth_service.register(user_data)
+ """
+ hashed_password = self.password_manager.hash_password(user_data.password)
+ user = self.user_repository.create_user(user_data, hashed_password)
+ return user
- # Create tokens
- access_token = self.token_manager.create_access_token(user)
- refresh_token = self.token_manager.create_refresh_token()
+ def login(self, email: str, password: str) -> tuple[UserInDB, AccessTokenResponse]:
+ """
+ Authenticate a user and create tokens.
+
+ This method verifies credentials, checks account status, and generates
+ both access and refresh tokens.
+
+ Args:
+ email: User's email address.
+ password: User's plain text password.
+
+ Returns:
+ Tuple of (user, tokens) where tokens contains access_token and refresh_token.
+
+ Raises:
+ InvalidCredentialsError: If email or password is incorrect.
+ AccountDisabledError: If user account is disabled.
+
+ Example:
+ >>> user, tokens = auth_service.login("user@example.com", "password")
+ >>> print(tokens.access_token)
+ """
+ # Get user by email
+ user = self.user_repository.get_user_by_email(email)
+ if not user:
+ raise InvalidCredentialsError()
+
+ # Verify password
+ if not self.password_manager.verify_password(password, user.hashed_password):
+ raise InvalidCredentialsError()
+
+ # Check if account is active
+ if not user.is_active:
+ raise AccountDisabledError()
+
+ # Create tokens
+ access_token = self.token_manager.create_access_token(user)
+ refresh_token = self.token_manager.create_refresh_token()
+
+ # Store refresh token in database
+ token_data = TokenData(
+ token=refresh_token,
+ token_type="refresh",
+ user_id=user.id,
+ expires_at=self.token_manager.get_refresh_token_expiration(),
+ created_at=datetime.utcnow(),
+ is_revoked=False
+ )
+ self.token_repository.save_token(token_data)
+
+ tokens = AccessTokenResponse(
+ access_token=access_token,
+ refresh_token=refresh_token,
+ token_type="bearer"
+ )
+
+ return user, tokens
- # Store refresh token in database
- token_data = TokenData(
- token=refresh_token,
- token_type="refresh",
- user_id=user.id,
- expires_at=self.token_manager.get_refresh_token_expiration(),
- created_at=datetime.now(),
- is_revoked=False
- )
- self.token_repository.save_token(token_data)
+ def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
+ """
+ Create a new access token using a refresh token.
+
+ This method validates the refresh token and generates a new access token
+ without requiring the user to re-enter their password.
+
+ Args:
+ refresh_token: The refresh token to exchange.
+
+ Returns:
+ New access and refresh tokens.
+
+ Raises:
+ InvalidTokenError: If refresh token is invalid.
+ ExpiredTokenError: If refresh token has expired.
+ RevokedTokenError: If refresh token has been revoked.
+ UserNotFoundError: If user no longer exists.
+ AccountDisabledError: If user account is disabled.
+
+ Example:
+ >>> tokens = auth_service.refresh_access_token(old_refresh_token)
+ >>> print(tokens.access_token)
+ """
+ # Validate refresh token
+ token_data = self.token_repository.get_token(refresh_token, "refresh")
+ if not token_data:
+ raise InvalidTokenError("Invalid refresh token")
+
+ if token_data.is_revoked:
+ raise RevokedTokenError()
+
+ if token_data.expires_at < datetime.utcnow():
+ raise ExpiredTokenError()
+
+ # Get user
+ user = self.user_repository.get_user_by_id(token_data.user_id)
+ if not user:
+ raise UserNotFoundError()
+
+ if not user.is_active:
+ raise AccountDisabledError()
+
+ # Create new tokens
+ access_token = self.token_manager.create_access_token(user)
+ new_refresh_token = self.token_manager.create_refresh_token()
+
+ # Revoke old refresh token
+ self.token_repository.revoke_token(refresh_token)
+
+ # Store new refresh token
+ new_token_data = TokenData(
+ token=new_refresh_token,
+ token_type="refresh",
+ user_id=user.id,
+ expires_at=self.token_manager.get_refresh_token_expiration(),
+ created_at=datetime.utcnow(),
+ is_revoked=False
+ )
+ self.token_repository.save_token(new_token_data)
+
+ return AccessTokenResponse(
+ access_token=access_token,
+ refresh_token=new_refresh_token,
+ token_type="bearer"
+ )
- tokens = AccessTokenResponse(
- access_token=access_token,
- refresh_token=refresh_token,
- token_type="bearer"
- )
+ def logout(self, refresh_token: str) -> bool:
+ """
+ Logout a user by revoking their refresh token.
+
+ This prevents the refresh token from being used to obtain new
+ access tokens. The current access token will remain valid until
+ it expires naturally.
+
+ Args:
+ refresh_token: The refresh token to revoke.
+
+ Returns:
+ True if logout was successful, False if token not found.
+
+ Example:
+ >>> auth_service.logout(refresh_token)
+ True
+ """
+ return self.token_repository.revoke_token(refresh_token)
- return user, tokens
-
- def refresh_access_token(self, refresh_token: str) -> AccessTokenResponse:
- """
- Create a new access token using a refresh token.
-
- This method validates the refresh token and generates a new access token
- without requiring the user to re-enter their password.
-
- Args:
- refresh_token: The refresh token to exchange.
-
- Returns:
- New access and refresh tokens.
-
- Raises:
- InvalidTokenError: If refresh token is invalid.
- ExpiredTokenError: If refresh token has expired.
- RevokedTokenError: If refresh token has been revoked.
- UserNotFoundError: If user no longer exists.
- AccountDisabledError: If user account is disabled.
-
- Example:
- >>> tokens = auth_service.refresh_access_token(old_refresh_token)
- >>> print(tokens.access_token)
- """
- # Validate refresh token
- token_data = self.token_repository.get_token(refresh_token, "refresh")
- if not token_data:
- raise InvalidTokenError("Invalid refresh token")
+ def request_password_reset(self, email: str) -> str:
+ """
+ Generate a password reset token for a user.
+
+ This method creates a secure token that can be sent to the user
+ via email to reset their password. If an email service is configured,
+ the email is sent automatically. Otherwise, the token is returned
+ for manual handling.
+
+ Args:
+ email: User's email address.
+
+ Returns:
+ The password reset token to be sent via email.
+
+ Raises:
+ UserNotFoundError: If email is not registered.
+
+ Example:
+ >>> token = auth_service.request_password_reset("user@example.com")
+ >>> # Token is automatically sent via email if service is configured
+ """
+ user = self.user_repository.get_user_by_email(email)
+ if not user:
+ raise UserNotFoundError(f"No user found with email {email}")
+
+ # Create reset token
+ reset_token = self.token_manager.create_password_reset_token()
+
+ # Store token in database
+ token_data = TokenData(
+ token=reset_token,
+ token_type="password_reset",
+ user_id=user.id,
+ expires_at=self.token_manager.get_password_reset_token_expiration(),
+ created_at=datetime.utcnow(),
+ is_revoked=False
+ )
+ self.token_repository.save_token(token_data)
+
+ # Send email if service is configured
+ if self.email_service:
+ self.email_service.send_password_reset_email(email, reset_token)
+
+ return reset_token
- if token_data.is_revoked:
- raise RevokedTokenError()
+ def reset_password(self, token: str, new_password: str) -> bool:
+ """
+ Reset a user's password using a reset token.
+
+ This method validates the reset token and updates the user's password.
+ All existing refresh tokens for the user are revoked for security.
+
+ Args:
+ token: Password reset token.
+ new_password: New plain text password (will be hashed).
+
+ Returns:
+ True if password was reset successfully.
+
+ Raises:
+ InvalidTokenError: If reset token is invalid.
+ ExpiredTokenError: If reset token has expired.
+ RevokedTokenError: If reset token has been used.
+ UserNotFoundError: If user no longer exists.
+
+ Example:
+ >>> auth_service.reset_password(token, "NewSecurePass123!")
+ True
+ """
+ # Validate reset token
+ token_data = self.token_repository.get_token(token, "password_reset")
+ if not token_data:
+ raise InvalidTokenError("Invalid password reset token")
+
+ if token_data.is_revoked:
+ raise RevokedTokenError("Password reset token has already been used")
+
+ if token_data.expires_at < datetime.utcnow():
+ raise ExpiredTokenError("Password reset token has expired")
+
+ # Get user
+ user = self.user_repository.get_user_by_id(token_data.user_id)
+ if not user:
+ raise UserNotFoundError()
+
+ # Hash new password
+ hashed_password = self.password_manager.hash_password(new_password)
+
+ # Update user password
+ updates = UserUpdate(password=hashed_password)
+ self.user_repository.update_user(user.id, updates)
+
+ # Revoke the reset token
+ self.token_repository.revoke_token(token)
+
+ # Revoke all user's refresh tokens for security
+ self.token_repository.revoke_all_user_tokens(user.id, "refresh")
+
+ return True
- if token_data.expires_at < datetime.now():
- raise ExpiredTokenError()
+ def request_email_verification(self, email: str) -> str:
+ """
+ Generate an email verification token for a user.
+
+ This method creates a JWT token that can be sent to the user
+ to verify their email address. If an email service is configured,
+ the email is sent automatically. Otherwise, the token is returned
+ for manual handling.
+
+ Args:
+ email: User's email address.
+
+ Returns:
+ The email verification token (JWT) to be sent via email.
+
+ Raises:
+ UserNotFoundError: If email is not registered.
+
+ Example:
+ >>> token = auth_service.request_email_verification("user@example.com")
+ >>> # Token is automatically sent via email if service is configured
+ """
+ user = self.user_repository.get_user_by_email(email)
+ if not user:
+ raise UserNotFoundError(f"No user found with email {email}")
+
+ verification_token = self.token_manager.create_email_verification_token(email)
+
+ # Send email if service is configured
+ if self.email_service:
+ self.email_service.send_verification_email(email, verification_token)
+
+ return verification_token
- # Get user
- user = self.user_repository.get_user_by_id(token_data.user_id)
- if not user:
- raise UserNotFoundError()
+ def verify_email(self, token: str) -> bool:
+ """
+ Verify a user's email address using a verification token.
+
+ This method decodes the JWT token, extracts the email, and marks
+ the user's email as verified.
+
+ Args:
+ token: Email verification token (JWT).
+
+ Returns:
+ True if email was verified successfully.
+
+ Raises:
+ InvalidTokenError: If verification token is invalid.
+ ExpiredTokenError: If verification token has expired.
+ UserNotFoundError: If user no longer exists.
+
+ Example:
+ >>> auth_service.verify_email(token)
+ True
+ """
+ # Decode and validate token
+ email = self.token_manager.decode_email_verification_token(token)
+
+ # Get user
+ user = self.user_repository.get_user_by_email(email)
+ if not user:
+ raise UserNotFoundError(f"No user found with email {email}")
+
+ # Update user's verified status
+ updates = UserUpdate(is_verified=True)
+ self.user_repository.update_user(user.id, updates)
+
+ return True
- if not user.is_active:
- raise AccountDisabledError()
-
- # Create new tokens
- access_token = self.token_manager.create_access_token(user)
- new_refresh_token = self.token_manager.create_refresh_token()
-
- # Revoke old refresh token
- self.token_repository.revoke_token(refresh_token)
-
- # Store new refresh token
- new_token_data = TokenData(
- token=new_refresh_token,
- token_type="refresh",
- user_id=user.id,
- expires_at=self.token_manager.get_refresh_token_expiration(),
- created_at=datetime.now(),
- is_revoked=False
- )
- self.token_repository.save_token(new_token_data)
-
- return AccessTokenResponse(
- access_token=access_token,
- refresh_token=new_refresh_token,
- token_type="bearer"
- )
-
- def logout(self, refresh_token: str) -> bool:
- """
- Logout a user by revoking their refresh token.
-
- This prevents the refresh token from being used to obtain new
- access tokens. The current access token will remain valid until
- it expires naturally.
-
- Args:
- refresh_token: The refresh token to revoke.
-
- Returns:
- True if logout was successful, False if token not found.
-
- Example:
- >>> auth_service.logout(refresh_token)
- True
- """
- return self.token_repository.revoke_token(refresh_token)
-
- def request_password_reset(self, email: str) -> str:
- """
- Generate a password reset token for a user.
-
- This method creates a secure token that can be sent to the user
- via email to reset their password.
-
- Args:
- email: User's email address.
-
- Returns:
- The password reset token to be sent via email.
-
- Raises:
- UserNotFoundError: If email is not registered.
-
- Example:
- >>> token = auth_service.request_password_reset("user@example.com")
- >>> # Send token via email service
- """
- user = self.user_repository.get_user_by_email(email)
- if not user:
- raise UserNotFoundError(f"No user found with email {email}")
-
- # Create reset token
- reset_token = self.token_manager.create_password_reset_token()
-
- # Store token in database
- token_data = TokenData(
- token=reset_token,
- token_type="password_reset",
- user_id=user.id,
- expires_at=self.token_manager.get_password_reset_token_expiration(),
- created_at=datetime.now(),
- is_revoked=False
- )
- self.token_repository.save_token(token_data)
-
- return reset_token
-
- def reset_password(self, token: str, new_password: str) -> bool:
- """
- Reset a user's password using a reset token.
-
- This method validates the reset token and updates the user's password.
- All existing refresh tokens for the user are revoked for security.
-
- Args:
- token: Password reset token.
- new_password: New plain text password (will be hashed).
-
- Returns:
- True if password was reset successfully.
-
- Raises:
- InvalidTokenError: If reset token is invalid.
- ExpiredTokenError: If reset token has expired.
- RevokedTokenError: If reset token has been used.
- UserNotFoundError: If user no longer exists.
-
- Example:
- >>> auth_service.reset_password(token, "NewSecurePass123!")
- True
- """
- # Validate reset token
- token_data = self.token_repository.get_token(token, "password_reset")
- if not token_data:
- raise InvalidTokenError("Invalid password reset token")
-
- if token_data.is_revoked:
- raise RevokedTokenError("Password reset token has already been used")
-
- if token_data.expires_at < datetime.now():
- raise ExpiredTokenError("Password reset token has expired")
-
- # Get user
- user = self.user_repository.get_user_by_id(token_data.user_id)
- if not user:
- raise UserNotFoundError()
-
- # Hash new password
- hashed_password = self.password_manager.hash_password(new_password)
-
- # Update user password
- updates = UserUpdate(password=hashed_password)
- self.user_repository.update_user(user.id, updates)
-
- # Revoke the reset token
- self.token_repository.revoke_token(token)
-
- # Revoke all user's refresh tokens for security
- self.token_repository.revoke_all_user_tokens(user.id, "refresh")
-
- return True
-
- def request_email_verification(self, email: str) -> str:
- """
- Generate an email verification token for a user.
-
- This method creates a JWT token that can be sent to the user
- to verify their email address.
-
- Args:
- email: User's email address.
-
- Returns:
- The email verification token (JWT) to be sent via email.
-
- Raises:
- UserNotFoundError: If email is not registered.
-
- Example:
- >>> token = auth_service.request_email_verification("user@example.com")
- >>> # Send token via email service
- """
- user = self.user_repository.get_user_by_email(email)
- if not user:
- raise UserNotFoundError(f"No user found with email {email}")
-
- return self.token_manager.create_email_verification_token(email)
-
- def verify_email(self, token: str) -> bool:
- """
- Verify a user's email address using a verification token.
-
- This method decodes the JWT token, extracts the email, and marks
- the user's email as verified.
-
- Args:
- token: Email verification token (JWT).
-
- Returns:
- True if email was verified successfully.
-
- Raises:
- InvalidTokenError: If verification token is invalid.
- ExpiredTokenError: If verification token has expired.
- UserNotFoundError: If user no longer exists.
-
- Example:
- >>> auth_service.verify_email(token)
- True
- """
- # Decode and validate token
- email = self.token_manager.decode_email_verification_token(token)
-
- # Get user
- user = self.user_repository.get_user_by_email(email)
- if not user:
- raise UserNotFoundError(f"No user found with email {email}")
-
- # Update user's verified status
- updates = UserUpdate(is_verified=True)
- self.user_repository.update_user(user.id, updates)
-
- return True
-
- def get_current_user(self, access_token: str) -> UserInDB:
- """
- Retrieve the current user from an access token.
-
- This method decodes and validates the JWT access token and
- retrieves the corresponding user from the database.
-
- Args:
- access_token: JWT access token.
-
- Returns:
- The user associated with the token.
-
- Raises:
- InvalidTokenError: If access token is invalid.
- ExpiredTokenError: If access token has expired.
- UserNotFoundError: If user no longer exists.
- AccountDisabledError: If user account is disabled.
-
- Example:
- >>> user = auth_service.get_current_user(access_token)
- >>> print(user.email)
- """
- # Decode and validate token
- token_payload = self.token_manager.decode_access_token(access_token)
-
- # Get user
- user = self.user_repository.get_user_by_id(token_payload.sub)
- if not user:
- raise UserNotFoundError()
-
- if not user.is_active:
- raise AccountDisabledError()
-
- return user
-
-
-def get_default_sqlite_auth_service(db_path: str, jwt_secret: str) -> AuthService:
- from my_auth.persistence.sqlite import SQLiteUserRepository
- from my_auth.persistence.sqlite import SQLiteTokenRepository
- user_repository = SQLiteUserRepository(db_path=db_path)
- token_repository = SQLiteTokenRepository(db_path=db_path)
- password_manager = PasswordManager()
- token_manager = TokenManager(jwt_secret=jwt_secret)
- return AuthService(
- user_repository=user_repository,
- token_repository=token_repository,
- password_manager=password_manager,
- token_manager=token_manager
- )
+ def get_current_user(self, access_token: str) -> UserInDB:
+ """
+ Retrieve the current user from an access token.
+
+ This method decodes and validates the JWT access token and
+ retrieves the corresponding user from the database.
+
+ Args:
+ access_token: JWT access token.
+
+ Returns:
+ The user associated with the token.
+
+ Raises:
+ InvalidTokenError: If access token is invalid.
+ ExpiredTokenError: If access token has expired.
+ UserNotFoundError: If user no longer exists.
+ AccountDisabledError: If user account is disabled.
+
+ Example:
+ >>> user = auth_service.get_current_user(access_token)
+ >>> print(user.email)
+ """
+ # Decode and validate token
+ token_payload = self.token_manager.decode_access_token(access_token)
+
+ # Get user
+ user = self.user_repository.get_user_by_id(token_payload.sub)
+ if not user:
+ raise UserNotFoundError()
+
+ if not user.is_active:
+ raise AccountDisabledError()
+
+ return user
\ No newline at end of file
diff --git a/src/my_auth/email/__init__.py b/src/my_auth/email/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/my_auth/email/base.py b/src/my_auth/email/base.py
new file mode 100644
index 0000000..955079e
--- /dev/null
+++ b/src/my_auth/email/base.py
@@ -0,0 +1,64 @@
+"""
+Abstract base class for email service.
+
+This module defines the interface that all email service implementations
+must follow. It provides methods for sending authentication-related emails.
+"""
+
+from abc import ABC, abstractmethod
+
+
+class EmailService(ABC):
+ """
+ Abstract base class for email service operations.
+
+ This interface defines all methods required for sending authentication-related
+ emails. Concrete implementations can use different email providers (SMTP,
+ SendGrid, AWS SES, etc.).
+ """
+
+ @abstractmethod
+ def send_verification_email(self, email: str, token: str) -> None:
+ """
+ Send an email verification link to the user.
+
+ This method should send an email containing a verification link or token
+ that the user can use to verify their email address.
+
+ Args:
+ email: The recipient's email address.
+ token: The verification token (JWT) to include in the email.
+
+ Raises:
+ Exception: If email sending fails (implementation-specific).
+
+ Example:
+ >>> email_service.send_verification_email(
+ ... "user@example.com",
+ ... "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
+ ... )
+ """
+ pass
+
+ @abstractmethod
+ def send_password_reset_email(self, email: str, token: str) -> None:
+ """
+ Send a password reset link to the user.
+
+ This method should send an email containing a password reset link or token
+ that the user can use to reset their password.
+
+ Args:
+ email: The recipient's email address.
+ token: The password reset token to include in the email.
+
+ Raises:
+ Exception: If email sending fails (implementation-specific).
+
+ Example:
+ >>> email_service.send_password_reset_email(
+ ... "user@example.com",
+ ... "a1b2c3d4e5f6..."
+ ... )
+ """
+ pass
diff --git a/src/my_auth/email/smtp.py b/src/my_auth/email/smtp.py
new file mode 100644
index 0000000..59241b3
--- /dev/null
+++ b/src/my_auth/email/smtp.py
@@ -0,0 +1,211 @@
+"""
+SMTP email service implementation.
+
+This module provides an SMTP-based implementation of the email service
+for sending authentication-related emails.
+"""
+
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from typing import Optional
+
+from .base import EmailService
+
+# Default email templates
+DEFAULT_VERIFICATION_TEMPLATE = """
+
+
+ Email Verification
+ Please click the link below to verify your email address:
+ Verify Email
+ Or copy and paste this link into your browser:
+ {verification_link}
+ This link will expire in 7 days.
+ If you did not request this verification, please ignore this email.
+
+
+"""
+
+DEFAULT_PASSWORD_RESET_TEMPLATE = """
+
+
+ Password Reset
+ You have requested to reset your password. Please click the link below:
+ Reset Password
+ Or copy and paste this link into your browser:
+ {reset_link}
+ This link will expire in 15 minutes.
+ If you did not request a password reset, please ignore this email.
+
+
+"""
+
+
+class SMTPEmailService(EmailService):
+ """
+ SMTP implementation of EmailService.
+
+ This implementation uses standard SMTP protocol to send emails.
+ It supports both TLS and SSL connections and allows custom HTML
+ templates for email content.
+
+ Attributes:
+ host: SMTP server hostname.
+ port: SMTP server port (587 for TLS, 465 for SSL).
+ username: SMTP authentication username.
+ password: SMTP authentication password.
+ use_tls: Whether to use TLS (default: True).
+ from_email: Email address to use as sender.
+ from_name: Display name for sender (optional).
+ base_url: Base URL for constructing verification/reset links.
+ verification_template: HTML template for verification emails.
+ password_reset_template: HTML template for password reset emails.
+ """
+
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ username: str,
+ password: str,
+ from_email: str,
+ base_url: str,
+ use_tls: bool = True,
+ from_name: Optional[str] = None,
+ verification_template: Optional[str] = None,
+ password_reset_template: Optional[str] = None
+ ):
+ """
+ Initialize SMTP email service.
+
+ Args:
+ host: SMTP server hostname (e.g., "smtp.gmail.com").
+ port: SMTP server port (587 for TLS, 465 for SSL).
+ username: SMTP authentication username.
+ password: SMTP authentication password.
+ from_email: Email address to use as sender.
+ base_url: Base URL for your application (e.g., "https://myapp.com").
+ use_tls: Whether to use TLS encryption (default: True).
+ from_name: Display name for sender (default: None).
+ verification_template: Custom HTML template for verification emails
+ (must include {verification_link} placeholder).
+ password_reset_template: Custom HTML template for reset emails
+ (must include {reset_link} placeholder).
+
+ Example:
+ >>> email_service = SMTPEmailService(
+ ... host="smtp.gmail.com",
+ ... port=587,
+ ... username="noreply@myapp.com",
+ ... password="app_password",
+ ... from_email="noreply@myapp.com",
+ ... base_url="https://myapp.com",
+ ... from_name="My Application"
+ ... )
+ """
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+ self.use_tls = use_tls
+ self.from_email = from_email
+ self.from_name = from_name
+ self.base_url = base_url.rstrip('/')
+
+ # Use custom templates or defaults
+ self.verification_template = verification_template or DEFAULT_VERIFICATION_TEMPLATE
+ self.password_reset_template = password_reset_template or DEFAULT_PASSWORD_RESET_TEMPLATE
+
+ def _send_email(self, to_email: str, subject: str, html_content: str) -> None:
+ """
+ Send an email via SMTP.
+
+ Internal method that handles the actual SMTP connection and sending.
+
+ Args:
+ to_email: Recipient email address.
+ subject: Email subject line.
+ html_content: HTML content of the email.
+
+ Raises:
+ smtplib.SMTPException: If email sending fails.
+ """
+ # Create message
+ message = MIMEMultipart("alternative")
+ message["Subject"] = subject
+ message["From"] = f"{self.from_name} <{self.from_email}>" if self.from_name else self.from_email
+ message["To"] = to_email
+
+ # Attach HTML content
+ html_part = MIMEText(html_content, "html")
+ message.attach(html_part)
+
+ # Send email
+ if self.use_tls:
+ with smtplib.SMTP(self.host, self.port) as server:
+ server.starttls()
+ server.login(self.username, self.password)
+ server.send_message(message)
+ else:
+ with smtplib.SMTP_SSL(self.host, self.port) as server:
+ server.login(self.username, self.password)
+ server.send_message(message)
+
+ def send_verification_email(self, email: str, token: str) -> None:
+ """
+ Send an email verification link to the user.
+
+ Constructs a verification link using the base_url and token,
+ then sends an email using the configured verification template.
+
+ Args:
+ email: The recipient's email address.
+ token: The verification token (JWT).
+
+ Raises:
+ smtplib.SMTPException: If email sending fails.
+
+ Example:
+ >>> email_service.send_verification_email(
+ ... "user@example.com",
+ ... "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
+ ... )
+ """
+ verification_link = f"{self.base_url}/auth/verify-email?token={token}"
+ html_content = self.verification_template.format(verification_link=verification_link)
+
+ self._send_email(
+ to_email=email,
+ subject="Verify Your Email Address",
+ html_content=html_content
+ )
+
+ def send_password_reset_email(self, email: str, token: str) -> None:
+ """
+ Send a password reset link to the user.
+
+ Constructs a password reset link using the base_url and token,
+ then sends an email using the configured password reset template.
+
+ Args:
+ email: The recipient's email address.
+ token: The password reset token.
+
+ Raises:
+ smtplib.SMTPException: If email sending fails.
+
+ Example:
+ >>> email_service.send_password_reset_email(
+ ... "user@example.com",
+ ... "a1b2c3d4e5f6..."
+ ... )
+ """
+ reset_link = f"{self.base_url}/reset-password?token={token}"
+ html_content = self.password_reset_template.format(reset_link=reset_link)
+
+ self._send_email(
+ to_email=email,
+ subject="Reset Your Password",
+ html_content=html_content
+ )
diff --git a/src/my_auth/persistence/sqlite.py b/src/my_auth/persistence/sqlite.py
index c4a4ec0..9f09b10 100644
--- a/src/my_auth/persistence/sqlite.py
+++ b/src/my_auth/persistence/sqlite.py
@@ -162,7 +162,7 @@ class SQLiteUserRepository(UserRepository):
raise UserAlreadyExistsError(f"User with email {user_data.email} already exists")
user_id = str(uuid4())
- now = datetime.utcnow()
+ now = datetime.now()
user = UserInDB(
id=user_id,
@@ -315,7 +315,7 @@ class SQLiteUserRepository(UserRepository):
# Always update the updated_at timestamp
update_fields.append("updated_at = ?")
- update_values.append(datetime.utcnow().isoformat())
+ update_values.append(datetime.now().isoformat())
# Add user_id for WHERE clause
update_values.append(user_id)
@@ -574,7 +574,7 @@ class SQLiteTokenRepository(TokenRepository):
Returns:
Number of tokens deleted.
"""
- now = datetime.utcnow().isoformat()
+ now = datetime.now().isoformat()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
diff --git a/tests/api/__init__.py b/tests/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/api/test_api_routes.py b/tests/api/test_api_routes.py
new file mode 100644
index 0000000..5cf0233
--- /dev/null
+++ b/tests/api/test_api_routes.py
@@ -0,0 +1,515 @@
+"""
+Unit tests for FastAPI authentication routes.
+
+This module tests all authentication API endpoints using FastAPI's TestClient
+and mocked dependencies to ensure proper behavior and error handling.
+"""
+
+from datetime import datetime
+from unittest.mock import Mock
+
+import pytest
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from my_auth.api.routes import create_auth_router
+from my_auth.core.auth import AuthService
+from my_auth.exceptions import (
+ UserAlreadyExistsError,
+ InvalidCredentialsError,
+ UserNotFoundError,
+ InvalidTokenError,
+ ExpiredTokenError,
+ RevokedTokenError,
+ AccountDisabledError
+)
+from my_auth.models.token import AccessTokenResponse
+from my_auth.models.user import UserInDB
+
+
+@pytest.fixture
+def mock_auth_service():
+ """
+ Create a mock AuthService for testing.
+
+ Returns:
+ Mock AuthService instance with all methods mocked.
+ """
+ service = Mock(spec=AuthService)
+ return service
+
+
+@pytest.fixture
+def test_app(mock_auth_service):
+ """
+ Create a FastAPI test application with auth router.
+
+ Args:
+ mock_auth_service: Mocked authentication service.
+
+ Returns:
+ FastAPI application configured for testing.
+ """
+ app = FastAPI()
+ auth_router = create_auth_router(mock_auth_service)
+ app.include_router(auth_router)
+ return app
+
+
+@pytest.fixture
+def client(test_app):
+ """
+ Create a test client for the FastAPI application.
+
+ Args:
+ test_app: FastAPI test application.
+
+ Returns:
+ TestClient instance.
+ """
+ return TestClient(test_app)
+
+
+@pytest.fixture
+def sample_user():
+ """
+ Create a sample user for testing.
+
+ Returns:
+ UserInDB instance with sample data.
+ """
+ return UserInDB(
+ id="user123",
+ email="test@example.com",
+ username="testuser",
+ hashed_password="hashed_password_here",
+ roles=["user"],
+ user_settings={},
+ is_verified=False,
+ is_active=True,
+ created_at=datetime.utcnow(),
+ updated_at=datetime.utcnow()
+ )
+
+
+@pytest.fixture
+def sample_tokens():
+ """
+ Create sample access and refresh tokens.
+
+ Returns:
+ AccessTokenResponse with sample tokens.
+ """
+ return AccessTokenResponse(
+ access_token="sample.access.token",
+ refresh_token="sample_refresh_token",
+ token_type="bearer"
+ )
+
+
+def test_i_can_register_user(client, mock_auth_service, sample_user):
+ """
+ Test successful user registration.
+
+ Verifies that a POST request to /auth/register with valid data
+ returns 201 status and the created user information.
+ """
+ mock_auth_service.register.return_value = sample_user
+
+ response = client.post("/auth/register", json={
+ "email": "test@example.com",
+ "username": "testuser",
+ "password": "SecurePass123!",
+ "roles": ["user"],
+ "user_settings": {}
+ })
+
+ assert response.status_code == 201
+ data = response.json()
+ assert data["email"] == "test@example.com"
+ assert data["username"] == "testuser"
+ assert data["id"] == "user123"
+ assert "hashed_password" not in data
+ mock_auth_service.register.assert_called_once()
+
+
+def test_i_cannot_register_with_existing_email(client, mock_auth_service):
+ """
+ Test registration fails with existing email.
+
+ Verifies that attempting to register with an already registered
+ email returns 409 Conflict status.
+ """
+ mock_auth_service.register.side_effect = UserAlreadyExistsError(
+ "User with this email already exists"
+ )
+
+ response = client.post("/auth/register", json={
+ "email": "existing@example.com",
+ "username": "testuser",
+ "password": "SecurePass123!",
+ "roles": [],
+ "user_settings": {}
+ })
+
+ assert response.status_code == 409
+ assert "already exists" in response.json()["detail"].lower()
+
+
+def test_i_can_login(client, mock_auth_service, sample_user, sample_tokens):
+ """
+ Test successful login.
+
+ Verifies that a POST request to /auth/login with valid credentials
+ returns access and refresh tokens.
+ """
+ mock_auth_service.login.return_value = (sample_user, sample_tokens)
+
+ response = client.post("/auth/login", data={
+ "username": "test@example.com", # OAuth2 uses 'username' field
+ "password": "SecurePass123!"
+ })
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["access_token"] == "sample.access.token"
+ assert data["refresh_token"] == "sample_refresh_token"
+ assert data["token_type"] == "bearer"
+ mock_auth_service.login.assert_called_once_with("test@example.com", "SecurePass123!")
+
+
+def test_i_cannot_login_with_invalid_credentials(client, mock_auth_service):
+ """
+ Test login fails with invalid credentials.
+
+ Verifies that attempting to login with incorrect email or password
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.login.side_effect = InvalidCredentialsError()
+
+ response = client.post("/auth/login", data={
+ "username": "test@example.com",
+ "password": "WrongPassword"
+ })
+
+ assert response.status_code == 401
+ assert "invalid" in response.json()["detail"].lower()
+
+
+def test_i_cannot_login_with_disabled_account(client, mock_auth_service):
+ """
+ Test login fails with disabled account.
+
+ Verifies that attempting to login to a disabled account
+ returns 403 Forbidden status.
+ """
+ mock_auth_service.login.side_effect = AccountDisabledError()
+
+ response = client.post("/auth/login", data={
+ "username": "test@example.com",
+ "password": "SecurePass123!"
+ })
+
+ assert response.status_code == 403
+ assert "disabled" in response.json()["detail"].lower()
+
+
+def test_i_can_refresh_token(client, mock_auth_service, sample_tokens):
+ """
+ Test successful token refresh.
+
+ Verifies that a POST request to /auth/refresh with a valid refresh token
+ returns new access and refresh tokens.
+ """
+ new_tokens = AccessTokenResponse(
+ access_token="new.access.token",
+ refresh_token="new_refresh_token",
+ token_type="bearer"
+ )
+ mock_auth_service.refresh_access_token.return_value = new_tokens
+
+ response = client.post("/auth/refresh", json={
+ "refresh_token": "sample_refresh_token"
+ })
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["access_token"] == "new.access.token"
+ assert data["refresh_token"] == "new_refresh_token"
+ mock_auth_service.refresh_access_token.assert_called_once_with("sample_refresh_token")
+
+
+def test_i_cannot_refresh_with_invalid_token(client, mock_auth_service):
+ """
+ Test token refresh fails with invalid token.
+
+ Verifies that attempting to refresh with an invalid token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.refresh_access_token.side_effect = InvalidTokenError(
+ "Invalid refresh token"
+ )
+
+ response = client.post("/auth/refresh", json={
+ "refresh_token": "invalid_token"
+ })
+
+ assert response.status_code == 401
+ assert "invalid" in response.json()["detail"].lower()
+
+
+def test_i_cannot_refresh_with_expired_token(client, mock_auth_service):
+ """
+ Test token refresh fails with expired token.
+
+ Verifies that attempting to refresh with an expired token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.refresh_access_token.side_effect = ExpiredTokenError()
+
+ response = client.post("/auth/refresh", json={
+ "refresh_token": "expired_token"
+ })
+
+ assert response.status_code == 401
+ assert "expired" in response.json()["detail"].lower()
+
+
+def test_i_cannot_refresh_with_revoked_token(client, mock_auth_service):
+ """
+ Test token refresh fails with revoked token.
+
+ Verifies that attempting to refresh with a revoked token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.refresh_access_token.side_effect = RevokedTokenError()
+
+ response = client.post("/auth/refresh", json={
+ "refresh_token": "revoked_token"
+ })
+
+ assert response.status_code == 401
+ assert "revoked" in response.json()["detail"].lower()
+
+
+def test_i_can_logout(client, mock_auth_service):
+ """
+ Test successful logout.
+
+ Verifies that a POST request to /auth/logout successfully
+ revokes the refresh token and returns 204 status.
+ """
+ mock_auth_service.logout.return_value = True
+
+ response = client.post("/auth/logout", json={
+ "refresh_token": "sample_refresh_token"
+ })
+
+ assert response.status_code == 204
+ mock_auth_service.logout.assert_called_once_with("sample_refresh_token")
+
+
+def test_i_can_request_password_reset(client, mock_auth_service):
+ """
+ Test password reset request.
+
+ Verifies that a POST request to /auth/password-reset-request
+ generates a reset token for the given email.
+ """
+ mock_auth_service.request_password_reset.return_value = "reset_token_123"
+
+ response = client.post("/auth/password-reset-request", json={
+ "email": "test@example.com"
+ })
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "token" in data
+ assert data["token"] == "reset_token_123"
+ mock_auth_service.request_password_reset.assert_called_once_with("test@example.com")
+
+
+def test_i_cannot_request_password_reset_for_unknown_email(client, mock_auth_service):
+ """
+ Test password reset request fails for unknown email.
+
+ Verifies that requesting a password reset for a non-existent email
+ returns 404 Not Found status.
+ """
+ mock_auth_service.request_password_reset.side_effect = UserNotFoundError(
+ "No user found with email"
+ )
+
+ response = client.post("/auth/password-reset-request", json={
+ "email": "unknown@example.com"
+ })
+
+ assert response.status_code == 404
+
+
+def test_i_can_reset_password(client, mock_auth_service):
+ """
+ Test successful password reset.
+
+ Verifies that a POST request to /auth/password-reset with valid
+ token and new password successfully resets the password.
+ """
+ mock_auth_service.reset_password.return_value = True
+
+ response = client.post("/auth/password-reset", json={
+ "token": "reset_token_123",
+ "new_password": "NewSecurePass123!"
+ })
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "success" in data["message"].lower()
+ mock_auth_service.reset_password.assert_called_once_with(
+ "reset_token_123",
+ "NewSecurePass123!"
+ )
+
+
+def test_i_cannot_reset_password_with_invalid_token(client, mock_auth_service):
+ """
+ Test password reset fails with invalid token.
+
+ Verifies that attempting to reset password with an invalid token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.reset_password.side_effect = InvalidTokenError(
+ "Invalid password reset token"
+ )
+
+ response = client.post("/auth/password-reset", json={
+ "token": "invalid_token",
+ "new_password": "NewSecurePass123!"
+ })
+
+ assert response.status_code == 401
+
+
+def test_i_cannot_reset_password_with_expired_token(client, mock_auth_service):
+ """
+ Test password reset fails with expired token.
+
+ Verifies that attempting to reset password with an expired token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.reset_password.side_effect = ExpiredTokenError(
+ "Password reset token has expired"
+ )
+
+ response = client.post("/auth/password-reset", json={
+ "token": "expired_token",
+ "new_password": "NewSecurePass123!"
+ })
+
+ assert response.status_code == 401
+
+
+def test_i_can_request_email_verification(client, mock_auth_service):
+ """
+ Test email verification request.
+
+ Verifies that a POST request to /auth/verify-email-request
+ generates a verification token for the given email.
+ """
+ mock_auth_service.request_email_verification.return_value = "verify_token_jwt"
+
+ response = client.post("/auth/verify-email-request", json={
+ "email": "test@example.com"
+ })
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "token" in data
+ assert data["token"] == "verify_token_jwt"
+ mock_auth_service.request_email_verification.assert_called_once_with("test@example.com")
+
+
+def test_i_can_verify_email(client, mock_auth_service):
+ """
+ Test successful email verification.
+
+ Verifies that a GET request to /auth/verify-email with a valid
+ token successfully verifies the email address.
+ """
+ mock_auth_service.verify_email.return_value = True
+
+ response = client.get("/auth/verify-email?token=verify_token_jwt")
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "success" in data["message"].lower()
+ mock_auth_service.verify_email.assert_called_once_with("verify_token_jwt")
+
+
+def test_i_cannot_verify_email_with_invalid_token(client, mock_auth_service):
+ """
+ Test email verification fails with invalid token.
+
+ Verifies that attempting to verify email with an invalid token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.verify_email.side_effect = InvalidTokenError(
+ "Invalid email verification token"
+ )
+
+ response = client.get("/auth/verify-email?token=invalid_token")
+
+ assert response.status_code == 401
+
+
+def test_i_can_get_current_user(client, mock_auth_service, sample_user):
+ """
+ Test retrieving current authenticated user.
+
+ Verifies that a GET request to /auth/me with a valid Bearer token
+ returns the current user's information.
+ """
+ mock_auth_service.get_current_user.return_value = sample_user
+
+ response = client.get(
+ "/auth/me",
+ headers={"Authorization": "Bearer sample.access.token"}
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["email"] == "test@example.com"
+ assert data["username"] == "testuser"
+ assert data["id"] == "user123"
+ assert "hashed_password" not in data
+ mock_auth_service.get_current_user.assert_called_once_with("sample.access.token")
+
+
+def test_i_cannot_access_protected_route_without_token(client):
+ """
+ Test protected route fails without authentication token.
+
+ Verifies that attempting to access /auth/me without a Bearer token
+ returns 401 Unauthorized status.
+ """
+ response = client.get("/auth/me")
+
+ assert response.status_code == 401
+
+
+def test_i_cannot_access_protected_route_with_invalid_token(client, mock_auth_service):
+ """
+ Test protected route fails with invalid token.
+
+ Verifies that attempting to access /auth/me with an invalid token
+ returns 401 Unauthorized status.
+ """
+ mock_auth_service.get_current_user.side_effect = InvalidTokenError(
+ "Invalid access token"
+ )
+
+ response = client.get(
+ "/auth/me",
+ headers={"Authorization": "Bearer invalid.token"}
+ )
+
+ assert response.status_code == 401
diff --git a/tests/core/conftest.py b/tests/core/conftest.py
index 7eb7b66..3902c93 100644
--- a/tests/core/conftest.py
+++ b/tests/core/conftest.py
@@ -1,6 +1,7 @@
# tests/core/conftest.py
import shutil
+from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock
@@ -9,7 +10,7 @@ import pytest
from my_auth.core.password import PasswordManager
from my_auth.core.token import TokenManager
from src.my_auth.core.auth import AuthService
-from src.my_auth.models.user import UserCreate
+from src.my_auth.models.user import UserCreate, UserInDB
from src.my_auth.persistence.sqlite import SQLiteUserRepository, SQLiteTokenRepository
@@ -33,6 +34,23 @@ def test_user_hashed_password():
return "$2b$12$R.S/XfI2tQYt3Kk.iF1XwOQz0Qe.L0T0mD/O1H8E2V5D4Q6F7G8H9I0"
+@pytest.fixture
+def test_user_in_db() -> UserInDB:
+ """Provides a basic UserInDB instance for testing."""
+ return UserInDB(
+ id="1",
+ email="test@example.com",
+ username="testuser",
+ hashed_password="some_hash",
+ is_active=True,
+ is_verified=True,
+ roles=['member'],
+ user_settings={},
+ created_at=datetime.now(),
+ updated_at=datetime.now()
+ )
+
+
@pytest.fixture()
def sqlite_db_path(tmp_path_factory):
"""
@@ -79,6 +97,7 @@ def mock_token_manager() -> TokenManager:
mock = MagicMock(spec=TokenManager)
mock.create_access_token.return_value = "MOCKED_ACCESS_TOKEN"
mock.create_refresh_token.return_value = "MOCKED_REFRESH_TOKEN"
+ mock.get_refresh_token_expiration.return_value = datetime.now() + timedelta(days=1)
return mock
diff --git a/tests/core/test_auth_service.py b/tests/core/test_auth_service.py
index ba1ce17..26b064e 100644
--- a/tests/core/test_auth_service.py
+++ b/tests/core/test_auth_service.py
@@ -6,8 +6,9 @@ from unittest.mock import MagicMock, patch
import pytest
from src.my_auth.core.auth import AuthService
-from src.my_auth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, ExpiredTokenError
-from src.my_auth.models.token import TokenData
+from src.my_auth.exceptions import UserAlreadyExistsError, InvalidCredentialsError, InvalidTokenError, \
+ ExpiredTokenError, RevokedTokenError
+from src.my_auth.models.token import TokenData, TokenPayload
from src.my_auth.models.user import UserCreate, UserUpdate
@@ -140,15 +141,18 @@ class TestAuthServiceTokenManagement(object):
result = auth_service.logout(self.refresh_token)
assert result is True
- with pytest.raises(InvalidTokenError):
+ with pytest.raises(RevokedTokenError):
auth_service.refresh_access_token(self.refresh_token)
def test_get_current_user_success(self, auth_service: AuthService):
"""Success: Getting the current user works by successfully decoding the JWT."""
# Mock the decoder to simulate a decoded payload
+ token_payload = TokenPayload(sub=self.user.id,
+ email=str(self.user.email),
+ exp=int(datetime.now().timestamp() * 1000))
with patch.object(auth_service.token_manager, 'decode_access_token',
- return_value={"sub": self.user.id}) as mock_decode:
+ return_value=token_payload) as mock_decode:
user = auth_service.get_current_user("dummy_jwt")
assert user.id == self.user.id
@@ -168,103 +172,103 @@ class TestAuthServiceTokenManagement(object):
auth_service.get_current_user("expired_access_jwt")
-class TestAuthServiceResetVerification(object):
- """Tests for password reset and email verification flows."""
-
- @pytest.fixture(autouse=True)
- def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
- """Sets up a registered user using a mock hash for speed."""
-
- pm = auth_service.password_manager
- original_hash = pm.hash_password.return_value
-
- # Temporarily set hash for setup
- pm.hash_password.return_value = "HASHED_PASS"
- user = auth_service.register(test_user_data_create)
- self.user = user
-
- # Restore hash mock
- pm.hash_password.return_value = original_hash
-
- @patch('src.my_auth.core.email.send_email')
- def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
- """Success: Requesting a password reset generates a token and sends an email."""
-
- tm = auth_service.token_manager
- with patch.object(tm, 'create_password_reset_token',
- return_value="MOCKED_RESET_TOKEN") as mock_create_token:
- token_string = auth_service.request_password_reset(self.user.email)
-
- assert token_string == "MOCKED_RESET_TOKEN"
- mock_create_token.assert_called_once()
- mock_send_email.assert_called_once()
-
- def test_reset_password_success(self, auth_service: AuthService):
- """Success: Resetting the password works with a valid token."""
-
- # Setup: Manually create a valid reset token
- auth_service.token_repository.save_token(
- TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
- expires_at=datetime.now() + timedelta(minutes=10))
- )
-
- # Patch the PasswordManager instance to control the hash output
- pm = auth_service.password_manager
- with patch.object(pm, 'hash_password',
- return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
- new_password = "NewPassword123!"
- result = auth_service.reset_password("valid_reset_token", new_password)
-
- assert result is True
- mock_hash.assert_called_once_with(new_password)
-
- # Verification: Check that user data was updated
- updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
- assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
-
- @patch('src.my_auth.core.email.send_email')
- def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
- """Success: Requesting verification generates a token and sends an email."""
-
- tm = auth_service.token_manager
- with patch.object(tm, 'create_email_verification_token',
- return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
- token_string = auth_service.request_email_verification(self.user.email)
-
- assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
- mock_create_token.assert_called_once_with(self.user.email)
- mock_send_email.assert_called_once()
-
- def test_verify_email_success(self, auth_service: AuthService):
- """Success: Verification updates the user's status."""
-
- # The token_manager is mocked in conftest, so we must access its real create method
- # or rely on the mock's return value to get a token string to use in the call.
- # Since we need a real token for the decode logic to pass, we need to bypass the mock here.
-
- # We will temporarily use the real TokenManager to create a valid, decodable token.
- # This requires an *unmocked* token manager instance, which is tricky in this setup.
-
- # Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
-
- # Assuming TokenManager.create_email_verification_token can be mocked to return a static string
- # and TokenManager.decode_email_verification_token can be patched to simulate success.
-
- # Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
-
- # Setup: Ensure user is unverified
- auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
-
- tm = auth_service.token_manager
-
- # Mock the decode step to ensure it returns the email used for verification
- with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
- # Test (we use a dummy token string as the decode step is mocked)
- result = auth_service.verify_email("dummy_verification_token")
-
- assert result is True
- mock_decode.assert_called_once()
-
- # Verification: User is verified
- updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
- assert updated_user.is_verified is True
+# class TestAuthServiceResetVerification(object):
+# """Tests for password reset and email verification flows."""
+#
+# @pytest.fixture(autouse=True)
+# def setup_user(self, auth_service: AuthService, test_user_data_create: UserCreate):
+# """Sets up a registered user using a mock hash for speed."""
+#
+# pm = auth_service.password_manager
+# original_hash = pm.hash_password.return_value
+#
+# # Temporarily set hash for setup
+# pm.hash_password.return_value = "HASHED_PASS"
+# user = auth_service.register(test_user_data_create)
+# self.user = user
+#
+# # Restore hash mock
+# pm.hash_password.return_value = original_hash
+#
+# @patch('src.my_auth.core.email.send_email')
+# def test_request_password_reset_success(self, mock_send_email: MagicMock, auth_service: AuthService):
+# """Success: Requesting a password reset generates a token and sends an email."""
+#
+# tm = auth_service.token_manager
+# with patch.object(tm, 'create_password_reset_token',
+# return_value="MOCKED_RESET_TOKEN") as mock_create_token:
+# token_string = auth_service.request_password_reset(self.user.email)
+#
+# assert token_string == "MOCKED_RESET_TOKEN"
+# mock_create_token.assert_called_once()
+# mock_send_email.assert_called_once()
+#
+# def test_reset_password_success(self, auth_service: AuthService):
+# """Success: Resetting the password works with a valid token."""
+#
+# # Setup: Manually create a valid reset token
+# auth_service.token_repository.save_token(
+# TokenData(token="valid_reset_token", token_type="password_reset", user_id=self.user.id,
+# expires_at=datetime.now() + timedelta(minutes=10))
+# )
+#
+# # Patch the PasswordManager instance to control the hash output
+# pm = auth_service.password_manager
+# with patch.object(pm, 'hash_password',
+# return_value="NEW_HASHED_PASSWORD_FOR_RESET") as mock_hash:
+# new_password = "NewPassword123!"
+# result = auth_service.reset_password("valid_reset_token", new_password)
+#
+# assert result is True
+# mock_hash.assert_called_once_with(new_password)
+#
+# # Verification: Check that user data was updated
+# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
+# assert updated_user.hashed_password == "NEW_HASHED_PASSWORD_FOR_RESET"
+#
+# @patch('src.my_auth.core.email.send_email')
+# def test_request_email_verification_success(self, mock_send_email: MagicMock, auth_service: AuthService):
+# """Success: Requesting verification generates a token and sends an email."""
+#
+# tm = auth_service.token_manager
+# with patch.object(tm, 'create_email_verification_token',
+# return_value="MOCKED_JWT_VERIFY_TOKEN") as mock_create_token:
+# token_string = auth_service.request_email_verification(self.user.email)
+#
+# assert token_string == "MOCKED_JWT_VERIFY_TOKEN"
+# mock_create_token.assert_called_once_with(self.user.email)
+# mock_send_email.assert_called_once()
+#
+# def test_verify_email_success(self, auth_service: AuthService):
+# """Success: Verification updates the user's status."""
+#
+# # The token_manager is mocked in conftest, so we must access its real create method
+# # or rely on the mock's return value to get a token string to use in the call.
+# # Since we need a real token for the decode logic to pass, we need to bypass the mock here.
+#
+# # We will temporarily use the real TokenManager to create a valid, decodable token.
+# # This requires an *unmocked* token manager instance, which is tricky in this setup.
+#
+# # Alternative: Temporarily inject a real TokenManager for this test (or rely on a non-mocked method)
+#
+# # Assuming TokenManager.create_email_verification_token can be mocked to return a static string
+# # and TokenManager.decode_email_verification_token can be patched to simulate success.
+#
+# # Since the method calls decode_email_verification_token internally, we mock the output of the decode step.
+#
+# # Setup: Ensure user is unverified
+# auth_service.user_repository.update_user(self.user.id, UserUpdate(is_verified=False))
+#
+# tm = auth_service.token_manager
+#
+# # Mock the decode step to ensure it returns the email used for verification
+# with patch.object(tm, 'decode_email_verification_token', return_value=self.user.email) as mock_decode:
+# # Test (we use a dummy token string as the decode step is mocked)
+# result = auth_service.verify_email("dummy_verification_token")
+#
+# assert result is True
+# mock_decode.assert_called_once()
+#
+# # Verification: User is verified
+# updated_user = auth_service.user_repository.get_user_by_id(self.user.id)
+# assert updated_user.is_verified is True
diff --git a/tests/core/test_token_manager.py b/tests/core/test_token_manager.py
new file mode 100644
index 0000000..b2530eb
--- /dev/null
+++ b/tests/core/test_token_manager.py
@@ -0,0 +1,224 @@
+# tests/core/test_token_manager.py
+
+from datetime import datetime, timedelta
+from unittest.mock import MagicMock, patch
+
+import pytest
+from jose import jwt
+
+from src.my_auth.core.token import TokenManager
+from src.my_auth.exceptions import InvalidTokenError, ExpiredTokenError
+from src.my_auth.models.user import UserInDB # Assuming you have a fixture for this
+
+
+@pytest.fixture
+def token_manager():
+ """Provides a TokenManager instance with known, short expiration times for testing."""
+ return TokenManager(
+ jwt_secret="TEST_SECRET_KEY",
+ jwt_algorithm="HS256",
+ access_token_expire_minutes=1,
+ refresh_token_expire_days=7,
+ password_reset_token_expire_minutes=15
+ )
+
+
+class TestTokenManagerInitialization:
+ """Tests for TokenManager setup and configuration."""
+
+ def test_init_success(self):
+ """Should initialize successfully with required parameters."""
+ tm = TokenManager(jwt_secret="MySecret")
+ assert tm.jwt_secret == "MySecret"
+ assert tm.jwt_algorithm == "HS256"
+ assert tm.access_token_expire_minutes == 30
+ assert tm.refresh_token_expire_days == 7
+ assert tm.password_reset_token_expire_minutes == 15
+
+ def test_init_failure_empty_secret(self):
+ """Should raise ValueError if JWT secret is empty."""
+ with pytest.raises(ValueError, match="JWT secret cannot be empty"):
+ TokenManager(jwt_secret="")
+
+
+class TestTokenCreation:
+ """Tests creation methods for different token types."""
+
+ def test_create_access_token_format_and_expiration(self, token_manager: TokenManager, test_user_in_db: UserInDB):
+ """Should create a valid JWT with correct payload and expiration."""
+
+ token = token_manager.create_access_token(test_user_in_db)
+
+ # 1. Assert token is a string (encoded)
+ assert isinstance(token, str)
+
+ # 2. Decode and check payload content
+ payload = jwt.decode(token, token_manager.jwt_secret, algorithms=[token_manager.jwt_algorithm])
+
+ assert payload["sub"] == test_user_in_db.id
+ assert payload["email"] == test_user_in_db.email
+ assert payload["type"] == "access"
+
+ # 3. Check expiration (should be within a small window of the expected time)
+ now = datetime.now()
+ expected_exp_dt = now + timedelta(minutes=token_manager.access_token_expire_minutes)
+ # Check if expiration is within +/- 1 second of the expected value
+ assert abs(payload["exp"] - int(expected_exp_dt.timestamp())) <= 1
+
+ def test_create_refresh_token_format(self, token_manager: TokenManager):
+ """Should create a random hex string of length 64."""
+ token = token_manager.create_refresh_token()
+ assert isinstance(token, str)
+ assert len(token) == 64
+ assert all(c in '0123456789abcdef' for c in token)
+
+ def test_create_password_reset_token_format(self, token_manager: TokenManager):
+ """Should create a random hex string of length 64."""
+ token = token_manager.create_password_reset_token()
+ assert isinstance(token, str)
+ assert len(token) == 64
+
+ def test_create_email_verification_token_format(self, token_manager: TokenManager):
+ """Should create a JWT with email and 'email_verification' type."""
+ email = "verify@example.com"
+ token = token_manager.create_email_verification_token(email)
+
+ # Decode and check payload content
+ payload = jwt.decode(token, token_manager.jwt_secret, algorithms=[token_manager.jwt_algorithm])
+
+ assert payload["email"] == email
+ assert payload["type"] == "email_verification"
+
+ # Expiration check (set to 7 days in the implementation)
+ now = datetime.now()
+ expected_exp_dt = now + timedelta(days=7)
+ assert abs(payload["exp"] - int(expected_exp_dt.timestamp())) <= 1
+
+
+class TestTokenExpirationCalculations:
+ """Tests for token expiration date methods."""
+
+ # We patch datetime.now() to ensure stable calculations
+ @patch('src.my_auth.core.token.datetime')
+ def test_get_refresh_token_expiration(self, mock_datetime, token_manager: TokenManager):
+ """Should calculate refresh token expiration correctly."""
+
+ # Set a fixed starting time
+ start_time = datetime(2025, 1, 1, 10, 0, 0)
+ mock_datetime.now = MagicMock(return_value=start_time)
+
+ expected_exp = start_time + timedelta(days=token_manager.refresh_token_expire_days)
+ actual_exp = token_manager.get_refresh_token_expiration()
+
+ assert actual_exp == expected_exp
+
+ @patch('src.my_auth.core.token.datetime')
+ def test_get_password_reset_token_expiration(self, mock_datetime, token_manager: TokenManager):
+ """Should calculate password reset token expiration correctly."""
+
+ start_time = datetime(2025, 1, 1, 10, 0, 0)
+ mock_datetime.now = MagicMock(return_value=start_time)
+
+ expected_exp = start_time + timedelta(minutes=token_manager.password_reset_token_expire_minutes)
+ actual_exp = token_manager.get_password_reset_token_expiration()
+
+ assert actual_exp == expected_exp
+
+
+class TestTokenDecodingAndValidation:
+ """Tests decoding and validation logic for JWT tokens."""
+
+ # --- Access Token Tests ---
+
+ def test_decode_access_token_success(self, token_manager: TokenManager, test_user_in_db: UserInDB):
+ """Should successfully decode a valid access token."""
+ token = token_manager.create_access_token(test_user_in_db)
+
+ payload = token_manager.decode_access_token(token)
+
+ assert payload.sub == test_user_in_db.id
+ assert payload.email == test_user_in_db.email
+ assert payload.type == "access"
+
+ def test_i_cannot_decode_expired_access_token(self, token_manager: TokenManager, test_user_in_db: UserInDB):
+ """
+ Should raise ExpiredTokenError when decoding an expired token.
+ """
+ from jose import jwt
+ from datetime import datetime, timedelta
+
+ # Create an already expired token (1 hour ago)
+ expired_time = datetime.now() - timedelta(hours=1)
+
+ payload = {
+ "sub": test_user_in_db.id,
+ "email": test_user_in_db.email,
+ "exp": int(expired_time.timestamp()),
+ "type": "access"
+ }
+
+ expired_token = jwt.encode(
+ payload,
+ token_manager.jwt_secret,
+ algorithm=token_manager.jwt_algorithm
+ )
+
+ # Should raise ExpiredTokenError
+ with pytest.raises(ExpiredTokenError, match="Access token has expired"):
+ token_manager.decode_access_token(expired_token)
+
+ def test_decode_access_token_invalid_signature(self, token_manager: TokenManager, test_user_in_db: UserInDB):
+ """Should raise InvalidTokenError if the signature is bad."""
+
+ token = token_manager.create_access_token(test_user_in_db)
+ # Flip the last character to invalidate the signature
+ invalid_token = token[:-1] + ('A' if token[-1] != 'A' else 'B')
+
+ with pytest.raises(InvalidTokenError, match="Invalid access token"):
+ token_manager.decode_access_token(invalid_token)
+
+ def test_decode_access_token_wrong_type(self, token_manager: TokenManager):
+ """Should raise InvalidTokenError if token is not 'access' type."""
+
+ # Create an email verification token, but try to decode it as an access token
+ wrong_token = token_manager.create_email_verification_token("wrong@type.com")
+
+ with pytest.raises(InvalidTokenError, match="Invalid token type"):
+ token_manager.decode_access_token(wrong_token)
+
+ # --- Email Verification Token Tests ---
+
+ def test_decode_email_verification_token_success(self, token_manager: TokenManager):
+ """Should successfully decode a valid email verification token."""
+ email = "valid_email@test.com"
+ token = token_manager.create_email_verification_token(email)
+
+ decoded_email = token_manager.decode_email_verification_token(token)
+
+ assert decoded_email == email
+
+ def test_decode_email_verification_token_expired(self, token_manager: TokenManager):
+ """Should raise ExpiredTokenError if the token is old (7 days set in creation)."""
+
+ # This test requires mocking time, but given the 7-day expiration,
+ # we can simulate an expired token by manually encoding one.
+
+ # Manually encode an expired token
+ expired_payload = {
+ "email": "old@example.com",
+ "exp": int((datetime.now() - timedelta(days=1)).timestamp()), # Expired yesterday
+ "type": "email_verification"
+ }
+ expired_token = jwt.encode(expired_payload, token_manager.jwt_secret, algorithm=token_manager.jwt_algorithm)
+
+ with pytest.raises(ExpiredTokenError, match="Email verification token has expired"):
+ token_manager.decode_email_verification_token(expired_token)
+
+ def test_decode_email_verification_token_wrong_type(self, token_manager: TokenManager, test_user_in_db: UserInDB):
+ """Should raise InvalidTokenError if token is not 'email_verification' type."""
+
+ # Create an access token, but try to decode it as an email token
+ wrong_token = token_manager.create_access_token(test_user_in_db)
+
+ with pytest.raises(InvalidTokenError, match="Invalid token type"):
+ token_manager.decode_email_verification_token(wrong_token)