Compare commits

3 Commits

Author SHA1 Message Date
c3ea80363f Working on document repository 2025-09-18 22:53:51 +02:00
df86a3d998 Fixed docker config. Added services 2025-09-17 22:45:33 +02:00
da63f1b75b Fixed unit tests 2025-09-17 21:24:03 +02:00
21 changed files with 2246 additions and 712 deletions

346
Readme.md
View File

@@ -2,11 +2,14 @@
## Overview ## Overview
MyDocManager is a real-time document processing application that automatically detects files in a monitored directory, processes them asynchronously, and stores the results in a database. The application uses a modern microservices architecture with Redis for task queuing and MongoDB for data persistence. MyDocManager is a real-time document processing application that automatically detects files in a monitored directory,
processes them asynchronously, and stores the results in a database. The application uses a modern microservices
architecture with Redis for task queuing and MongoDB for data persistence.
## Architecture ## Architecture
### Technology Stack ### Technology Stack
- **Backend API**: FastAPI (Python 3.12) - **Backend API**: FastAPI (Python 3.12)
- **Task Processing**: Celery with Redis broker - **Task Processing**: Celery with Redis broker
- **Document Processing**: EasyOCR, PyMuPDF, python-docx, pdfplumber - **Document Processing**: EasyOCR, PyMuPDF, python-docx, pdfplumber
@@ -16,6 +19,7 @@ MyDocManager is a real-time document processing application that automatically d
- **File Monitoring**: Python watchdog library - **File Monitoring**: Python watchdog library
### Services Architecture ### Services Architecture
┌─────────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Frontend │ │ file- │ │ Redis │ │ Worker │ │ MongoDB │ │ Frontend │ │ file- │ │ Redis │ │ Worker │ │ MongoDB │
│ (React) │◄──►│ processor │───►│ (Broker) │◄──►│ (Celery) │───►│ (Results) │ │ (React) │◄──►│ processor │───►│ (Broker) │◄──►│ (Celery) │───►│ (Results) │
@@ -24,13 +28,13 @@ MyDocManager is a real-time document processing application that automatically d
└─────────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
### Docker Services ### Docker Services
1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch 1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch
2. **worker**: Celery workers for document processing (OCR, text extraction) 2. **worker**: Celery workers for document processing (OCR, text extraction)
3. **redis**: Message broker for Celery tasks 3. **redis**: Message broker for Celery tasks
4. **mongodb**: Final database for processing results 4. **mongodb**: Final database for processing results
5. **frontend**: React interface for monitoring and file access 5. **frontend**: React interface for monitoring and file access
## Data Flow ## Data Flow
1. **File Detection**: Watchdog monitors target directory in real-time 1. **File Detection**: Watchdog monitors target directory in real-time
@@ -42,11 +46,13 @@ MyDocManager is a real-time document processing application that automatically d
## Document Processing Capabilities ## Document Processing Capabilities
### Supported File Types ### Supported File Types
- **PDF**: Direct text extraction + OCR for scanned documents - **PDF**: Direct text extraction + OCR for scanned documents
- **Word Documents**: .docx text extraction - **Word Documents**: .docx text extraction
- **Images**: OCR text recognition (JPG, PNG, etc.) - **Images**: OCR text recognition (JPG, PNG, etc.)
### Processing Libraries ### Processing Libraries
- **EasyOCR**: Modern OCR engine (80+ languages, deep learning-based) - **EasyOCR**: Modern OCR engine (80+ languages, deep learning-based)
- **PyMuPDF**: PDF text extraction and manipulation - **PyMuPDF**: PDF text extraction and manipulation
- **python-docx**: Word document processing - **python-docx**: Word document processing
@@ -55,12 +61,15 @@ MyDocManager is a real-time document processing application that automatically d
## Development Environment ## Development Environment
### Container-Based Development ### Container-Based Development
The application is designed for container-based development with hot-reload capabilities: The application is designed for container-based development with hot-reload capabilities:
- Source code mounted as volumes for real-time updates - Source code mounted as volumes for real-time updates
- All services orchestrated via Docker Compose - All services orchestrated via Docker Compose
- Development and production parity - Development and production parity
### Key Features ### Key Features
- **Real-time Processing**: Immediate file detection and processing - **Real-time Processing**: Immediate file detection and processing
- **Horizontal Scaling**: Multiple workers can be added easily - **Horizontal Scaling**: Multiple workers can be added easily
- **Fault Tolerance**: Celery provides automatic retry mechanisms - **Fault Tolerance**: Celery provides automatic retry mechanisms
@@ -68,6 +77,7 @@ The application is designed for container-based development with hot-reload capa
- **Hot Reload**: Development changes reflected instantly in containers - **Hot Reload**: Development changes reflected instantly in containers
### Docker Services ### Docker Services
1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch 1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch
2. **worker**: Celery workers for document processing (OCR, text extraction) 2. **worker**: Celery workers for document processing (OCR, text extraction)
3. **redis**: Message broker for Celery tasks 3. **redis**: Message broker for Celery tasks
@@ -138,6 +148,7 @@ MyDocManager/
## Authentication & User Management ## Authentication & User Management
### Security Features ### Security Features
- **JWT Authentication**: Stateless authentication with 24-hour token expiration - **JWT Authentication**: Stateless authentication with 24-hour token expiration
- **Password Security**: bcrypt hashing with automatic salting - **Password Security**: bcrypt hashing with automatic salting
- **Role-Based Access**: Admin and User roles with granular permissions - **Role-Based Access**: Admin and User roles with granular permissions
@@ -145,16 +156,19 @@ MyDocManager/
- **Auto Admin Creation**: Default admin user created on first startup - **Auto Admin Creation**: Default admin user created on first startup
### User Roles ### User Roles
- **Admin**: Full access to user management (create, read, update, delete users) - **Admin**: Full access to user management (create, read, update, delete users)
- **User**: Limited access (view own profile, access document processing features) - **User**: Limited access (view own profile, access document processing features)
### Authentication Flow ### Authentication Flow
1. **Login**: User provides credentials → Server validates → Returns JWT token 1. **Login**: User provides credentials → Server validates → Returns JWT token
2. **API Access**: Client includes JWT in Authorization header 2. **API Access**: Client includes JWT in Authorization header
3. **Token Validation**: Server verifies token signature and expiration 3. **Token Validation**: Server verifies token signature and expiration
4. **Role Check**: Server validates user permissions for requested resource 4. **Role Check**: Server validates user permissions for requested resource
### User Management APIs ### User Management APIs
``` ```
POST /auth/login # Generate JWT token POST /auth/login # Generate JWT token
GET /users # List all users (admin only) GET /users # List all users (admin only)
@@ -164,10 +178,227 @@ DELETE /users/{user_id} # Delete user (admin only)
GET /users/me # Get current user profile (authenticated users) GET /users/me # Get current user profile (authenticated users)
``` ```
### Useful Service URLs
## Docker Commands Reference - **FastAPI API**: http://localhost:8000
- **FastAPI Docs**: http://localhost:8000/docs
- **Health Check**: http://localhost:8000/health
- **Redis**: localhost:6379
- **MongoDB**: localhost:27017
### Initial Setup & Build ### Testing Commands
```bash
# Test FastAPI health
curl http://localhost:8000/health
# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
-H "Content-Type: application/json" \
-d '{"message": "Hello from test!"}'
# Monitor Celery tasks
docker-compose logs -f worker
```
## Default Admin User
On first startup, the application automatically creates a default admin user:
- **Username**: `admin`
- **Password**: `admin`
- **Role**: `admin`
- **Email**: `admin@mydocmanager.local`
**⚠️ Important**: Change the default admin password immediately after first login in production environments.
## File Processing Architecture
### Document Processing Flow
1. **File Detection**: Watchdog monitors `/volumes/watched_files/` directory in real-time
2. **Task Creation**: File watcher creates Celery task for each detected file
3. **Document Processing**: Celery worker processes the document and extracts content
4. **Database Storage**: Processed data stored in MongoDB collections
### MongoDB Collections Design
#### Files Collection
Stores file metadata and extracted content:
```json
{
"_id": "ObjectId",
"filename": "document.pdf",
"filepath": "/watched_files/document.pdf",
"file_type": "pdf",
"mime_type": "application/pdf",
"file_size": 2048576,
"content": "extracted text content...",
"encoding": "utf-8",
"extraction_method": "direct_text",
// direct_text, ocr, hybrid
"metadata": {
"page_count": 15,
// for PDFs
"word_count": 250,
// for text files
"image_dimensions": {
// for images
"width": 1920,
"height": 1080
}
},
"detected_at": "2024-01-15T10:29:00Z",
"file_hash": "sha256_hash_value"
}
```
#### Processing Jobs Collection
Tracks processing status and lifecycle:
```json
{
"_id": "ObjectId",
"file_id": "reference_to_files_collection",
"status": "completed",
// pending, processing, completed, failed
"task_id": "celery_task_uuid",
"created_at": "2024-01-15T10:29:00Z",
"started_at": "2024-01-15T10:29:30Z",
"completed_at": "2024-01-15T10:30:00Z",
"error_message": null
}
```
### Supported File Types (Initial Implementation)
- **Text Files** (`.txt`): Direct content reading
- **PDF Documents** (`.pdf`): Text extraction via PyMuPDF/pdfplumber
- **Word Documents** (`.docx`): Content extraction via python-docx
### File Processing Architecture Decisions
#### Watchdog Implementation
- **Choice**: Dedicated observer thread (Option A)
- **Rationale**: Standard approach, clean separation of concerns
- **Implementation**: Watchdog observer runs in separate thread from FastAPI
#### Task Dispatch Strategy
- **Choice**: Direct Celery task creation from file watcher
- **Rationale**: Minimal latency, straightforward flow
- **Implementation**: File detected → Immediate Celery task dispatch
#### Data Storage Strategy
- **Choice**: Separate collections for files and processing status
- **Rationale**: Clean separation of file data vs processing lifecycle
- **Benefits**:
- Better query performance
- Clear data model boundaries
- Easy processing status tracking
#### Content Storage Location
- **Choice**: Store extracted content in `files` collection
- **Rationale**: Content is intrinsic property of the file
- **Benefits**: Single query to get file + content, simpler data model
### Implementation Order
1. ✅ Pydantic models for MongoDB collections
2. ✅ Repository layer for data access (files + processing_jobs)
3. ✅ Celery tasks for document processing
4. ✅ Watchdog file monitoring implementation
5. ✅ FastAPI integration and startup coordination
### Processing Pipeline Features
- **Duplicate Detection**: SHA256 hashing prevents reprocessing same files
- **Error Handling**: Failed processing tracked with error messages
- **Status Tracking**: Real-time processing status via `processing_jobs` collection
- **Extensible Metadata**: Flexible metadata storage per file type
- **Multiple Extraction Methods**: Support for direct text, OCR, and hybrid approaches
## Key Implementation Notes
### Python Standards
- **Style**: PEP 8 compliance
- **Documentation**: Google/NumPy docstring format
- **Naming**: snake_case for variables and functions
- **Testing**: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
### Security Best Practices
- **Password Storage**: Never store plain text passwords, always use bcrypt hashing
- **JWT Secrets**: Use strong, randomly generated secret keys in production
- **Token Expiration**: 24-hour expiration with secure signature validation
- **Role Validation**: Server-side role checking for all protected endpoints
### Dependencies Management
- **Package Manager**: pip (standard)
- **External Dependencies**: Listed in each service's requirements.txt
- **Standard Library First**: Prefer standard library when possible
### Testing Strategy
- All code must be testable
- Unit tests for each authentication and user management function
- Integration tests for complete authentication flow
- Tests validated before implementation
### Critical Architecture Decisions Made
1. **JWT Authentication**: Simple token-based auth with 24-hour expiration
2. **Role-Based Access**: Admin/User roles for granular permissions
3. **bcrypt Password Hashing**: Industry-standard password security
4. **MongoDB User Storage**: Centralized user management in main database
5. **Auto Admin Creation**: Automatic setup for first-time deployment
6. **Single FastAPI Service**: Handles both API and file watching with authentication
7. **Celery with Redis**: Chosen over other async patterns for scalability
8. **EasyOCR Preferred**: Selected over Tesseract for modern OCR needs
9. **Container Development**: Hot-reload setup required for development workflow
10. **Dedicated Watchdog Observer**: Thread-based file monitoring for reliability
11. **Separate MongoDB Collections**: Files and processing jobs stored separately
12. **Content in Files Collection**: Extracted content stored with file metadata
13. **Direct Task Dispatch**: File watcher directly creates Celery tasks
14. **SHA256 Duplicate Detection**: Prevents reprocessing identical files
### Development Process Requirements
1. **Collaborative Validation**: All options must be explained before coding
2. **Test-First Approach**: Test cases defined and validated before implementation
3. **Incremental Development**: Start simple, extend functionality progressively
4. **Error Handling**: Clear problem explanation required before proposing fixes
### Next Implementation Steps
1. ✅ Create docker-compose.yml with all services => Done
2. ✅ Define user management and authentication architecture => Done
3. ✅ Implement user models and authentication services =>
1. models/user.py => Done
2. models/auth.py => Done
3. database/repositories/user_repository.py => Done
4. ✅ Add automatic admin user creation if it does not exists => Done
5. **IN PROGRESS**: Implement file processing pipeline =>
1. Create Pydantic models for files and processing_jobs collections
2. Implement repository layer for file and processing job data access
3. Create Celery tasks for document processing (.txt, .pdf, .docx)
4. Implement Watchdog file monitoring with dedicated observer
5. Integrate file watcher with FastAPI startup
6. Create protected API routes for user management
7. Build React monitoring interface with authentication
## Annexes
### Docker Commands Reference
#### Initial Setup & Build
```bash ```bash
# Build and start all services (first time) # Build and start all services (first time)
@@ -181,7 +412,7 @@ docker-compose build file-processor
docker-compose build worker docker-compose build worker
``` ```
### Development Workflow #### Development Workflow
```bash ```bash
# Start all services # Start all services
@@ -203,7 +434,7 @@ docker-compose restart redis
docker-compose restart mongodb docker-compose restart mongodb
``` ```
### Monitoring & Debugging #### Monitoring & Debugging
```bash ```bash
# View logs of all services # View logs of all services
@@ -228,7 +459,7 @@ docker-compose exec worker bash
docker-compose exec mongodb mongosh docker-compose exec mongodb mongosh
``` ```
### Service Management #### Service Management
```bash ```bash
# Start only specific services # Start only specific services
@@ -248,103 +479,6 @@ docker-compose up --scale worker=3
### Hot-Reload Configuration ### Hot-Reload Configuration
- **file-processor**: Hot-reload enabled via `--reload` flag - **file-processor**: Hot-reload enabled via `--reload` flag
- Code changes in `src/file-processor/app/` automatically restart FastAPI - Code changes in `src/file-processor/app/` automatically restart FastAPI
- **worker**: No hot-reload (manual restart required for stability) - **worker**: No hot-reload (manual restart required for stability)
- Code changes in `src/worker/tasks/` require: `docker-compose restart worker` - Code changes in `src/worker/tasks/` require: `docker-compose restart worker`
### Useful Service URLs
- **FastAPI API**: http://localhost:8000
- **FastAPI Docs**: http://localhost:8000/docs
- **Health Check**: http://localhost:8000/health
- **Redis**: localhost:6379
- **MongoDB**: localhost:27017
### Testing Commands
```bash
# Test FastAPI health
curl http://localhost:8000/health
# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
-H "Content-Type: application/json" \
-d '{"message": "Hello from test!"}'
# Monitor Celery tasks
docker-compose logs -f worker
```
## Default Admin User
On first startup, the application automatically creates a default admin user:
- **Username**: `admin`
- **Password**: `admin`
- **Role**: `admin`
- **Email**: `admin@mydocmanager.local`
**⚠️ Important**: Change the default admin password immediately after first login in production environments.
## Key Implementation Notes
### Python Standards
- **Style**: PEP 8 compliance
- **Documentation**: Google/NumPy docstring format
- **Naming**: snake_case for variables and functions
- **Testing**: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
### Security Best Practices
- **Password Storage**: Never store plain text passwords, always use bcrypt hashing
- **JWT Secrets**: Use strong, randomly generated secret keys in production
- **Token Expiration**: 24-hour expiration with secure signature validation
- **Role Validation**: Server-side role checking for all protected endpoints
### Dependencies Management
- **Package Manager**: pip (standard)
- **External Dependencies**: Listed in each service's requirements.txt
- **Standard Library First**: Prefer standard library when possible
### Testing Strategy
- All code must be testable
- Unit tests for each authentication and user management function
- Integration tests for complete authentication flow
- Tests validated before implementation
### Critical Architecture Decisions Made
1. **JWT Authentication**: Simple token-based auth with 24-hour expiration
2. **Role-Based Access**: Admin/User roles for granular permissions
3. **bcrypt Password Hashing**: Industry-standard password security
4. **MongoDB User Storage**: Centralized user management in main database
5. **Auto Admin Creation**: Automatic setup for first-time deployment
6. **Single FastAPI Service**: Handles both API and file watching with authentication
7. **Celery with Redis**: Chosen over other async patterns for scalability
8. **EasyOCR Preferred**: Selected over Tesseract for modern OCR needs
9. **Container Development**: Hot-reload setup required for development workflow
### Development Process Requirements
1. **Collaborative Validation**: All options must be explained before coding
2. **Test-First Approach**: Test cases defined and validated before implementation
3. **Incremental Development**: Start simple, extend functionality progressively
4. **Error Handling**: Clear problem explanation required before proposing fixes
### Next Implementation Steps
1. ✅ Create docker-compose.yml with all services
2. ✅ Define user management and authentication architecture
3. Implement user models and authentication services
4. Create protected API routes for user management
5. Add automatic admin user creation
6. Implement basic FastAPI service structure
7. Add watchdog file monitoring
8. Create Celery task structure
9. Implement document processing tasks
10. Build React monitoring interface with authentication
### prochaines étapes
MongoDB CRUD
Nous devons absolument mocker MongoDB pour les tests unitaires avec pytest-mock
Fichiers à créer:
* app/models/auht.py => déjà fait
* app/models/user.py => déjà fait
* app/database/connection.py
* Utilise les settings pour l'URL MongoDB. Il faut créer un fichier de configuration (app/config/settings.py)
* Fonction get_database() + gestion des erreurs
* Configuration via variables d'environnement
* app/database/repositories/user_repository.py

View File

@@ -1,5 +1,3 @@
version: '3.8'
services: services:
# Redis - Message broker for Celery # Redis - Message broker for Celery
redis: redis:
@@ -36,15 +34,16 @@ services:
environment: environment:
- REDIS_URL=redis://redis:6379/0 - REDIS_URL=redis://redis:6379/0
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
volumes: volumes:
- ./src/file-processor/app:/app - ./src/file-processor:/app
- ./volumes/watched_files:/watched_files - ./volumes/watched_files:/watched_files
depends_on: depends_on:
- redis - redis
- mongodb - mongodb
networks: networks:
- mydocmanager-network - mydocmanager-network
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
# Worker - Celery workers for document processing # Worker - Celery workers for document processing
worker: worker:
@@ -55,6 +54,7 @@ services:
environment: environment:
- REDIS_URL=redis://redis:6379/0 - REDIS_URL=redis://redis:6379/0
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
volumes: volumes:
- ./src/worker/tasks:/app - ./src/worker/tasks:/app
- ./volumes/watched_files:/watched_files - ./volumes/watched_files:/watched_files

7
pytest.ini Normal file
View File

@@ -0,0 +1,7 @@
[tool:pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
pythonpath = src/file-processor

View File

@@ -3,7 +3,6 @@ annotated-types==0.7.0
anyio==4.10.0 anyio==4.10.0
bcrypt==4.3.0 bcrypt==4.3.0
billiard==4.2.1 billiard==4.2.1
bson==0.5.10
celery==5.5.3 celery==5.5.3
click==8.2.1 click==8.2.1
click-didyoumean==0.3.1 click-didyoumean==0.3.1
@@ -17,17 +16,25 @@ httptools==0.6.4
idna==3.10 idna==3.10
iniconfig==2.1.0 iniconfig==2.1.0
kombu==5.5.4 kombu==5.5.4
mongomock==4.3.0
mongomock-motor==0.0.36
motor==3.7.1
packaging==25.0 packaging==25.0
pipdeptree==2.28.0
pluggy==1.6.0 pluggy==1.6.0
prompt_toolkit==3.0.52 prompt_toolkit==3.0.52
pydantic==2.11.9 pydantic==2.11.9
pydantic_core==2.33.2 pydantic_core==2.33.2
Pygments==2.19.2 Pygments==2.19.2
pymongo==4.15.0 pymongo==4.15.1
pytest==8.4.2 pytest==8.4.2
pytest-asyncio==1.2.0
pytest-mock==3.15.1
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.1.1 python-dotenv==1.1.1
pytz==2025.2
PyYAML==6.0.2 PyYAML==6.0.2
sentinels==1.1.1
six==1.17.0 six==1.17.0
sniffio==1.3.1 sniffio==1.3.1
starlette==0.47.3 starlette==0.47.3

View File

@@ -8,10 +8,12 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copy application code # Copy application code
COPY app/ . COPY . .
ENV PYTHONPATH=/app
# Expose port # Expose port
EXPOSE 8000 EXPOSE 8000
# Command will be overridden by docker-compose # Command will be overridden by docker-compose
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -11,7 +11,7 @@ from pymongo import MongoClient
from pymongo.database import Database from pymongo.database import Database
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from ..config.settings import get_mongodb_url, get_mongodb_database_name from app.config.settings import get_mongodb_url, get_mongodb_database_name
# Global variables for singleton pattern # Global variables for singleton pattern
_client: Optional[MongoClient] = None _client: Optional[MongoClient] = None

View File

@@ -0,0 +1,248 @@
"""
File repository for database operations on FileDocument collection.
This module provides data access operations for file documents stored
in MongoDB with proper error handling and type safety.
"""
from typing import Optional, List
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from difflib import SequenceMatcher
from motor.motor_asyncio import AsyncIOMotorCollection
from app.models.document import FileDocument
from app.database.connection import get_database
class FileDocumentRepository:
"""
Repository class for file document database operations.
This class handles all database operations for FileDocument objects
with proper error handling and data validation.
"""
def __init__(self):
"""Initialize file repository with database connection."""
self.db = get_database()
self.collection: AsyncIOMotorCollection = self.db.files
self._ensure_indexes()
async def _ensure_indexes(self):
"""
Ensure required database indexes exist.
Creates unique index on username field to prevent duplicates.
"""
try:
await self.collection.create_index("filepath", unique=True)
except PyMongoError:
# Index might already exist, ignore error
pass
async def create_document(self, file_data: FileDocument) -> FileDocument:
"""
Create a new file document in database.
Args:
file_data (FileDocument): File document data to create
Returns:
FileDocument: Created file document with database ID
Raises:
ValueError: If file creation fails due to validation
DuplicateKeyError: If file with same hash already exists
"""
try:
file_dict = file_data.model_dump(by_alias=True, exclude_unset=True)
if "_id" in file_dict and file_dict["_id"] is None:
del file_dict["_id"]
result = await self.collection.insert_one(file_dict)
file_data.id = result.inserted_id
return file_data
except DuplicateKeyError as e:
raise DuplicateKeyError(f"File with same hash already exists: {e}")
except PyMongoError as e:
raise ValueError(f"Failed to create file document: {e}")
async def find_document_by_id(self, file_id: str) -> Optional[FileDocument]:
"""
Find file document by ID.
Args:
file_id (str): File document ID to search for
Returns:
FileDocument or None: File document if found, None otherwise
"""
try:
if not ObjectId.is_valid(file_id):
return None
file_doc = await self.collection.find_one({"_id": ObjectId(file_id)})
if file_doc:
return FileDocument(**file_doc)
return None
except PyMongoError:
return None
async def find_document_by_hash(self, file_hash: str) -> Optional[FileDocument]:
"""
Find file document by file hash to detect duplicates.
Args:
file_hash (str): SHA256 hash of file content
Returns:
FileDocument or None: File document if found, None otherwise
"""
try:
file_doc = await self.collection.find_one({"file_hash": file_hash})
if file_doc:
return FileDocument(**file_doc)
return None
except PyMongoError:
return None
async def find_document_by_filepath(self, filepath: str) -> Optional[FileDocument]:
"""
Find file document by exact filepath.
Args:
filepath (str): Full path to the file
Returns:
FileDocument or None: File document if found, None otherwise
"""
try:
file_doc = await self.collection.find_one({"filepath": filepath})
if file_doc:
return FileDocument(**file_doc)
return None
except PyMongoError:
return None
async def find_document_by_name(self, filename: str, similarity_threshold: float = 0.6) -> List[FileDocument]:
"""
Find file documents by filename using fuzzy matching.
Args:
filename (str): Filename to search for
similarity_threshold (float): Minimum similarity ratio (0.0 to 1.0)
Returns:
List[FileDocument]: List of matching files sorted by similarity score
"""
try:
# Get all files from database
cursor = self.collection.find({})
all_files = await cursor.to_list(length=None)
matches = []
for file_doc in all_files:
file_obj = FileDocument(**file_doc)
# Calculate similarity between search term and filename
similarity = SequenceMatcher(None, filename.lower(), file_obj.filename.lower()).ratio()
if similarity >= similarity_threshold:
matches.append((file_obj, similarity))
# Sort by similarity score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]
except PyMongoError:
return []
async def list_documents(self, skip: int = 0, limit: int = 100) -> List[FileDocument]:
"""
List file documents with pagination.
Args:
skip (int): Number of documents to skip (default: 0)
limit (int): Maximum number of documents to return (default: 100)
Returns:
List[FileDocument]: List of file documents
"""
try:
cursor = self.collection.find({}).skip(skip).limit(limit).sort("detected_at", -1)
file_docs = await cursor.to_list(length=limit)
return [FileDocument(**doc) for doc in file_docs]
except PyMongoError:
return []
async def count_documents(self) -> int:
"""
Count total number of file documents.
Returns:
int: Total number of file documents in collection
"""
try:
return await self.collection.count_documents({})
except PyMongoError:
return 0
async def update_document(self, file_id: str, update_data: dict) -> Optional[FileDocument]:
"""
Update file document with new data.
Args:
file_id (str): File document ID to update
update_data (dict): Fields to update
Returns:
FileDocument or None: Updated file document if successful, None otherwise
"""
try:
if not ObjectId.is_valid(file_id):
return None
# Remove None values from update data
clean_update_data = {k: v for k, v in update_data.items() if v is not None}
if not clean_update_data:
return await self.find_document_by_id(file_id)
result = await self.collection.find_one_and_update(
{"_id": ObjectId(file_id)},
{"$set": clean_update_data},
return_document=True
)
if result:
return FileDocument(**result)
return None
except PyMongoError:
return None
async def delete_document(self, file_id: str) -> bool:
"""
Delete file document from database.
Args:
file_id (str): File document ID to delete
Returns:
bool: True if file was deleted, False otherwise
"""
try:
if not ObjectId.is_valid(file_id):
return False
result = await self.collection.delete_one({"_id": ObjectId(file_id)})
return result.deleted_count > 0
except PyMongoError:
return False

View File

@@ -2,17 +2,17 @@
User repository for MongoDB operations. User repository for MongoDB operations.
This module implements the repository pattern for user CRUD operations This module implements the repository pattern for user CRUD operations
with dependency injection of the database connection. with dependency injection of the database connection using async/await.
""" """
from typing import Optional, List, Dict, Any from typing import Optional, List
from datetime import datetime from datetime import datetime
from bson import ObjectId from bson import ObjectId
from pymongo.database import Database from motor.motor_asyncio import AsyncIOMotorDatabase, AsyncIOMotorCollection
from pymongo.errors import DuplicateKeyError from pymongo.errors import DuplicateKeyError, PyMongoError
from pymongo.collection import Collection
from app.models.user import UserCreate, UserInDB, UserUpdate from app.models.user import UserCreate, UserInDB, UserUpdate
from app.utils.security import hash_password
class UserRepository: class UserRepository:
@@ -20,35 +20,33 @@ class UserRepository:
Repository class for user CRUD operations in MongoDB. Repository class for user CRUD operations in MongoDB.
This class handles all database operations related to users, This class handles all database operations related to users,
following the repository pattern with dependency injection. following the repository pattern with dependency injection and async/await.
""" """
def __init__(self, database: Database): def __init__(self, database: AsyncIOMotorDatabase):
""" """
Initialize repository with database dependency. Initialize repository with database dependency.
Args: Args:
database (Database): MongoDB database instance database (AsyncIOMotorDatabase): MongoDB database instance
""" """
self.db = database self.db = database
self.collection: Collection = database.users self.collection: AsyncIOMotorCollection = database.users
# Create unique index on username for duplicate prevention
self._ensure_indexes() self._ensure_indexes()
def _ensure_indexes(self): async def _ensure_indexes(self):
""" """
Ensure required database indexes exist. Ensure required database indexes exist.
Creates unique index on username field to prevent duplicates. Creates unique index on username field to prevent duplicates.
""" """
try: try:
self.collection.create_index("username", unique=True) await self.collection.create_index("username", unique=True)
except Exception: except PyMongoError:
# Index might already exist, ignore error # Index might already exist, ignore error
pass pass
def create_user(self, user_data: UserCreate) -> UserInDB: async def create_user(self, user_data: UserCreate) -> UserInDB:
""" """
Create a new user in the database. Create a new user in the database.
@@ -60,25 +58,28 @@ class UserRepository:
Raises: Raises:
DuplicateKeyError: If username already exists DuplicateKeyError: If username already exists
ValueError: If user creation fails due to validation
""" """
user_dict = { user_dict = {
"username": user_data.username, "username": user_data.username,
"email": user_data.email, "email": user_data.email,
"hashed_password": user_data.hashed_password, "hashed_password": hash_password(user_data.password),
"role": user_data.role, "role": user_data.role,
"is_active": user_data.is_active, "is_active": True,
"created_at": datetime.utcnow(), "created_at": datetime.now(),
"updated_at": datetime.utcnow() "updated_at": datetime.now()
} }
try: try:
result = self.collection.insert_one(user_dict) result = await self.collection.insert_one(user_dict)
user_dict["_id"] = result.inserted_id user_dict["_id"] = result.inserted_id
return UserInDB(**user_dict) return UserInDB(**user_dict)
except DuplicateKeyError: except DuplicateKeyError as e:
raise DuplicateKeyError(f"User with username '{user_data.username}' already exists") raise DuplicateKeyError(f"User with username '{user_data.username}' already exists: {e}")
except PyMongoError as e:
raise ValueError(f"Failed to create user: {e}")
def find_user_by_username(self, username: str) -> Optional[UserInDB]: async def find_user_by_username(self, username: str) -> Optional[UserInDB]:
""" """
Find user by username. Find user by username.
@@ -88,12 +89,15 @@ class UserRepository:
Returns: Returns:
UserInDB or None: User if found, None otherwise UserInDB or None: User if found, None otherwise
""" """
user_doc = self.collection.find_one({"username": username}) try:
if user_doc: user_doc = await self.collection.find_one({"username": username})
return UserInDB(**user_doc) if user_doc:
return None return UserInDB(**user_doc)
return None
except PyMongoError:
return None
def find_user_by_id(self, user_id: str) -> Optional[UserInDB]: async def find_user_by_id(self, user_id: str) -> Optional[UserInDB]:
""" """
Find user by ID. Find user by ID.
@@ -104,16 +108,17 @@ class UserRepository:
UserInDB or None: User if found, None otherwise UserInDB or None: User if found, None otherwise
""" """
try: try:
object_id = ObjectId(user_id) if not ObjectId.is_valid(user_id):
user_doc = self.collection.find_one({"_id": object_id}) return None
user_doc = await self.collection.find_one({"_id": ObjectId(user_id)})
if user_doc: if user_doc:
return UserInDB(**user_doc) return UserInDB(**user_doc)
except Exception: return None
# Invalid ObjectId format except PyMongoError:
pass return None
return None
def find_user_by_email(self, email: str) -> Optional[UserInDB]: async def find_user_by_email(self, email: str) -> Optional[UserInDB]:
""" """
Find user by email address. Find user by email address.
@@ -123,12 +128,15 @@ class UserRepository:
Returns: Returns:
UserInDB or None: User if found, None otherwise UserInDB or None: User if found, None otherwise
""" """
user_doc = self.collection.find_one({"email": email}) try:
if user_doc: user_doc = await self.collection.find_one({"email": email})
return UserInDB(**user_doc) if user_doc:
return None return UserInDB(**user_doc)
return None
except PyMongoError:
return None
def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]: async def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
""" """
Update user information. Update user information.
@@ -140,34 +148,43 @@ class UserRepository:
UserInDB or None: Updated user if found, None otherwise UserInDB or None: Updated user if found, None otherwise
""" """
try: try:
object_id = ObjectId(user_id) if not ObjectId.is_valid(user_id):
return None
# Build update document with only provided fields # Build update document with only provided fields
update_data = {"updated_at": datetime.utcnow()} update_data = {"updated_at": datetime.now()}
if user_update.username is not None:
update_data["username"] = user_update.username
if user_update.email is not None: if user_update.email is not None:
update_data["email"] = user_update.email update_data["email"] = user_update.email
if user_update.hashed_password is not None: if user_update.password is not None:
update_data["hashed_password"] = user_update.hashed_password update_data["hashed_password"] = hash_password(user_update.password)
if user_update.role is not None: if user_update.role is not None:
update_data["role"] = user_update.role update_data["role"] = user_update.role
if user_update.is_active is not None: if user_update.is_active is not None:
update_data["is_active"] = user_update.is_active update_data["is_active"] = user_update.is_active
result = self.collection.update_one( # Remove None values from update data
{"_id": object_id}, clean_update_data = {k: v for k, v in update_data.items() if v is not None}
{"$set": update_data}
if not clean_update_data:
return await self.find_user_by_id(user_id)
result = await self.collection.find_one_and_update(
{"_id": ObjectId(user_id)},
{"$set": clean_update_data},
return_document=True
) )
if result.matched_count > 0: if result:
return self.find_user_by_id(user_id) return UserInDB(**result)
return None
except Exception: except PyMongoError:
# Invalid ObjectId format or other errors return None
pass
return None
def delete_user(self, user_id: str) -> bool: async def delete_user(self, user_id: str) -> bool:
""" """
Delete user from database. Delete user from database.
@@ -178,14 +195,15 @@ class UserRepository:
bool: True if user was deleted, False otherwise bool: True if user was deleted, False otherwise
""" """
try: try:
object_id = ObjectId(user_id) if not ObjectId.is_valid(user_id):
result = self.collection.delete_one({"_id": object_id}) return False
result = await self.collection.delete_one({"_id": ObjectId(user_id)})
return result.deleted_count > 0 return result.deleted_count > 0
except Exception: except PyMongoError:
# Invalid ObjectId format
return False return False
def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]: async def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
""" """
List users with pagination. List users with pagination.
@@ -196,19 +214,26 @@ class UserRepository:
Returns: Returns:
List[UserInDB]: List of users List[UserInDB]: List of users
""" """
cursor = self.collection.find().skip(skip).limit(limit) try:
return [UserInDB(**user_doc) for user_doc in cursor] cursor = self.collection.find({}).skip(skip).limit(limit).sort("created_at", -1)
user_docs = await cursor.to_list(length=limit)
return [UserInDB(**user_doc) for user_doc in user_docs]
except PyMongoError:
return []
def count_users(self) -> int: async def count_users(self) -> int:
""" """
Count total number of users. Count total number of users.
Returns: Returns:
int: Total number of users in database int: Total number of users in database
""" """
return self.collection.count_documents({}) try:
return await self.collection.count_documents({})
except PyMongoError:
return 0
def user_exists(self, username: str) -> bool: async def user_exists(self, username: str) -> bool:
""" """
Check if user exists by username. Check if user exists by username.
@@ -218,4 +243,8 @@ class UserRepository:
Returns: Returns:
bool: True if user exists, False otherwise bool: True if user exists, False otherwise
""" """
return self.collection.count_documents({"username": username}) > 0 try:
count = await self.collection.count_documents({"username": username})
return count > 0
except PyMongoError:
return False

View File

@@ -4,17 +4,74 @@ FastAPI application for MyDocManager file processor service.
This service provides API endpoints for health checks and task dispatching. This service provides API endpoints for health checks and task dispatching.
""" """
import logging
import os import os
from fastapi import FastAPI, HTTPException from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel from pydantic import BaseModel
import redis import redis
from celery import Celery from celery import Celery
from app.database.connection import test_database_connection, get_database
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate
from app.services.init_service import InitializationService
from app.services.user_service import UserService
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan manager for startup and shutdown tasks.
Handles initialization tasks that need to run when the application starts,
including admin user creation and other setup procedures.
"""
# Startup tasks
logger.info("Starting MyDocManager application...")
try:
# Initialize database connection
database = get_database()
# Initialize repositories and services
user_repository = UserRepository(database)
user_service = UserService(user_repository)
init_service = InitializationService(user_service)
# Run initialization tasks
initialization_result = init_service.initialize_application()
if initialization_result["initialization_success"]:
logger.info("Application startup completed successfully")
if initialization_result["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in initialization_result["errors"]:
logger.error(f" - {error}")
except Exception as e:
logger.error(f"Critical error during application startup: {str(e)}")
# You might want to decide if the app should continue or exit here
# For now, we log the error but continue
yield # Application is running
# Shutdown tasks (if needed)
logger.info("Shutting down MyDocManager application...")
# Initialize FastAPI app # Initialize FastAPI app
app = FastAPI( app = FastAPI(
title="MyDocManager File Processor", title="MyDocManager File Processor",
description="File processing and task dispatch service", description="File processing and task dispatch service",
version="1.0.0" version="1.0.0",
lifespan=lifespan
) )
# Environment variables # Environment variables
@@ -42,6 +99,26 @@ class TestTaskRequest(BaseModel):
message: str message: str
def get_user_service() -> UserService:
"""
Dependency to get user service instance.
This should be properly implemented with database connection management
in your actual application.
"""
database = get_database()
user_repository = UserRepository(database)
return UserService(user_repository)
# Your API routes would use the service like this:
@app.post("/api/users")
async def create_user(
user_data: UserCreate,
user_service: UserService = Depends(get_user_service)
):
return user_service.create_user(user_data)
@app.get("/health") @app.get("/health")
async def health_check(): async def health_check():
""" """
@@ -68,6 +145,12 @@ async def health_check():
health_status["dependencies"]["redis"] = "disconnected" health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded" health_status["status"] = "degraded"
# check MongoDB connection
if test_database_connection():
health_status["dependencies"]["mongodb"] = "connected"
else:
health_status["dependencies"]["mongodb"] = "disconnected"
return health_status return health_status

View File

@@ -0,0 +1,142 @@
"""
Pydantic models for document processing collections.
This module defines the data models for file documents and processing jobs
stored in MongoDB collections.
"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from bson import ObjectId
from pydantic import BaseModel, Field, field_validator
from app.models.types import PyObjectId
class FileType(str, Enum):
"""Supported file types for document processing."""
TXT = "txt"
PDF = "pdf"
DOCX = "docx"
JPG = "jpg"
PNG = "png"
class ExtractionMethod(str, Enum):
"""Methods used to extract content from documents."""
DIRECT_TEXT = "direct_text"
OCR = "ocr"
HYBRID = "hybrid"
class ProcessingStatus(str, Enum):
"""Status values for processing jobs."""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class FileDocument(BaseModel):
"""
Model for file documents stored in the 'files' collection.
Represents a file detected in the watched directory with its
metadata and extracted content.
"""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
filename: str = Field(..., description="Original filename")
filepath: str = Field(..., description="Full path to the file")
file_type: FileType = Field(..., description="Type of the file")
extraction_method: Optional[ExtractionMethod] = Field(default=None, description="Method used to extract content")
metadata: Dict[str, Any] = Field(default_factory=dict, description="File-specific metadata")
detected_at: Optional[datetime] = Field(default=None, description="Timestamp when file was detected")
file_hash: Optional[str] = Field(default=None, description="SHA256 hash of file content")
@field_validator('filepath')
@classmethod
def validate_filepath(cls, v: str) -> str:
"""Validate filepath format."""
if not v.strip():
raise ValueError("Filepath cannot be empty")
return v.strip()
@field_validator('filename')
@classmethod
def validate_filename(cls, v: str) -> str:
"""Validate filename format."""
if not v.strip():
raise ValueError("Filename cannot be empty")
return v.strip()
class Config:
"""Pydantic configuration."""
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
class DocumentContent(BaseModel):
"""Model for document content."""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
file_hash: Optional[str] = Field(..., description="SHA256 hash of file content")
content: str = Field(..., description="File content")
encoding: str = Field(default="utf-8", description="Character encoding for text files")
file_size: int = Field(..., ge=0, description="File size in bytes")
mime_type: str = Field(..., description="MIME type detected")
class ProcessingJob(BaseModel):
"""
Model for processing jobs stored in the 'processing_jobs' collection.
Tracks the lifecycle and status of document processing tasks.
"""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
file_id: PyObjectId = Field(..., description="Reference to file document")
status: ProcessingStatus = Field(
default=ProcessingStatus.PENDING,
description="Current processing status"
)
task_id: Optional[str] = Field(
default=None,
description="Celery task UUID"
)
created_at: Optional[datetime] = Field(
default=None,
description="Timestamp when job was created"
)
started_at: Optional[datetime] = Field(
default=None,
description="Timestamp when processing started"
)
completed_at: Optional[datetime] = Field(
default=None,
description="Timestamp when processing completed"
)
error_message: Optional[str] = Field(
default=None,
description="Error message if processing failed"
)
@field_validator('error_message')
@classmethod
def validate_error_message(cls, v: Optional[str]) -> Optional[str]:
"""Clean up error message."""
if v is not None:
return v.strip() if v.strip() else None
return v
class Config:
"""Pydantic configuration."""
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}

View File

@@ -0,0 +1,32 @@
from typing import Any
from bson import ObjectId
from pydantic_core import core_schema
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)

View File

@@ -13,167 +13,146 @@ from pydantic import BaseModel, Field, field_validator, EmailStr
from pydantic_core import core_schema from pydantic_core import core_schema
from app.models.auth import UserRole from app.models.auth import UserRole
from app.models.types import PyObjectId
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
def validate_password_strength(password: str) -> str: def validate_password_strength(password: str) -> str:
""" """
Validate password meets security requirements. Validate password meets security requirements.
Requirements: Requirements:
- At least 8 characters long - At least 8 characters long
- Contains at least one uppercase letter - Contains at least one uppercase letter
- Contains at least one lowercase letter - Contains at least one lowercase letter
- Contains at least one digit - Contains at least one digit
- Contains at least one special character - Contains at least one special character
Args: Args:
password: The password string to validate password: The password string to validate
Returns: Returns:
str: The validated password str: The validated password
Raises: Raises:
ValueError: If password doesn't meet requirements ValueError: If password doesn't meet requirements
""" """
if len(password) < 8: if len(password) < 8:
raise ValueError("Password must be at least 8 characters long") raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password): if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter") raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password): if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter") raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password): if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit") raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password): if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character") raise ValueError("Password must contain at least one special character")
return password return password
def validate_username_not_empty(username: str) -> str: def validate_username_not_empty(username: str) -> str:
""" """
Validate username is not empty or whitespace only. Validate username is not empty or whitespace only.
Args: Args:
username: The username string to validate username: The username string to validate
Returns: Returns:
str: The validated username str: The validated username
Raises: Raises:
ValueError: If username is empty or whitespace only ValueError: If username is empty or whitespace only
""" """
if not username or not username.strip(): if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only") raise ValueError("Username cannot be empty or whitespace only")
return username.strip() return username.strip()
class UserCreate(BaseModel): class UserCreateNoValidation(BaseModel):
"""Model for creating a new user.""" """Model for creating a new user."""
username: str username: str
email: EmailStr email: str
password: str password: str
role: UserRole = UserRole.USER role: UserRole = UserRole.USER
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password') class UserCreate(UserCreateNoValidation):
@classmethod """Model for creating a new user."""
def validate_password(cls, v): email: EmailStr
return validate_password_strength(v)
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
class UserUpdate(BaseModel): class UserUpdate(BaseModel):
"""Model for updating an existing user.""" """Model for updating an existing user."""
username: Optional[str] = None username: Optional[str] = None
email: Optional[EmailStr] = None email: Optional[EmailStr] = None
password: Optional[str] = None password: Optional[str] = None
role: Optional[UserRole] = None role: Optional[UserRole] = None
is_active: Optional[bool] = None
@field_validator('username') @field_validator('username')
@classmethod @classmethod
def validate_username(cls, v): def validate_username(cls, v):
if v is not None: if v is not None:
return validate_username_not_empty(v) return validate_username_not_empty(v)
return v return v
@field_validator('password') @field_validator('password')
@classmethod @classmethod
def validate_password(cls, v): def validate_password(cls, v):
if v is not None: if v is not None:
return validate_password_strength(v) return validate_password_strength(v)
return v return v
class UserInDB(BaseModel): class UserInDB(BaseModel):
"""Model for user data stored in database.""" """Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str username: str
email: str email: str
password_hash: str hashed_password: str
role: UserRole role: UserRole
is_active: bool = True is_active: bool = True
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
model_config = { model_config = {
"populate_by_name": True, "populate_by_name": True,
"arbitrary_types_allowed": True, "arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str} "json_encoders": {ObjectId: str}
} }
class UserResponse(BaseModel): class UserResponse(BaseModel):
"""Model for user data in API responses (excludes password_hash).""" """Model for user data in API responses (excludes password_hash)."""
id: PyObjectId = Field(alias="_id") id: PyObjectId = Field(alias="_id")
username: str username: str
email: str email: str
role: UserRole role: UserRole
is_active: bool is_active: bool
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
model_config = { model_config = {
"populate_by_name": True, "populate_by_name": True,
"arbitrary_types_allowed": True, "arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str} "json_encoders": {ObjectId: str}
} }

View File

@@ -0,0 +1,58 @@
"""
Authentication service for password hashing and verification.
This module provides authentication-related functionality including
password hashing, verification, and JWT token management.
"""
from app.utils.security import hash_password, verify_password
class AuthService:
"""
Service class for authentication operations.
Handles password hashing, verification, and other authentication
related operations with proper security practices.
"""
@staticmethod
def hash_user_password(password: str) -> str:
"""
Hash a plaintext password for secure storage.
Args:
password (str): Plaintext password to hash
Returns:
str: Hashed password safe for database storage
Example:
>>> auth = AuthService()
>>> hashed = auth.hash_user_password("mypassword123")
>>> len(hashed) > 0
True
"""
return hash_password(password)
@staticmethod
def verify_user_password(password: str, hashed_password: str) -> bool:
"""
Verify a password against its hash.
Args:
password (str): Plaintext password to verify
hashed_password (str): Stored hashed password
Returns:
bool: True if password matches hash, False otherwise
Example:
>>> auth = AuthService()
>>> hashed = auth.hash_user_password("mypassword123")
>>> auth.verify_user_password("mypassword123", hashed)
True
>>> auth.verify_user_password("wrongpassword", hashed)
False
"""
return verify_password(password, hashed_password)

View File

@@ -0,0 +1,134 @@
"""
Initialization service for application startup tasks.
This module handles application initialization tasks including
creating default admin user if none exists.
"""
import logging
from typing import Optional
from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
from app.models.auth import UserRole
from app.services.user_service import UserService
logger = logging.getLogger(__name__)
class InitializationService:
"""
Service for handling application initialization tasks.
This service manages startup operations like ensuring required
users exist and system is properly configured.
"""
def __init__(self, user_service: UserService):
"""
Initialize service with user service dependency.
Args:
user_service (UserService): Service for user operations
"""
self.user_service = user_service
def ensure_admin_user_exists(self) -> Optional[UserInDB]:
"""
Ensure default admin user exists in the system.
Creates a default admin user if no admin user exists in the system.
Uses default credentials that should be changed after first login.
Returns:
UserInDB or None: Created admin user if created, None if already exists
Raises:
Exception: If admin user creation fails
"""
logger.info("Checking if admin user exists...")
# Check if any admin user already exists
if self._admin_user_exists():
logger.info("Admin user already exists, skipping creation")
return None
logger.info("No admin user found, creating default admin user...")
try:
# Create default admin user
admin_data = UserCreateNoValidation(
username="admin",
email="admin@mydocmanager.local",
password="admin", # Should be changed after first login
role=UserRole.ADMIN
)
created_user = self.user_service.create_user(admin_data)
logger.info(f"Default admin user created successfully with ID: {created_user.id}")
logger.warning(
"Default admin user created with username 'admin' and password 'admin'. "
"Please change these credentials immediately for security!"
)
return created_user
except Exception as e:
logger.error(f"Failed to create default admin user: {str(e)}")
raise Exception(f"Admin user creation failed: {str(e)}")
def _admin_user_exists(self) -> bool:
"""
Check if any admin user exists in the system.
Returns:
bool: True if at least one admin user exists, False otherwise
"""
try:
# Get all users and check if any have admin role
users = self.user_service.list_users(limit=1000) # Reasonable limit for admin check
for user in users:
if user.role == UserRole.ADMIN and user.is_active:
return True
return False
except Exception as e:
logger.error(f"Error checking for admin users: {str(e)}")
# In case of error, assume admin exists to avoid creating duplicates
return True
def initialize_application(self) -> dict:
"""
Perform all application initialization tasks.
This method runs all necessary initialization procedures including
admin user creation and any other startup requirements.
Returns:
dict: Summary of initialization tasks performed
"""
logger.info("Starting application initialization...")
initialization_summary = {
"admin_user_created": False,
"initialization_success": False,
"errors": []
}
try:
# Ensure admin user exists
created_admin = self.ensure_admin_user_exists()
if created_admin:
initialization_summary["admin_user_created"] = True
initialization_summary["initialization_success"] = True
logger.info("Application initialization completed successfully")
except Exception as e:
error_msg = f"Application initialization failed: {str(e)}"
logger.error(error_msg)
initialization_summary["errors"].append(error_msg)
return initialization_summary

View File

@@ -0,0 +1,181 @@
"""
User service for business logic operations.
This module provides user-related business logic including user creation,
retrieval, updates, and authentication operations with proper error handling.
"""
from typing import Optional, List
from pymongo.errors import DuplicateKeyError
from app.models.user import UserCreate, UserInDB, UserUpdate, UserResponse, UserCreateNoValidation
from app.models.auth import UserRole
from app.database.repositories.user_repository import UserRepository
from app.services.auth_service import AuthService
class UserService:
"""
Service class for user business logic operations.
This class handles user-related operations including creation,
authentication, and data management with proper validation.
"""
def __init__(self, user_repository: UserRepository):
"""
Initialize user service with repository dependency.
Args:
user_repository (UserRepository): Repository for user data operations
"""
self.user_repository = user_repository
self.auth_service = AuthService()
def create_user(self, user_data: UserCreate | UserCreateNoValidation) -> UserInDB:
"""
Create a new user with business logic validation.
Args:
user_data (UserCreate): User creation data
Returns:
UserInDB: Created user with database information
Raises:
ValueError: If user already exists or validation fails
"""
# Check if user already exists
if self.user_repository.user_exists(user_data.username):
raise ValueError(f"User with username '{user_data.username}' already exists")
# Check if email already exists
existing_user = self.user_repository.find_user_by_email(user_data.email)
if existing_user:
raise ValueError(f"User with email '{user_data.email}' already exists")
try:
return self.user_repository.create_user(user_data)
except DuplicateKeyError:
raise ValueError(f"User with username '{user_data.username}' already exists")
def get_user_by_username(self, username: str) -> Optional[UserInDB]:
"""
Retrieve user by username.
Args:
username (str): Username to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
return self.user_repository.find_user_by_username(username)
def get_user_by_id(self, user_id: str) -> Optional[UserInDB]:
"""
Retrieve user by ID.
Args:
user_id (str): User ID to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
return self.user_repository.find_user_by_id(user_id)
def authenticate_user(self, username: str, password: str) -> Optional[UserInDB]:
"""
Authenticate user with username and password.
Args:
username (str): Username for authentication
password (str): Password for authentication
Returns:
UserInDB or None: Authenticated user if valid, None otherwise
"""
user = self.user_repository.find_user_by_username(username)
if not user:
return None
if not user.is_active:
return None
if not self.auth_service.verify_user_password(password, user.hashed_password):
return None
return user
def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
"""
Update user information.
Args:
user_id (str): User ID to update
user_update (UserUpdate): Updated user data
Returns:
UserInDB or None: Updated user if successful, None otherwise
Raises:
ValueError: If username or email already exists for different user
"""
# Validate username uniqueness if being updated
if user_update.username is not None:
existing_user = self.user_repository.find_user_by_username(user_update.username)
if existing_user and str(existing_user.id) != user_id:
raise ValueError(f"Username '{user_update.username}' is already taken")
# Validate email uniqueness if being updated
if user_update.email is not None:
existing_user = self.user_repository.find_user_by_email(user_update.email)
if existing_user and str(existing_user.id) != user_id:
raise ValueError(f"Email '{user_update.email}' is already taken")
return self.user_repository.update_user(user_id, user_update)
def delete_user(self, user_id: str) -> bool:
"""
Delete user from system.
Args:
user_id (str): User ID to delete
Returns:
bool: True if user was deleted, False otherwise
"""
return self.user_repository.delete_user(user_id)
def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
"""
List users with pagination.
Args:
skip (int): Number of users to skip (default: 0)
limit (int): Maximum number of users to return (default: 100)
Returns:
List[UserInDB]: List of users
"""
return self.user_repository.list_users(skip=skip, limit=limit)
def count_users(self) -> int:
"""
Count total number of users.
Returns:
int: Total number of users in system
"""
return self.user_repository.count_users()
def user_exists(self, username: str) -> bool:
"""
Check if user exists by username.
Args:
username (str): Username to check
Returns:
bool: True if user exists, False otherwise
"""
return self.user_repository.user_exists(username)

View File

@@ -1,6 +1,10 @@
fastapi==0.116.1 bcrypt==4.3.0
uvicorn==0.35.0
celery==5.5.3 celery==5.5.3
redis==6.4.0 email-validator==2.3.0
fastapi==0.116.1
httptools==0.6.4
motor==3.7.1
pymongo==4.15.0 pymongo==4.15.0
pydantic==2.11.9 pydantic==2.11.9
redis==6.4.0
uvicorn==0.35.0

View File

@@ -22,7 +22,9 @@ def test_i_can_get_database_connection():
"""Test successful database connection creation.""" """Test successful database connection creation."""
mock_client = Mock() mock_client = Mock()
mock_database = Mock() mock_database = Mock()
mock_client.__getitem__.return_value = mock_database
# Configure the mock to support dictionary-like access
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client): with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"): with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -36,6 +38,8 @@ def test_i_can_get_database_connection():
assert result == mock_database assert result == mock_database
mock_client.admin.command.assert_called_with('ping') mock_client.admin.command.assert_called_with('ping')
# Verify that __getitem__ was called with the database name
mock_client.__getitem__.assert_called_with("testdb")
def test_i_cannot_connect_to_invalid_mongodb_url(): def test_i_cannot_connect_to_invalid_mongodb_url():
@@ -78,7 +82,7 @@ def test_i_can_get_database_singleton():
"""Test that get_database returns the same instance (singleton pattern).""" """Test that get_database returns the same instance (singleton pattern)."""
mock_client = Mock() mock_client = Mock()
mock_database = Mock() mock_database = Mock()
mock_client.__getitem__.return_value = mock_database mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client): with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"): with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -102,7 +106,7 @@ def test_i_can_close_database_connection():
"""Test closing database connection.""" """Test closing database connection."""
mock_client = Mock() mock_client = Mock()
mock_database = Mock() mock_database = Mock()
mock_client.__getitem__.return_value = mock_database mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client): with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"): with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -127,7 +131,7 @@ def test_i_can_get_mongodb_client():
"""Test getting raw MongoDB client instance.""" """Test getting raw MongoDB client instance."""
mock_client = Mock() mock_client = Mock()
mock_database = Mock() mock_database = Mock()
mock_client.__getitem__.return_value = mock_database mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client): with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"): with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -169,17 +173,6 @@ def test_i_can_test_database_connection_success():
mock_database.command.assert_called_with('ping') mock_database.command.assert_called_with('ping')
def test_i_cannot_test_database_connection_failure():
"""Test database connection health check - failure case."""
mock_database = Mock()
mock_database.command.side_effect = Exception("Connection error")
with patch('app.database.connection.get_database', return_value=mock_database):
result = test_database_connection()
assert result is False
def test_i_can_close_connection_when_no_client(): def test_i_can_close_connection_when_no_client():
"""Test closing connection when no client exists (should not raise error).""" """Test closing connection when no client exists (should not raise error)."""
# Reset global variables # Reset global variables

View File

@@ -0,0 +1,602 @@
"""
Test suite for FileDocumentRepository with async/await support.
This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from datetime import datetime
from typing import Dict, Any
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.document_repository import FileDocumentRepository
from app.models.document import FileDocument, FileType
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository()
repo.db = db
repo.collection = db.files
return repo
@pytest.fixture
def sample_file_document():
"""Sample FileDocument data for testing."""
return FileDocument(
filename="test_document.pdf",
filepath="/path/to/test_document.pdf",
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType("pdf"),
detected_at=datetime.now(),
)
@pytest.fixture
def sample_update_data():
"""Sample update data for testing."""
return {
"metadata": {"tags": ["updated", "document"]},
"file_type": FileType("txt"),
}
@pytest.fixture
def multiple_sample_documents():
"""Multiple FileDocument objects for list/search testing."""
base_time = datetime.now()
return [
FileDocument(
filename="document1.pdf",
filepath="/path/to/document1.pdf",
file_hash="hash1" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="similar_document.pdf",
filepath="/path/to/similar_document.pdf",
file_hash="hash2" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="completely_different.txt",
filepath="/path/to/completely_different.txt",
file_hash="hash3" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
)
]
class TestFileDocumentRepositoryInitialization:
"""Tests for repository initialization."""
@pytest.mark.asyncio
async def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
repo = FileDocumentRepository()
# Act & Assert (should not raise any exception)
assert repo.db is not None
assert repo.collection is not None
class TestFileDocumentRepositoryCreation:
"""Tests for file document creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_document(self, in_memory_repository, sample_file_document):
"""Test successful file document creation."""
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.filename == sample_file_document.filename
assert created_doc.filepath == sample_file_document.filepath
assert created_doc.file_hash == sample_file_document.file_hash
assert created_doc.file_type == sample_file_document.file_type
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_can_create_document_without_id(self, in_memory_repository, sample_file_document):
"""Test creating document with _id set to None (should be removed)."""
# Arrange
sample_file_document.id = None
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_cannot_create_duplicate_document(self, in_memory_repository, sample_file_document):
"""Test that creating document with duplicate hash raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_document(sample_file_document)
duplicate_doc = FileDocument(
filename="different_name.pdf",
filepath=sample_file_document.filepath,
file_hash="different_hash" + "0" * 58,
file_type=FileType("pdf"),
detected_at=datetime.now()
)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_document(duplicate_doc)
assert "already exists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(ValueError) as exc_info:
await in_memory_repository.create_document(sample_file_document)
assert "Failed to create file document" in str(exc_info.value)
class TestFileDocumentRepositoryFinding:
"""Tests for file document finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
"""Test finding document by valid ObjectId."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
# Assert
assert found_doc is not None
assert found_doc.id == created_doc.id
assert found_doc.filename == created_doc.filename
assert found_doc.file_hash == created_doc.file_hash
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_id("invalid_id")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_doc = await in_memory_repository.find_document_by_id(nonexistent_id)
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_hash(self, in_memory_repository, sample_file_document):
"""Test finding document by file hash."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_hash(sample_file_document.file_hash)
# Assert
assert found_doc is not None
assert found_doc.file_hash == created_doc.file_hash
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_hash(self, in_memory_repository):
"""Test that nonexistent hash returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_hash("nonexistent_hash")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
"""Test finding document by exact filepath."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_filepath(sample_file_document.filepath)
# Assert
assert found_doc is not None
assert found_doc.filepath == created_doc.filepath
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
"""Test that nonexistent filepath returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_filepath("/nonexistent/path.pdf")
# Assert
assert found_doc is None
class TestFileDocumentRepositoryFuzzySearch:
"""Tests for fuzzy search functionality by filename."""
@pytest.mark.asyncio
async def test_i_can_find_documents_by_exact_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with exact filename match."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1.pdf")
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_by_fuzzy_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with fuzzy matching using default threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document")
# Assert
assert len(found_docs) >= 2 # Should find document1.pdf and similar_document.pdf
filenames = [doc.filename for doc in found_docs]
assert "document1.pdf" in filenames
assert "similar_document.pdf" in filenames
@pytest.mark.asyncio
async def test_i_can_find_documents_with_custom_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with custom similarity threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act - Very high threshold should only match exact or very similar names
found_docs = await in_memory_repository.find_document_by_name("document1.pdf", similarity_threshold=0.9)
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_sorted_by_similarity(self, in_memory_repository, multiple_sample_documents):
"""Test that documents are sorted by similarity score (highest first)."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1", similarity_threshold=0.3)
# Assert
assert len(found_docs) >= 1
# First result should be the most similar (document1.pdf)
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_cannot_find_documents_below_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test that no documents are returned when similarity is below threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("xyz", similarity_threshold=0.6)
# Assert
assert len(found_docs) == 0
@pytest.mark.asyncio
async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during name search."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
found_docs = await in_memory_repository.find_document_by_name("test")
# Assert
assert found_docs == []
class TestFileDocumentRepositoryListing:
"""Tests for document listing functionality."""
@pytest.mark.asyncio
async def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with default pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == len(multiple_sample_documents)
assert all(isinstance(doc, FileDocument) for doc in docs)
@pytest.mark.asyncio
async def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with custom pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs_page1 = await in_memory_repository.list_documents(skip=0, limit=2)
docs_page2 = await in_memory_repository.list_documents(skip=2, limit=2)
# Assert
assert len(docs_page1) == 2
assert len(docs_page2) == 1 # Only 3 total documents
# Ensure no overlap between pages
page1_ids = [doc.id for doc in docs_page1]
page2_ids = [doc.id for doc in docs_page2]
assert len(set(page1_ids).intersection(set(page2_ids))) == 0
@pytest.mark.asyncio
async def test_i_can_list_documents_sorted_by_date(self, in_memory_repository, sample_file_document):
"""Test that documents are sorted by detected_at in descending order."""
# Arrange
from datetime import timedelta
# Create documents with different timestamps
doc1 = sample_file_document.model_copy()
doc1.filename = "oldest.pdf"
doc1.file_hash = "hash1" + "0" * 58
doc1.detected_at = datetime.now() - timedelta(hours=2)
doc2 = sample_file_document.model_copy()
doc2.filename = "newest.pdf"
doc2.file_hash = "hash2" + "0" * 58
doc2.detected_at = datetime.now()
await in_memory_repository.create_document(doc1)
await in_memory_repository.create_document(doc2)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == 2
assert docs[0].filename == "newest.pdf" # Most recent first
assert docs[1].filename == "oldest.pdf"
@pytest.mark.asyncio
async def test_i_can_list_empty_documents(self, in_memory_repository):
"""Test listing documents from empty collection."""
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
@pytest.mark.asyncio
async def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during document listing."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
class TestFileDocumentRepositoryUpdate:
"""Tests for document update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_document_successfully(self, in_memory_repository, sample_file_document,
sample_update_data):
"""Test successful document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert updated_doc is not None
assert updated_doc.tags == sample_update_data["tags"]
assert updated_doc.file_type == sample_update_data["file_type"]
assert updated_doc.id == created_doc.id
assert updated_doc.filename == created_doc.filename # Unchanged fields remain
@pytest.mark.asyncio
async def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
"""Test updating document with partial data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
partial_update = {"tags": ["new_tag"]}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.filename == created_doc.filename # Should remain unchanged
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
update_with_none = {"tags": ["new_tag"], "file_type": None}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out)
@pytest.mark.asyncio
async def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
"""Test updating document with empty data returns current document."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
empty_update = {}
# Act
result = await in_memory_repository.update_document(str(created_doc.id), empty_update)
# Assert
assert result is not None
assert result.filename == created_doc.filename
assert result.file_hash == created_doc.file_hash
assert result.tags == created_doc.tags
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_document("invalid_id", sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
"""Test that updating nonexistent document returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.update_document(nonexistent_id, sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_pymongo_error(self, in_memory_repository, sample_file_document,
sample_update_data, mocker):
"""Test handling of PyMongo errors during document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert result is None
class TestFileDocumentRepositoryDeletion:
"""Tests for document deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
"""Test successful document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
deletion_result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert deletion_result is True
# Verify document is actually deleted
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_document("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
"""Test that deleting nonexistent document returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_document(nonexistent_id)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert result is False
class TestFileDocumentRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
"""Test counting documents."""
# Arrange
initial_count = await in_memory_repository.count_documents()
await in_memory_repository.create_document(sample_file_document)
# Act
final_count = await in_memory_repository.count_documents()
# Assert
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_i_can_count_zero_documents(self, in_memory_repository):
"""Test counting documents in empty collection."""
# Act
count = await in_memory_repository.count_documents()
# Assert
assert count == 0

View File

@@ -262,14 +262,14 @@ class TestUserInDBModel:
def test_i_can_create_user_in_db_model(self): def test_i_can_create_user_in_db_model(self):
"""Test creation of valid UserInDB model with all fields.""" """Test creation of valid UserInDB model with all fields."""
user_id = ObjectId() user_id = ObjectId()
created_at = datetime.utcnow() created_at = datetime.now()
updated_at = datetime.utcnow() updated_at = datetime.now()
user_data = { user_data = {
"id": user_id, "id": user_id,
"username": "testuser", "username": "testuser",
"email": "test@example.com", "email": "test@example.com",
"password_hash": "$2b$12$hashedpassword", "hashed_password": "$2b$12$hashedpassword",
"role": UserRole.USER, "role": UserRole.USER,
"is_active": True, "is_active": True,
"created_at": created_at, "created_at": created_at,
@@ -281,29 +281,12 @@ class TestUserInDBModel:
assert user.id == user_id assert user.id == user_id
assert user.username == "testuser" assert user.username == "testuser"
assert user.email == "test@example.com" assert user.email == "test@example.com"
assert user.password_hash == "$2b$12$hashedpassword" assert user.hashed_password == "$2b$12$hashedpassword"
assert user.role == UserRole.USER assert user.role == UserRole.USER
assert user.is_active is True assert user.is_active is True
assert user.created_at == created_at assert user.created_at == created_at
assert user.updated_at == updated_at assert user.updated_at == updated_at
def test_i_can_create_inactive_user(self):
"""Test creation of inactive user."""
user_data = {
"id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"password_hash": "$2b$12$hashedpassword",
"role": UserRole.USER,
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
user = UserInDB(**user_data)
assert user.is_active is False
class TestUserResponseModel: class TestUserResponseModel:
"""Tests for UserResponse Pydantic model (API response).""" """Tests for UserResponse Pydantic model (API response)."""
@@ -311,8 +294,8 @@ class TestUserResponseModel:
def test_i_can_create_user_response_model(self): def test_i_can_create_user_response_model(self):
"""Test creation of valid UserResponse model without password.""" """Test creation of valid UserResponse model without password."""
user_id = ObjectId() user_id = ObjectId()
created_at = datetime.utcnow() created_at = datetime.now()
updated_at = datetime.utcnow() updated_at = datetime.now()
user_data = { user_data = {
"id": user_id, "id": user_id,
@@ -350,14 +333,14 @@ class TestUserResponseModel:
def test_i_can_convert_user_in_db_to_response(self): def test_i_can_convert_user_in_db_to_response(self):
"""Test conversion from UserInDB to UserResponse model.""" """Test conversion from UserInDB to UserResponse model."""
user_id = ObjectId() user_id = ObjectId()
created_at = datetime.utcnow() created_at = datetime.now()
updated_at = datetime.utcnow() updated_at = datetime.now()
user_in_db = UserInDB( user_in_db = UserInDB(
id=user_id, id=user_id,
username="testuser", username="testuser",
email="test@example.com", email="test@example.com",
password_hash="$2b$12$hashedpassword", hashed_password="$2b$12$hashedpassword",
role=UserRole.USER, role=UserRole.USER,
is_active=True, is_active=True,
created_at=created_at, created_at=created_at,

View File

@@ -1,385 +1,301 @@
""" """
Unit tests for user repository module. Test suite for UserRepository with async/await support.
Tests all CRUD operations for users with MongoDB mocking This module contains comprehensive tests for all UserRepository methods
to ensure proper database interactions without requiring using mongomock-motor for in-memory MongoDB testing.
actual MongoDB instance during tests.
""" """
import pytest import pytest
from unittest.mock import Mock, MagicMock
from datetime import datetime from datetime import datetime
import pytest_asyncio
from bson import ObjectId from bson import ObjectId
from pymongo.errors import DuplicateKeyError from pymongo.errors import DuplicateKeyError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.user_repository import UserRepository from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate, UserUpdate, UserInDB, UserRole from app.models.user import UserCreate, UserUpdate, UserInDB
@pytest.fixture @pytest_asyncio.fixture
def mock_database(): async def in_memory_repository():
"""Create mock database with users collection.""" """Create an in-memory UserRepository for testing."""
db = Mock() client = AsyncMongoMockClient()
collection = Mock() db = client.test_database
db.users = collection repo = UserRepository(db)
return db #await repo.initialize()
return repo
@pytest.fixture
def user_repository(mock_database):
"""Create UserRepository instance with mocked database."""
return UserRepository(mock_database)
@pytest.fixture @pytest.fixture
def sample_user_create(): def sample_user_create():
"""Create sample UserCreate object for testing.""" """Sample UserCreate data for testing."""
return UserCreate( return UserCreate(
username="testuser", username="testuser",
email="test@example.com", email="test@example.com",
hashed_password="hashed_password_123", password="#TestPassword123",
role=UserRole.USER, role="user"
is_active=True
) )
@pytest.fixture @pytest.fixture
def sample_user_update(): def sample_user_update():
"""Create sample UserUpdate object for testing.""" """Sample UserUpdate data for testing."""
return UserUpdate( return UserUpdate(
username="updateduser",
email="updated@example.com", email="updated@example.com",
role=UserRole.ADMIN, role="admin"
is_active=False
) )
def test_i_can_create_user(user_repository, mock_database, sample_user_create): class TestUserRepositoryCreation:
"""Test successful user creation.""" """Tests for user creation functionality."""
# Mock successful insertion
mock_result = Mock() @pytest.mark.asyncio
mock_result.inserted_id = ObjectId() async def test_i_can_create_user(self, in_memory_repository, sample_user_create):
mock_database.users.insert_one.return_value = mock_result """Test successful user creation."""
# Act
created_user = await in_memory_repository.create_user(sample_user_create)
# Assert
assert created_user is not None
assert created_user.username == sample_user_create.username
assert created_user.email == sample_user_create.email
assert created_user.role == sample_user_create.role
assert created_user.is_active is True
assert created_user.id is not None
assert created_user.created_at is not None
assert created_user.updated_at is not None
assert created_user.hashed_password != sample_user_create.password # Should be hashed
@pytest.mark.asyncio
async def test_i_cannot_create_user_with_duplicate_username(self, in_memory_repository, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_user(sample_user_create)
assert "already exists" in str(exc_info.value)
class TestUserRepositoryFinding:
"""Tests for user finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_user_by_id(self, in_memory_repository, sample_user_create):
"""Test finding user by valid ID."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
# Assert
assert found_user is not None
assert found_user.id == created_user.id
assert found_user.username == created_user.username
assert found_user.email == created_user.email
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_user = await in_memory_repository.find_user_by_id("invalid_id")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_user = await in_memory_repository.find_user_by_id(nonexistent_id)
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_username(self, in_memory_repository, sample_user_create):
"""Test finding user by username."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_username(sample_user_create.username)
# Assert
assert found_user is not None
assert found_user.username == created_user.username
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_username(self, in_memory_repository):
"""Test that nonexistent username returns None."""
# Act
found_user = await in_memory_repository.find_user_by_username("nonexistent")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_email(self, in_memory_repository, sample_user_create):
"""Test finding user by email."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_email(str(sample_user_create.email))
# Assert
assert found_user is not None
assert found_user.email == created_user.email
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_email(self, in_memory_repository):
"""Test that nonexistent email returns None."""
# Act
found_user = await in_memory_repository.find_user_by_email("nonexistent@example.com")
# Assert
assert found_user is None
class TestUserRepositoryUpdate:
"""Tests for user update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_user(self, in_memory_repository, sample_user_create, sample_user_update):
"""Test successful user update."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
original_updated_at = created_user.updated_at
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), sample_user_update)
# Assert
assert updated_user is not None
assert updated_user.username == sample_user_update.username
assert updated_user.email == sample_user_update.email
assert updated_user.role == sample_user_update.role
assert updated_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_update_user_with_invalid_id(self, in_memory_repository, sample_user_update):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_user("invalid_id", sample_user_update)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_can_update_user_with_partial_data(self, in_memory_repository, sample_user_create):
"""Test updating user with partial data."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
partial_update = UserUpdate(username="newusername")
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), partial_update)
# Assert
assert updated_user is not None
assert updated_user.username == "newusername"
assert updated_user.email == created_user.email # Should remain unchanged
assert updated_user.role == created_user.role # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_user_with_empty_data(self, in_memory_repository, sample_user_create):
"""Test updating user with empty data returns current user."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
empty_update = UserUpdate()
# Act
result = await in_memory_repository.update_user(str(created_user.id), empty_update)
# Assert
assert result is not None
assert result.username == created_user.username
assert result.email == created_user.email
class TestUserRepositoryDeletion:
"""Tests for user deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_user(self, in_memory_repository, sample_user_create):
"""Test successful user deletion."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
deletion_result = await in_memory_repository.delete_user(str(created_user.id))
# Assert
assert deletion_result is True
# Verify user is actually deleted
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_delete_user_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_user("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_user(self, in_memory_repository):
"""Test that deleting nonexistent user returns False."""
# Arrange
nonexistent_id = str(ObjectId())
result = user_repository.create_user(sample_user_create) # Act
result = await in_memory_repository.delete_user(nonexistent_id)
assert isinstance(result, UserInDB) # Assert
assert result.username == sample_user_create.username assert result is False
assert result.email == sample_user_create.email
assert result.hashed_password == sample_user_create.hashed_password
assert result.role == sample_user_create.role class TestUserRepositoryUtilities:
assert result.is_active == sample_user_create.is_active """Tests for utility methods."""
assert result.id is not None
assert isinstance(result.created_at, datetime) @pytest.mark.asyncio
assert isinstance(result.updated_at, datetime) async def test_i_can_count_users(self, in_memory_repository, sample_user_create):
"""Test counting users."""
# Arrange
initial_count = await in_memory_repository.count_users()
await in_memory_repository.create_user(sample_user_create)
# Act
final_count = await in_memory_repository.count_users()
# Assert
assert final_count == initial_count + 1
# Verify insert_one was called with correct data @pytest.mark.asyncio
mock_database.users.insert_one.assert_called_once() async def test_i_can_check_user_exists(self, in_memory_repository, sample_user_create):
call_args = mock_database.users.insert_one.call_args[0][0] """Test checking if user exists."""
assert call_args["username"] == sample_user_create.username # Arrange
assert call_args["email"] == sample_user_create.email await in_memory_repository.create_user(sample_user_create)
def test_i_cannot_create_duplicate_username(user_repository, mock_database, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Mock DuplicateKeyError from MongoDB
mock_database.users.insert_one.side_effect = DuplicateKeyError("duplicate key error")
with pytest.raises(DuplicateKeyError, match="User with username 'testuser' already exists"):
user_repository.create_user(sample_user_create)
# Act
exists = await in_memory_repository.user_exists(sample_user_create.username)
not_exists = await in_memory_repository.user_exists("nonexistent")
def test_i_can_find_user_by_username(user_repository, mock_database): # Assert
"""Test finding user by username.""" assert exists is True
# Mock user document from database assert not_exists is False
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
result = user_repository.find_user_by_username("testuser")
assert isinstance(result, UserInDB)
assert result.username == "testuser"
assert result.email == "test@example.com"
mock_database.users.find_one.assert_called_once_with({"username": "testuser"})
def test_i_cannot_find_nonexistent_user_by_username(user_repository, mock_database):
"""Test finding nonexistent user by username returns None."""
mock_database.users.find_one.return_value = None
result = user_repository.find_user_by_username("nonexistent")
assert result is None
mock_database.users.find_one.assert_called_once_with({"username": "nonexistent"})
def test_i_can_find_user_by_id(user_repository, mock_database):
"""Test finding user by ID."""
user_id = ObjectId()
user_doc = {
"_id": user_id,
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
result = user_repository.find_user_by_id(str(user_id))
assert isinstance(result, UserInDB)
assert result.id == user_id
assert result.username == "testuser"
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
def test_i_cannot_find_user_with_invalid_id(user_repository, mock_database):
"""Test finding user with invalid ObjectId returns None."""
result = user_repository.find_user_by_id("invalid_id")
assert result is None
# find_one should not be called with invalid ID
mock_database.users.find_one.assert_not_called()
def test_i_cannot_find_nonexistent_user_by_id(user_repository, mock_database):
"""Test finding nonexistent user by ID returns None."""
user_id = ObjectId()
mock_database.users.find_one.return_value = None
result = user_repository.find_user_by_id(str(user_id))
assert result is None
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
def test_i_can_find_user_by_email(user_repository, mock_database):
"""Test finding user by email address."""
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
result = user_repository.find_user_by_email("test@example.com")
assert isinstance(result, UserInDB)
assert result.email == "test@example.com"
mock_database.users.find_one.assert_called_once_with({"email": "test@example.com"})
def test_i_can_update_user(user_repository, mock_database, sample_user_update):
"""Test updating user information."""
user_id = ObjectId()
# Mock successful update
mock_update_result = Mock()
mock_update_result.matched_count = 1
mock_database.users.update_one.return_value = mock_update_result
# Mock find_one for returning updated user
updated_user_doc = {
"_id": user_id,
"username": "testuser",
"email": "updated@example.com",
"hashed_password": "hashed_password_123",
"role": "admin",
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = updated_user_doc
result = user_repository.update_user(str(user_id), sample_user_update)
assert isinstance(result, UserInDB)
assert result.email == "updated@example.com"
assert result.role == UserRole.ADMIN
assert result.is_active is False
# Verify update_one was called with correct data
mock_database.users.update_one.assert_called_once()
call_args = mock_database.users.update_one.call_args
assert call_args[0][0] == {"_id": user_id} # Filter
update_data = call_args[0][1]["$set"] # Update data
assert update_data["email"] == "updated@example.com"
assert update_data["role"] == UserRole.ADMIN
assert update_data["is_active"] is False
assert "updated_at" in update_data
def test_i_cannot_update_nonexistent_user(user_repository, mock_database, sample_user_update):
"""Test updating nonexistent user returns None."""
user_id = ObjectId()
# Mock no match found
mock_update_result = Mock()
mock_update_result.matched_count = 0
mock_database.users.update_one.return_value = mock_update_result
result = user_repository.update_user(str(user_id), sample_user_update)
assert result is None
def test_i_cannot_update_user_with_invalid_id(user_repository, mock_database, sample_user_update):
"""Test updating user with invalid ID returns None."""
result = user_repository.update_user("invalid_id", sample_user_update)
assert result is None
# update_one should not be called with invalid ID
mock_database.users.update_one.assert_not_called()
def test_i_can_delete_user(user_repository, mock_database):
"""Test successful user deletion."""
user_id = ObjectId()
# Mock successful deletion
mock_delete_result = Mock()
mock_delete_result.deleted_count = 1
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is True
mock_database.users.delete_one.assert_called_once_with({"_id": user_id})
def test_i_cannot_delete_nonexistent_user(user_repository, mock_database):
"""Test deleting nonexistent user returns False."""
user_id = ObjectId()
# Mock no deletion occurred
mock_delete_result = Mock()
mock_delete_result.deleted_count = 0
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is False
def test_i_cannot_delete_user_with_invalid_id(user_repository, mock_database):
"""Test deleting user with invalid ID returns False."""
result = user_repository.delete_user("invalid_id")
assert result is False
# delete_one should not be called with invalid ID
mock_database.users.delete_one.assert_not_called()
def test_i_can_list_users(user_repository, mock_database):
"""Test listing users with pagination."""
# Mock cursor with user documents
user_docs = [
{
"_id": ObjectId(),
"username": "user1",
"email": "user1@example.com",
"hashed_password": "hash1",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
},
{
"_id": ObjectId(),
"username": "user2",
"email": "user2@example.com",
"hashed_password": "hash2",
"role": "admin",
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
]
mock_cursor = Mock()
mock_cursor.__iter__.return_value = iter(user_docs)
mock_cursor.skip.return_value = mock_cursor
mock_cursor.limit.return_value = mock_cursor
mock_database.users.find.return_value = mock_cursor
result = user_repository.list_users(skip=10, limit=50)
assert len(result) == 2
assert all(isinstance(user, UserInDB) for user in result)
assert result[0].username == "user1"
assert result[1].username == "user2"
mock_database.users.find.assert_called_once()
mock_cursor.skip.assert_called_once_with(10)
mock_cursor.limit.assert_called_once_with(50)
def test_i_can_count_users(user_repository, mock_database):
"""Test counting total users."""
mock_database.users.count_documents.return_value = 42
result = user_repository.count_users()
assert result == 42
mock_database.users.count_documents.assert_called_once_with({})
def test_i_can_check_user_exists(user_repository, mock_database):
"""Test checking if user exists by username."""
mock_database.users.count_documents.return_value = 1
result = user_repository.user_exists("testuser")
assert result is True
mock_database.users.count_documents.assert_called_once_with({"username": "testuser"})
def test_i_can_check_user_does_not_exist(user_repository, mock_database):
"""Test checking if user does not exist by username."""
mock_database.users.count_documents.return_value = 0
result = user_repository.user_exists("nonexistent")
assert result is False
mock_database.users.count_documents.assert_called_once_with({"username": "nonexistent"})
def test_i_can_create_indexes_on_initialization(mock_database):
"""Test that indexes are created when repository is initialized."""
# Mock create_index to not raise exception
mock_database.users.create_index.return_value = None
repository = UserRepository(mock_database)
mock_database.users.create_index.assert_called_once_with("username", unique=True)
def test_i_can_handle_index_creation_error(mock_database):
"""Test that index creation errors are handled gracefully."""
# Mock create_index to raise exception (index already exists)
mock_database.users.create_index.side_effect = Exception("Index already exists")
# Should not raise exception
repository = UserRepository(mock_database)
assert repository is not None
mock_database.users.create_index.assert_called_once_with("username", unique=True)