Compare commits

3 Commits

Author SHA1 Message Date
c3ea80363f Working on document repository 2025-09-18 22:53:51 +02:00
df86a3d998 Fixed docker config. Added services 2025-09-17 22:45:33 +02:00
da63f1b75b Fixed unit tests 2025-09-17 21:24:03 +02:00
21 changed files with 2246 additions and 712 deletions

346
Readme.md

@@ -2,11 +2,14 @@
## Overview
MyDocManager is a real-time document processing application that automatically detects files in a monitored directory, processes them asynchronously, and stores the results in a database. The application uses a modern microservices architecture with Redis for task queuing and MongoDB for data persistence.
MyDocManager is a real-time document processing application that automatically detects files in a monitored directory,
processes them asynchronously, and stores the results in a database. The application uses a modern microservices
architecture with Redis for task queuing and MongoDB for data persistence.
## Architecture
### Technology Stack
- **Backend API**: FastAPI (Python 3.12)
- **Task Processing**: Celery with Redis broker
- **Document Processing**: EasyOCR, PyMuPDF, python-docx, pdfplumber
@@ -16,6 +19,7 @@ MyDocManager is a real-time document processing application that automatically d
- **File Monitoring**: Python watchdog library
### Services Architecture
┌─────────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Frontend │ │ file- │ │ Redis │ │ Worker │ │ MongoDB │
│ (React) │◄──►│ processor │───►│ (Broker) │◄──►│ (Celery) │───►│ (Results) │
@@ -24,13 +28,13 @@ MyDocManager is a real-time document processing application that automatically d
└─────────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
### Docker Services
1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch
2. **worker**: Celery workers for document processing (OCR, text extraction)
3. **redis**: Message broker for Celery tasks
4. **mongodb**: Final database for processing results
5. **frontend**: React interface for monitoring and file access
## Data Flow
1. **File Detection**: Watchdog monitors target directory in real-time
@@ -42,11 +46,13 @@ MyDocManager is a real-time document processing application that automatically d
## Document Processing Capabilities
### Supported File Types
- **PDF**: Direct text extraction + OCR for scanned documents
- **Word Documents**: .docx text extraction
- **Images**: OCR text recognition (JPG, PNG, etc.)
### Processing Libraries
- **EasyOCR**: Modern OCR engine (80+ languages, deep learning-based)
- **PyMuPDF**: PDF text extraction and manipulation
- **python-docx**: Word document processing
@@ -55,12 +61,15 @@ MyDocManager is a real-time document processing application that automatically d
## Development Environment
### Container-Based Development
The application is designed for container-based development with hot-reload capabilities:
- Source code mounted as volumes for real-time updates
- All services orchestrated via Docker Compose
- Development and production parity
### Key Features
- **Real-time Processing**: Immediate file detection and processing
- **Horizontal Scaling**: Multiple workers can be added easily
- **Fault Tolerance**: Celery provides automatic retry mechanisms
@@ -68,6 +77,7 @@ The application is designed for container-based development with hot-reload capa
- **Hot Reload**: Development changes reflected instantly in containers
### Docker Services
1. **file-processor**: FastAPI + real-time file monitoring + Celery task dispatch
2. **worker**: Celery workers for document processing (OCR, text extraction)
3. **redis**: Message broker for Celery tasks
@@ -138,6 +148,7 @@ MyDocManager/
## Authentication & User Management
### Security Features
- **JWT Authentication**: Stateless authentication with 24-hour token expiration
- **Password Security**: bcrypt hashing with automatic salting
- **Role-Based Access**: Admin and User roles with granular permissions
@@ -145,16 +156,19 @@ MyDocManager/
- **Auto Admin Creation**: Default admin user created on first startup
### User Roles
- **Admin**: Full access to user management (create, read, update, delete users)
- **User**: Limited access (view own profile, access document processing features)
### Authentication Flow
1. **Login**: User provides credentials → Server validates → Returns JWT token
2. **API Access**: Client includes JWT in Authorization header
3. **Token Validation**: Server verifies token signature and expiration
4. **Role Check**: Server validates user permissions for requested resource
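The four steps above can be sketched with the standard library alone. This is a minimal illustration of an HS256-signed token with a 24-hour expiration, not the project's implementation: `SECRET_KEY`, `create_token`, `verify_token`, and the claim names `sub`, `role`, and `exp` are assumptions chosen for the example, and a real service would more likely rely on a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = b"change-me-in-production"  # hypothetical secret, for illustration only
TOKEN_TTL_SECONDS = 24 * 60 * 60  # 24-hour expiration, as stated in the README


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str) -> bytes:
    return hmac.new(SECRET_KEY, signing_input.encode("ascii"), hashlib.sha256).digest()


def create_token(username: str, role: str, now: Optional[float] = None) -> str:
    """Step 1: after credentials are validated, issue a signed token."""
    issued_at = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": username, "role": role, "exp": issued_at + TOKEN_TTL_SECONDS}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input))}"


def verify_token(
    token: str, required_role: Optional[str] = None, now: Optional[float] = None
) -> Optional[dict]:
    """Steps 3 and 4: check signature, expiration, then the role."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _sign(f"{header_b64}.{payload_b64}")
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        # Malformed token: wrong segment count, bad base64, or bad JSON
        return None
    current = time.time() if now is None else now
    if payload.get("exp", 0) <= current:
        return None
    if required_role is not None and payload.get("role") != required_role:
        return None
    return payload
```

In step 2 the client would send the token as `Authorization: Bearer <token>`; the server then passes the token string to `verify_token`, with `required_role="admin"` on admin-only routes.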
### User Management APIs
```
POST /auth/login # Generate JWT token
GET /users # List all users (admin only)
@@ -164,10 +178,227 @@ DELETE /users/{user_id} # Delete user (admin only)
GET /users/me # Get current user profile (authenticated users)
```
### Useful Service URLs
## Docker Commands Reference
- **FastAPI API**: http://localhost:8000
- **FastAPI Docs**: http://localhost:8000/docs
- **Health Check**: http://localhost:8000/health
- **Redis**: localhost:6379
- **MongoDB**: localhost:27017
### Initial Setup & Build
### Testing Commands
```bash
# Test FastAPI health
curl http://localhost:8000/health
# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
-H "Content-Type: application/json" \
-d '{"message": "Hello from test!"}'
# Monitor Celery tasks
docker-compose logs -f worker
```
## Default Admin User
On first startup, the application automatically creates a default admin user:
- **Username**: `admin`
- **Password**: `admin`
- **Role**: `admin`
- **Email**: `admin@mydocmanager.local`
**⚠️ Important**: Change the default admin password immediately after first login in production environments.
## File Processing Architecture
### Document Processing Flow
1. **File Detection**: Watchdog monitors `/volumes/watched_files/` directory in real-time
2. **Task Creation**: File watcher creates Celery task for each detected file
3. **Document Processing**: Celery worker processes the document and extracts content
4. **Database Storage**: Processed data stored in MongoDB collections
### MongoDB Collections Design
#### Files Collection
Stores file metadata and extracted content:
```json
{
  "_id": "ObjectId",
  "filename": "document.pdf",
  "filepath": "/watched_files/document.pdf",
  "file_type": "pdf",
  "mime_type": "application/pdf",
  "file_size": 2048576,
  "content": "extracted text content...",
  "encoding": "utf-8",
  "extraction_method": "direct_text",  // direct_text, ocr, hybrid
  "metadata": {
    "page_count": 15,  // for PDFs
    "word_count": 250,  // for text files
    "image_dimensions": {  // for images
      "width": 1920,
      "height": 1080
    }
  },
  "detected_at": "2024-01-15T10:29:00Z",
  "file_hash": "sha256_hash_value"
}
```
#### Processing Jobs Collection
Tracks processing status and lifecycle:
```json
{
  "_id": "ObjectId",
  "file_id": "reference_to_files_collection",
  "status": "completed",  // pending, processing, completed, failed
  "task_id": "celery_task_uuid",
  "created_at": "2024-01-15T10:29:00Z",
  "started_at": "2024-01-15T10:29:30Z",
  "completed_at": "2024-01-15T10:30:00Z",
  "error_message": null
}
```
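The status lifecycle implied by the job document can be sketched as a small state machine. This is an assumption-laden illustration: the README lists the four status values but not the allowed transitions, so `ALLOWED_TRANSITIONS`, the `ProcessingJob` dataclass, and its `transition` method are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class JobStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transition table; the README only names the status values.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.PROCESSING, JobStatus.FAILED},
    JobStatus.PROCESSING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}


@dataclass
class ProcessingJob:
    file_id: str
    task_id: str
    status: JobStatus = JobStatus.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None

    def transition(self, new_status: JobStatus, error_message: Optional[str] = None) -> None:
        """Move to new_status, rejecting transitions the table does not allow."""
        if new_status not in ALLOWED_TRANSITIONS[self.status]:
            raise ValueError(f"cannot move from {self.status.value} to {new_status.value}")
        now = datetime.now(timezone.utc)
        if new_status is JobStatus.PROCESSING:
            self.started_at = now
        elif new_status is JobStatus.COMPLETED:
            self.completed_at = now
        self.status = new_status
        self.error_message = error_message
```

Keeping the transition rules in one table makes it harder for a worker to mark a job `completed` without it ever having been `processing`.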
### Supported File Types (Initial Implementation)
- **Text Files** (`.txt`): Direct content reading
- **PDF Documents** (`.pdf`): Text extraction via PyMuPDF/pdfplumber
- **Word Documents** (`.docx`): Content extraction via python-docx
### File Processing Architecture Decisions
#### Watchdog Implementation
- **Choice**: Dedicated observer thread (Option A)
- **Rationale**: Standard approach, clean separation of concerns
- **Implementation**: Watchdog observer runs in separate thread from FastAPI
#### Task Dispatch Strategy
- **Choice**: Direct Celery task creation from file watcher
- **Rationale**: Minimal latency, straightforward flow
- **Implementation**: File detected → Immediate Celery task dispatch
#### Data Storage Strategy
- **Choice**: Separate collections for files and processing status
- **Rationale**: Clean separation of file data vs processing lifecycle
- **Benefits**:
  - Better query performance
  - Clear data model boundaries
  - Easy processing status tracking
#### Content Storage Location
- **Choice**: Store extracted content in `files` collection
- **Rationale**: Content is intrinsic property of the file
- **Benefits**: Single query to get file + content, simpler data model
### Implementation Order
1. ✅ Pydantic models for MongoDB collections
2. ✅ Repository layer for data access (files + processing_jobs)
3. ✅ Celery tasks for document processing
4. ✅ Watchdog file monitoring implementation
5. ✅ FastAPI integration and startup coordination
### Processing Pipeline Features
- **Duplicate Detection**: SHA256 hashing prevents reprocessing same files
- **Error Handling**: Failed processing tracked with error messages
- **Status Tracking**: Real-time processing status via `processing_jobs` collection
- **Extensible Metadata**: Flexible metadata storage per file type
- **Multiple Extraction Methods**: Support for direct text, OCR, and hybrid approaches
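As an illustration of the duplicate-detection feature, the sketch below hashes a file with SHA256 in chunks and skips content that has already been seen. The helper names `compute_file_hash` and `should_process`, and the in-memory `known_hashes` set, are assumptions for the example; in the application the lookup would presumably go through the `files` collection instead.

```python
import hashlib
from pathlib import Path
from typing import Set


def compute_file_hash(path: Path, chunk_size: int = 65536) -> str:
    """Return the SHA256 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def should_process(path: Path, known_hashes: Set[str]) -> bool:
    """Return False when identical content was already seen, else record it."""
    file_hash = compute_file_hash(path)
    if file_hash in known_hashes:
        return False
    known_hashes.add(file_hash)
    return True
```

Because the hash covers content only, a renamed or copied file with identical bytes is also treated as a duplicate.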
## Key Implementation Notes
### Python Standards
- **Style**: PEP 8 compliance
- **Documentation**: Google/NumPy docstring format
- **Naming**: snake_case for variables and functions
- **Testing**: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
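A short sketch of the `test_i_can_xxx` / `test_i_cannot_xxx` naming pattern follows. The function under test, `normalize_username`, is a hypothetical helper invented for the example. The failure case uses a plain `try`/`except` so the snippet has no dependencies; in the actual test suite `pytest.raises` would be the more usual form.

```python
def normalize_username(raw: str) -> str:
    """Hypothetical helper, used only to illustrate the test naming pattern."""
    cleaned = raw.strip().lower()
    if not cleaned:
        raise ValueError("username must not be empty")
    return cleaned


def test_i_can_normalize_username():
    # "i_can" tests cover the behaviour that is expected to succeed
    assert normalize_username("  Admin ") == "admin"


def test_i_cannot_normalize_empty_username():
    # "i_cannot" tests cover inputs that must be rejected
    try:
        normalize_username("   ")
    except ValueError:
        return
    raise AssertionError("expected ValueError for an empty username")
```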
### Security Best Practices
- **Password Storage**: Never store plain text passwords, always use bcrypt hashing
- **JWT Secrets**: Use strong, randomly generated secret keys in production
- **Token Expiration**: 24-hour expiration with secure signature validation
- **Role Validation**: Server-side role checking for all protected endpoints
### Dependencies Management
- **Package Manager**: pip (standard)
- **External Dependencies**: Listed in each service's requirements.txt
- **Standard Library First**: Prefer standard library when possible
### Testing Strategy
- All code must be testable
- Unit tests for each authentication and user management function
- Integration tests for complete authentication flow
- Tests validated before implementation
### Critical Architecture Decisions Made
1. **JWT Authentication**: Simple token-based auth with 24-hour expiration
2. **Role-Based Access**: Admin/User roles for granular permissions
3. **bcrypt Password Hashing**: Industry-standard password security
4. **MongoDB User Storage**: Centralized user management in main database
5. **Auto Admin Creation**: Automatic setup for first-time deployment
6. **Single FastAPI Service**: Handles both API and file watching with authentication
7. **Celery with Redis**: Chosen over other async patterns for scalability
8. **EasyOCR Preferred**: Selected over Tesseract for modern OCR needs
9. **Container Development**: Hot-reload setup required for development workflow
10. **Dedicated Watchdog Observer**: Thread-based file monitoring for reliability
11. **Separate MongoDB Collections**: Files and processing jobs stored separately
12. **Content in Files Collection**: Extracted content stored with file metadata
13. **Direct Task Dispatch**: File watcher directly creates Celery tasks
14. **SHA256 Duplicate Detection**: Prevents reprocessing identical files
### Development Process Requirements
1. **Collaborative Validation**: All options must be explained before coding
2. **Test-First Approach**: Test cases defined and validated before implementation
3. **Incremental Development**: Start simple, extend functionality progressively
4. **Error Handling**: Clear problem explanation required before proposing fixes
### Next Implementation Steps
1. ✅ Create docker-compose.yml with all services => Done
2. ✅ Define user management and authentication architecture => Done
3. ✅ Implement user models and authentication services =>
    1. models/user.py => Done
    2. models/auth.py => Done
    3. database/repositories/user_repository.py => Done
4. ✅ Add automatic admin user creation if it does not exist => Done
5. **IN PROGRESS**: Implement file processing pipeline =>
    1. Create Pydantic models for files and processing_jobs collections
    2. Implement repository layer for file and processing job data access
    3. Create Celery tasks for document processing (.txt, .pdf, .docx)
    4. Implement Watchdog file monitoring with dedicated observer
    5. Integrate file watcher with FastAPI startup
6. Create protected API routes for user management
7. Build React monitoring interface with authentication
## Annexes
### Docker Commands Reference
#### Initial Setup & Build
```bash
# Build and start all services (first time)
@@ -181,7 +412,7 @@ docker-compose build file-processor
docker-compose build worker
```
### Development Workflow
#### Development Workflow
```bash
# Start all services
@@ -203,7 +434,7 @@ docker-compose restart redis
docker-compose restart mongodb
```
### Monitoring & Debugging
#### Monitoring & Debugging
```bash
# View logs of all services
@@ -228,7 +459,7 @@ docker-compose exec worker bash
docker-compose exec mongodb mongosh
```
### Service Management
#### Service Management
```bash
# Start only specific services
@@ -248,103 +479,6 @@ docker-compose up --scale worker=3
### Hot-Reload Configuration
- **file-processor**: Hot-reload enabled via `--reload` flag
- Code changes in `src/file-processor/app/` automatically restart FastAPI
- Code changes in `src/file-processor/app/` automatically restart FastAPI
- **worker**: No hot-reload (manual restart required for stability)
- Code changes in `src/worker/tasks/` require: `docker-compose restart worker`
### Useful Service URLs
- **FastAPI API**: http://localhost:8000
- **FastAPI Docs**: http://localhost:8000/docs
- **Health Check**: http://localhost:8000/health
- **Redis**: localhost:6379
- **MongoDB**: localhost:27017
### Testing Commands
```bash
# Test FastAPI health
curl http://localhost:8000/health
# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
-H "Content-Type: application/json" \
-d '{"message": "Hello from test!"}'
# Monitor Celery tasks
docker-compose logs -f worker
```
## Default Admin User
On first startup, the application automatically creates a default admin user:
- **Username**: `admin`
- **Password**: `admin`
- **Role**: `admin`
- **Email**: `admin@mydocmanager.local`
**⚠️ Important**: Change the default admin password immediately after first login in production environments.
## Key Implementation Notes
### Python Standards
- **Style**: PEP 8 compliance
- **Documentation**: Google/NumPy docstring format
- **Naming**: snake_case for variables and functions
- **Testing**: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
### Security Best Practices
- **Password Storage**: Never store plain text passwords, always use bcrypt hashing
- **JWT Secrets**: Use strong, randomly generated secret keys in production
- **Token Expiration**: 24-hour expiration with secure signature validation
- **Role Validation**: Server-side role checking for all protected endpoints
### Dependencies Management
- **Package Manager**: pip (standard)
- **External Dependencies**: Listed in each service's requirements.txt
- **Standard Library First**: Prefer standard library when possible
### Testing Strategy
- All code must be testable
- Unit tests for each authentication and user management function
- Integration tests for complete authentication flow
- Tests validated before implementation
### Critical Architecture Decisions Made
1. **JWT Authentication**: Simple token-based auth with 24-hour expiration
2. **Role-Based Access**: Admin/User roles for granular permissions
3. **bcrypt Password Hashing**: Industry-standard password security
4. **MongoDB User Storage**: Centralized user management in main database
5. **Auto Admin Creation**: Automatic setup for first-time deployment
6. **Single FastAPI Service**: Handles both API and file watching with authentication
7. **Celery with Redis**: Chosen over other async patterns for scalability
8. **EasyOCR Preferred**: Selected over Tesseract for modern OCR needs
9. **Container Development**: Hot-reload setup required for development workflow
### Development Process Requirements
1. **Collaborative Validation**: All options must be explained before coding
2. **Test-First Approach**: Test cases defined and validated before implementation
3. **Incremental Development**: Start simple, extend functionality progressively
4. **Error Handling**: Clear problem explanation required before proposing fixes
### Next Implementation Steps
1. ✅ Create docker-compose.yml with all services
2. ✅ Define user management and authentication architecture
3. Implement user models and authentication services
4. Create protected API routes for user management
5. Add automatic admin user creation
6. Implement basic FastAPI service structure
7. Add watchdog file monitoring
8. Create Celery task structure
9. Implement document processing tasks
10. Build React monitoring interface with authentication
### Next steps
MongoDB CRUD
We absolutely must mock MongoDB for the unit tests with pytest-mock
Files to create:
* app/models/auht.py => already done
* app/models/user.py => already done
* app/database/connection.py
  * Use the settings for the MongoDB URL. A configuration file must be created (app/config/settings.py)
  * get_database() function + error handling
  * Configuration via environment variables
* app/database/repositories/user_repository.py
- Code changes in `src/worker/tasks/` require: `docker-compose restart worker`


@@ -1,5 +1,3 @@
-version: '3.8'
 services:
   # Redis - Message broker for Celery
   redis:
@@ -36,15 +34,16 @@ services:
     environment:
       - REDIS_URL=redis://redis:6379/0
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
+      - PYTHONPATH=/app
     volumes:
-      - ./src/file-processor/app:/app
+      - ./src/file-processor:/app
       - ./volumes/watched_files:/watched_files
     depends_on:
       - redis
       - mongodb
     networks:
       - mydocmanager-network
-    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
+    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
   # Worker - Celery workers for document processing
   worker:
@@ -55,6 +54,7 @@ services:
     environment:
       - REDIS_URL=redis://redis:6379/0
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
+      - PYTHONPATH=/app
     volumes:
       - ./src/worker/tasks:/app
       - ./volumes/watched_files:/watched_files

7
pytest.ini Normal file

@@ -0,0 +1,7 @@
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
pythonpath = src/file-processor


@@ -3,7 +3,6 @@ annotated-types==0.7.0
anyio==4.10.0
bcrypt==4.3.0
billiard==4.2.1
bson==0.5.10
celery==5.5.3
click==8.2.1
click-didyoumean==0.3.1
@@ -17,17 +16,25 @@ httptools==0.6.4
idna==3.10
iniconfig==2.1.0
kombu==5.5.4
mongomock==4.3.0
mongomock-motor==0.0.36
motor==3.7.1
packaging==25.0
pipdeptree==2.28.0
pluggy==1.6.0
prompt_toolkit==3.0.52
pydantic==2.11.9
pydantic_core==2.33.2
Pygments==2.19.2
pymongo==4.15.0
pymongo==4.15.1
pytest==8.4.2
pytest-asyncio==1.2.0
pytest-mock==3.15.1
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
pytz==2025.2
PyYAML==6.0.2
sentinels==1.1.1
six==1.17.0
sniffio==1.3.1
starlette==0.47.3


@@ -8,10 +8,12 @@ COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
 # Copy application code
-COPY app/ .
+COPY . .
+ENV PYTHONPATH=/app
 # Expose port
 EXPOSE 8000
 # Command will be overridden by docker-compose
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]


@@ -11,7 +11,7 @@ from pymongo import MongoClient
 from pymongo.database import Database
 from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
-from ..config.settings import get_mongodb_url, get_mongodb_database_name
+from app.config.settings import get_mongodb_url, get_mongodb_database_name
 # Global variables for singleton pattern
 _client: Optional[MongoClient] = None


@@ -0,0 +1,248 @@
"""
File repository for database operations on FileDocument collection.
This module provides data access operations for file documents stored
in MongoDB with proper error handling and type safety.
"""
from typing import Optional, List
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from difflib import SequenceMatcher
from motor.motor_asyncio import AsyncIOMotorCollection
from app.models.document import FileDocument
from app.database.connection import get_database
class FileDocumentRepository:
    """
    Repository class for file document database operations.

    This class handles all database operations for FileDocument objects
    with proper error handling and data validation.
    """

    def __init__(self):
        """Initialize file repository with database connection."""
        self.db = get_database()
        self.collection: AsyncIOMotorCollection = self.db.files
        # NOTE: _ensure_indexes() is a coroutine function; called without
        # `await`, it only creates a coroutine object and never runs.
        self._ensure_indexes()

    async def _ensure_indexes(self):
        """
        Ensure required database indexes exist.

        Creates unique index on filepath field to prevent duplicates.
        """
        try:
            await self.collection.create_index("filepath", unique=True)
        except PyMongoError:
            # Index might already exist, ignore error
            pass
    async def create_document(self, file_data: FileDocument) -> FileDocument:
        """
        Create a new file document in database.

        Args:
            file_data (FileDocument): File document data to create

        Returns:
            FileDocument: Created file document with database ID

        Raises:
            ValueError: If file creation fails due to validation
            DuplicateKeyError: If file with same filepath already exists
        """
        try:
            file_dict = file_data.model_dump(by_alias=True, exclude_unset=True)
            if "_id" in file_dict and file_dict["_id"] is None:
                del file_dict["_id"]
            result = await self.collection.insert_one(file_dict)
            file_data.id = result.inserted_id
            return file_data
        except DuplicateKeyError as e:
            # The only unique index created by this repository is on "filepath"
            raise DuplicateKeyError(f"File with same filepath already exists: {e}")
        except PyMongoError as e:
            raise ValueError(f"Failed to create file document: {e}")
    async def find_document_by_id(self, file_id: str) -> Optional[FileDocument]:
        """
        Find file document by ID.

        Args:
            file_id (str): File document ID to search for

        Returns:
            FileDocument or None: File document if found, None otherwise
        """
        try:
            if not ObjectId.is_valid(file_id):
                return None
            file_doc = await self.collection.find_one({"_id": ObjectId(file_id)})
            if file_doc:
                return FileDocument(**file_doc)
            return None
        except PyMongoError:
            return None

    async def find_document_by_hash(self, file_hash: str) -> Optional[FileDocument]:
        """
        Find file document by file hash to detect duplicates.

        Args:
            file_hash (str): SHA256 hash of file content

        Returns:
            FileDocument or None: File document if found, None otherwise
        """
        try:
            file_doc = await self.collection.find_one({"file_hash": file_hash})
            if file_doc:
                return FileDocument(**file_doc)
            return None
        except PyMongoError:
            return None

    async def find_document_by_filepath(self, filepath: str) -> Optional[FileDocument]:
        """
        Find file document by exact filepath.

        Args:
            filepath (str): Full path to the file

        Returns:
            FileDocument or None: File document if found, None otherwise
        """
        try:
            file_doc = await self.collection.find_one({"filepath": filepath})
            if file_doc:
                return FileDocument(**file_doc)
            return None
        except PyMongoError:
            return None
    async def find_document_by_name(self, filename: str, similarity_threshold: float = 0.6) -> List[FileDocument]:
        """
        Find file documents by filename using fuzzy matching.

        Args:
            filename (str): Filename to search for
            similarity_threshold (float): Minimum similarity ratio (0.0 to 1.0)

        Returns:
            List[FileDocument]: List of matching files sorted by similarity score
        """
        try:
            # Get all files from database
            cursor = self.collection.find({})
            all_files = await cursor.to_list(length=None)
            matches = []
            for file_doc in all_files:
                file_obj = FileDocument(**file_doc)
                # Calculate similarity between search term and filename
                similarity = SequenceMatcher(None, filename.lower(), file_obj.filename.lower()).ratio()
                if similarity >= similarity_threshold:
                    matches.append((file_obj, similarity))
            # Sort by similarity score (highest first)
            matches.sort(key=lambda x: x[1], reverse=True)
            # Return only the FileDocument objects
            return [match[0] for match in matches]
        except PyMongoError:
            return []
    async def list_documents(self, skip: int = 0, limit: int = 100) -> List[FileDocument]:
        """
        List file documents with pagination.

        Args:
            skip (int): Number of documents to skip (default: 0)
            limit (int): Maximum number of documents to return (default: 100)

        Returns:
            List[FileDocument]: List of file documents
        """
        try:
            cursor = self.collection.find({}).skip(skip).limit(limit).sort("detected_at", -1)
            file_docs = await cursor.to_list(length=limit)
            return [FileDocument(**doc) for doc in file_docs]
        except PyMongoError:
            return []

    async def count_documents(self) -> int:
        """
        Count total number of file documents.

        Returns:
            int: Total number of file documents in collection
        """
        try:
            return await self.collection.count_documents({})
        except PyMongoError:
            return 0
    async def update_document(self, file_id: str, update_data: dict) -> Optional[FileDocument]:
        """
        Update file document with new data.

        Args:
            file_id (str): File document ID to update
            update_data (dict): Fields to update

        Returns:
            FileDocument or None: Updated file document if successful, None otherwise
        """
        try:
            if not ObjectId.is_valid(file_id):
                return None
            # Remove None values from update data
            clean_update_data = {k: v for k, v in update_data.items() if v is not None}
            if not clean_update_data:
                return await self.find_document_by_id(file_id)
            result = await self.collection.find_one_and_update(
                {"_id": ObjectId(file_id)},
                {"$set": clean_update_data},
                return_document=True
            )
            if result:
                return FileDocument(**result)
            return None
        except PyMongoError:
            return None

    async def delete_document(self, file_id: str) -> bool:
        """
        Delete file document from database.

        Args:
            file_id (str): File document ID to delete

        Returns:
            bool: True if file was deleted, False otherwise
        """
        try:
            if not ObjectId.is_valid(file_id):
                return False
            result = await self.collection.delete_one({"_id": ObjectId(file_id)})
            return result.deleted_count > 0
        except PyMongoError:
            return False
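Review note on `find_document_by_name`: the fuzzy matching can be tried in isolation with the sketch below, which applies the same `SequenceMatcher` ratio and threshold to a plain list of names. `rank_filenames` is a hypothetical helper, not part of this change. Since the repository method loads every document with `find({})` before scoring, its cost grows with the size of the collection.

```python
from difflib import SequenceMatcher
from typing import List, Tuple


def rank_filenames(
    query: str, filenames: List[str], similarity_threshold: float = 0.6
) -> List[Tuple[str, float]]:
    """Return (filename, ratio) pairs at or above the threshold, best match first."""
    scored = []
    for name in filenames:
        # Case-insensitive comparison, as in the repository method
        ratio = SequenceMatcher(None, query.lower(), name.lower()).ratio()
        if ratio >= similarity_threshold:
            scored.append((name, ratio))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored
```

Calling `rank_filenames("report.pdf", ["report.pdf", "Report.PDF", "invoice.docx"])` keeps the two `report.pdf` variants and drops `invoice.docx`, whose ratio falls well below the default threshold.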

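Review note on index creation: both repositories call the async `_ensure_indexes()` from a synchronous `__init__`, which creates a coroutine object without running it. One possible way around this is an explicit async initialization step, sketched below with the standard library only. `FakeCollection`, `Repository`, and `initialize` are hypothetical names for the example; `FakeCollection` merely stands in for an async collection object.

```python
import asyncio
from typing import List, Tuple


class FakeCollection:
    """Stand-in for an async collection; hypothetical, for illustration only."""

    def __init__(self) -> None:
        self.indexes: List[Tuple[str, bool]] = []

    async def create_index(self, field: str, unique: bool = False) -> str:
        self.indexes.append((field, unique))
        return f"{field}_1"


class Repository:
    def __init__(self, collection: FakeCollection) -> None:
        # __init__ stays synchronous and only stores dependencies
        self.collection = collection

    async def initialize(self) -> "Repository":
        """Run the async setup that __init__ cannot await."""
        await self.collection.create_index("filepath", unique=True)
        return self


async def main() -> List[Tuple[str, bool]]:
    collection = FakeCollection()
    await Repository(collection).initialize()
    return collection.indexes


indexes = asyncio.run(main())
```

With this shape the caller decides when the index is created, for example once during application startup, and any failure surfaces at the `await` rather than being lost.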

@@ -2,17 +2,17 @@
User repository for MongoDB operations.
This module implements the repository pattern for user CRUD operations
with dependency injection of the database connection.
with dependency injection of the database connection using async/await.
"""
from typing import Optional, List, Dict, Any
from typing import Optional, List
from datetime import datetime
from bson import ObjectId
from pymongo.database import Database
from pymongo.errors import DuplicateKeyError
from pymongo.collection import Collection
from motor.motor_asyncio import AsyncIOMotorDatabase, AsyncIOMotorCollection
from pymongo.errors import DuplicateKeyError, PyMongoError
from app.models.user import UserCreate, UserInDB, UserUpdate
from app.utils.security import hash_password
class UserRepository:
@@ -20,35 +20,33 @@ class UserRepository:
Repository class for user CRUD operations in MongoDB.
This class handles all database operations related to users,
following the repository pattern with dependency injection.
following the repository pattern with dependency injection and async/await.
"""
def __init__(self, database: Database):
def __init__(self, database: AsyncIOMotorDatabase):
"""
Initialize repository with database dependency.
Args:
database (Database): MongoDB database instance
database (AsyncIOMotorDatabase): MongoDB database instance
"""
self.db = database
self.collection: Collection = database.users
# Create unique index on username for duplicate prevention
self.collection: AsyncIOMotorCollection = database.users
self._ensure_indexes()
def _ensure_indexes(self):
async def _ensure_indexes(self):
"""
Ensure required database indexes exist.
Creates unique index on username field to prevent duplicates.
"""
try:
self.collection.create_index("username", unique=True)
except Exception:
await self.collection.create_index("username", unique=True)
except PyMongoError:
# Index might already exist, ignore error
pass
def create_user(self, user_data: UserCreate) -> UserInDB:
async def create_user(self, user_data: UserCreate) -> UserInDB:
"""
Create a new user in the database.
@@ -60,25 +58,28 @@ class UserRepository:
Raises:
DuplicateKeyError: If username already exists
ValueError: If user creation fails due to validation
"""
user_dict = {
"username": user_data.username,
"email": user_data.email,
"hashed_password": user_data.hashed_password,
"hashed_password": hash_password(user_data.password),
"role": user_data.role,
"is_active": user_data.is_active,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
try:
result = self.collection.insert_one(user_dict)
result = await self.collection.insert_one(user_dict)
user_dict["_id"] = result.inserted_id
return UserInDB(**user_dict)
except DuplicateKeyError:
raise DuplicateKeyError(f"User with username '{user_data.username}' already exists")
except DuplicateKeyError as e:
raise DuplicateKeyError(f"User with username '{user_data.username}' already exists: {e}")
except PyMongoError as e:
raise ValueError(f"Failed to create user: {e}")
def find_user_by_username(self, username: str) -> Optional[UserInDB]:
async def find_user_by_username(self, username: str) -> Optional[UserInDB]:
"""
Find user by username.
@@ -88,12 +89,15 @@ class UserRepository:
Returns:
UserInDB or None: User if found, None otherwise
"""
user_doc = self.collection.find_one({"username": username})
if user_doc:
return UserInDB(**user_doc)
return None
try:
user_doc = await self.collection.find_one({"username": username})
if user_doc:
return UserInDB(**user_doc)
return None
except PyMongoError:
return None
def find_user_by_id(self, user_id: str) -> Optional[UserInDB]:
async def find_user_by_id(self, user_id: str) -> Optional[UserInDB]:
"""
Find user by ID.
@@ -104,16 +108,17 @@ class UserRepository:
UserInDB or None: User if found, None otherwise
"""
try:
object_id = ObjectId(user_id)
user_doc = self.collection.find_one({"_id": object_id})
if not ObjectId.is_valid(user_id):
return None
user_doc = await self.collection.find_one({"_id": ObjectId(user_id)})
if user_doc:
return UserInDB(**user_doc)
except Exception:
# Invalid ObjectId format
pass
return None
return None
except PyMongoError:
return None
def find_user_by_email(self, email: str) -> Optional[UserInDB]:
async def find_user_by_email(self, email: str) -> Optional[UserInDB]:
"""
Find user by email address.
@@ -123,12 +128,15 @@ class UserRepository:
Returns:
UserInDB or None: User if found, None otherwise
"""
user_doc = self.collection.find_one({"email": email})
if user_doc:
return UserInDB(**user_doc)
return None
try:
user_doc = await self.collection.find_one({"email": email})
if user_doc:
return UserInDB(**user_doc)
return None
except PyMongoError:
return None
def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
async def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
"""
Update user information.
@@ -140,34 +148,43 @@ class UserRepository:
UserInDB or None: Updated user if found, None otherwise
"""
try:
object_id = ObjectId(user_id)
if not ObjectId.is_valid(user_id):
return None
# Build update document with only provided fields
update_data = {"updated_at": datetime.utcnow()}
update_data = {"updated_at": datetime.now()}
if user_update.username is not None:
update_data["username"] = user_update.username
if user_update.email is not None:
update_data["email"] = user_update.email
if user_update.hashed_password is not None:
update_data["hashed_password"] = user_update.hashed_password
if user_update.password is not None:
update_data["hashed_password"] = hash_password(user_update.password)
if user_update.role is not None:
update_data["role"] = user_update.role
if user_update.is_active is not None:
update_data["is_active"] = user_update.is_active
result = self.collection.update_one(
{"_id": object_id},
{"$set": update_data}
# Remove None values from update data
clean_update_data = {k: v for k, v in update_data.items() if v is not None}
if not clean_update_data:
return await self.find_user_by_id(user_id)
result = await self.collection.find_one_and_update(
{"_id": ObjectId(user_id)},
{"$set": clean_update_data},
return_document=True
)
if result.matched_count > 0:
return self.find_user_by_id(user_id)
if result:
return UserInDB(**result)
return None
except Exception:
# Invalid ObjectId format or other errors
pass
return None
except PyMongoError:
return None
def delete_user(self, user_id: str) -> bool:
async def delete_user(self, user_id: str) -> bool:
"""
Delete user from database.
@@ -178,14 +195,15 @@ class UserRepository:
bool: True if user was deleted, False otherwise
"""
try:
object_id = ObjectId(user_id)
result = self.collection.delete_one({"_id": object_id})
if not ObjectId.is_valid(user_id):
return False
result = await self.collection.delete_one({"_id": ObjectId(user_id)})
return result.deleted_count > 0
except Exception:
# Invalid ObjectId format
except PyMongoError:
return False
def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
async def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
"""
List users with pagination.
@@ -196,19 +214,26 @@ class UserRepository:
Returns:
List[UserInDB]: List of users
"""
cursor = self.collection.find().skip(skip).limit(limit)
return [UserInDB(**user_doc) for user_doc in cursor]
try:
cursor = self.collection.find({}).skip(skip).limit(limit).sort("created_at", -1)
user_docs = await cursor.to_list(length=limit)
return [UserInDB(**user_doc) for user_doc in user_docs]
except PyMongoError:
return []
def count_users(self) -> int:
async def count_users(self) -> int:
"""
Count total number of users.
Returns:
int: Total number of users in database
"""
return self.collection.count_documents({})
try:
return await self.collection.count_documents({})
except PyMongoError:
return 0
def user_exists(self, username: str) -> bool:
async def user_exists(self, username: str) -> bool:
"""
Check if user exists by username.
@@ -218,4 +243,8 @@ class UserRepository:
Returns:
bool: True if user exists, False otherwise
"""
return self.collection.count_documents({"username": username}) > 0
try:
count = await self.collection.count_documents({"username": username})
return count > 0
except PyMongoError:
return False
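
Review note: the async repository methods above share one shape. They reject a malformed id before querying, await the driver call, and turn driver errors into a neutral return value. The stdlib-only sketch below illustrates that shape under stated assumptions: `FakeCollection`, `FakeDriverError`, and `looks_like_object_id` are hypothetical stand-ins for the Motor collection, `PyMongoError`, and `ObjectId.is_valid`, and are not part of this change.

```python
import asyncio
import re
from typing import Optional


class FakeDriverError(Exception):
    """Hypothetical stand-in for pymongo.errors.PyMongoError."""


_HEX24 = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value: str) -> bool:
    """Hypothetical stand-in for ObjectId.is_valid (24-character hex strings only)."""
    return isinstance(value, str) and _HEX24.fullmatch(value) is not None


class FakeCollection:
    """Hypothetical in-memory collection exposing an awaitable find_one."""

    def __init__(self, docs, fail=False):
        self._docs = {doc["_id"]: doc for doc in docs}
        self._fail = fail

    async def find_one(self, query):
        if self._fail:
            raise FakeDriverError("connection lost")
        return self._docs.get(query["_id"])


async def find_user_by_id(collection, user_id: str) -> Optional[dict]:
    # 1. Reject malformed ids without touching the database.
    if not looks_like_object_id(user_id):
        return None
    try:
        # 2. Await the driver call.
        return await collection.find_one({"_id": user_id})
    except FakeDriverError:
        # 3. Map driver errors to "not found" instead of propagating them.
        return None
```

One consequence of this design is that callers cannot distinguish "no such user" from "database unavailable"; logging inside the `except` branch would be one way to keep that information.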


@@ -4,17 +4,74 @@ FastAPI application for MyDocManager file processor service.
This service provides API endpoints for health checks and task dispatching.
"""
import logging
import os
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
import redis
from celery import Celery
from app.database.connection import test_database_connection, get_database
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate
from app.services.init_service import InitializationService
from app.services.user_service import UserService
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan manager for startup and shutdown tasks.
Handles initialization tasks that need to run when the application starts,
including admin user creation and other setup procedures.
"""
# Startup tasks
logger.info("Starting MyDocManager application...")
try:
# Initialize database connection
database = get_database()
# Initialize repositories and services
user_repository = UserRepository(database)
user_service = UserService(user_repository)
init_service = InitializationService(user_service)
# Run initialization tasks
initialization_result = init_service.initialize_application()
if initialization_result["initialization_success"]:
logger.info("Application startup completed successfully")
if initialization_result["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in initialization_result["errors"]:
logger.error(f" - {error}")
except Exception as e:
logger.error(f"Critical error during application startup: {str(e)}")
# You might want to decide if the app should continue or exit here
# For now, we log the error but continue
yield # Application is running
# Shutdown tasks (if needed)
logger.info("Shutting down MyDocManager application...")
# Initialize FastAPI app
app = FastAPI(
title="MyDocManager File Processor",
description="File processing and task dispatch service",
version="1.0.0"
version="1.0.0",
lifespan=lifespan
)
# Environment variables
@@ -42,6 +99,26 @@ class TestTaskRequest(BaseModel):
message: str
def get_user_service() -> UserService:
"""
Dependency to get user service instance.
This should be properly implemented with database connection management
in your actual application.
"""
database = get_database()
user_repository = UserRepository(database)
return UserService(user_repository)
# Your API routes would use the service like this:
@app.post("/api/users")
async def create_user(
user_data: UserCreate,
user_service: UserService = Depends(get_user_service)
):
return user_service.create_user(user_data)
@app.get("/health")
async def health_check():
"""
@@ -68,6 +145,12 @@ async def health_check():
health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded"
# check MongoDB connection
if test_database_connection():
health_status["dependencies"]["mongodb"] = "connected"
else:
health_status["dependencies"]["mongodb"] = "disconnected"
return health_status
@@ -117,4 +200,4 @@ async def root():
"service": "MyDocManager File Processor",
"version": "1.0.0",
"status": "running"
}
}


@@ -0,0 +1,142 @@
"""
Pydantic models for document processing collections.
This module defines the data models for file documents and processing jobs
stored in MongoDB collections.
"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from bson import ObjectId
from pydantic import BaseModel, Field, field_validator
from app.models.types import PyObjectId
class FileType(str, Enum):
"""Supported file types for document processing."""
TXT = "txt"
PDF = "pdf"
DOCX = "docx"
JPG = "jpg"
PNG = "png"
class ExtractionMethod(str, Enum):
"""Methods used to extract content from documents."""
DIRECT_TEXT = "direct_text"
OCR = "ocr"
HYBRID = "hybrid"
class ProcessingStatus(str, Enum):
"""Status values for processing jobs."""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class FileDocument(BaseModel):
"""
Model for file documents stored in the 'files' collection.
Represents a file detected in the watched directory with its
metadata and extracted content.
"""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
filename: str = Field(..., description="Original filename")
filepath: str = Field(..., description="Full path to the file")
file_type: FileType = Field(..., description="Type of the file")
extraction_method: Optional[ExtractionMethod] = Field(default=None, description="Method used to extract content")
metadata: Dict[str, Any] = Field(default_factory=dict, description="File-specific metadata")
detected_at: Optional[datetime] = Field(default=None, description="Timestamp when file was detected")
file_hash: Optional[str] = Field(default=None, description="SHA256 hash of file content")
@field_validator('filepath')
@classmethod
def validate_filepath(cls, v: str) -> str:
"""Validate filepath format."""
if not v.strip():
raise ValueError("Filepath cannot be empty")
return v.strip()
@field_validator('filename')
@classmethod
def validate_filename(cls, v: str) -> str:
"""Validate filename format."""
if not v.strip():
raise ValueError("Filename cannot be empty")
return v.strip()
class Config:
"""Pydantic configuration."""
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
class DocumentContent(BaseModel):
"""Model for document content."""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
file_hash: Optional[str] = Field(..., description="SHA256 hash of file content")
content: str = Field(..., description="File content")
encoding: str = Field(default="utf-8", description="Character encoding for text files")
file_size: int = Field(..., ge=0, description="File size in bytes")
mime_type: str = Field(..., description="MIME type detected")
class ProcessingJob(BaseModel):
"""
Model for processing jobs stored in the 'processing_jobs' collection.
Tracks the lifecycle and status of document processing tasks.
"""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
file_id: PyObjectId = Field(..., description="Reference to file document")
status: ProcessingStatus = Field(
default=ProcessingStatus.PENDING,
description="Current processing status"
)
task_id: Optional[str] = Field(
default=None,
description="Celery task UUID"
)
created_at: Optional[datetime] = Field(
default=None,
description="Timestamp when job was created"
)
started_at: Optional[datetime] = Field(
default=None,
description="Timestamp when processing started"
)
completed_at: Optional[datetime] = Field(
default=None,
description="Timestamp when processing completed"
)
error_message: Optional[str] = Field(
default=None,
description="Error message if processing failed"
)
@field_validator('error_message')
@classmethod
def validate_error_message(cls, v: Optional[str]) -> Optional[str]:
"""Clean up error message."""
if v is not None:
return v.strip() if v.strip() else None
return v
class Config:
"""Pydantic configuration."""
populate_by_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
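
Review note: because `FileType` subclasses both `str` and `Enum`, a member can be looked up by its value and compares equal to the plain string. The sketch below shows one way a file extension could be mapped to a member. The helper `file_type_from_path` is hypothetical and does not appear in this change; the enum is repeated so the example runs on its own.

```python
from enum import Enum
from pathlib import Path
from typing import Optional


class FileType(str, Enum):
    """Supported file types (copied from the model above)."""
    TXT = "txt"
    PDF = "pdf"
    DOCX = "docx"
    JPG = "jpg"
    PNG = "png"


def file_type_from_path(path: str) -> Optional[FileType]:
    """Hypothetical helper: map a file extension to a FileType, or None."""
    suffix = Path(path).suffix.lower().lstrip(".")
    try:
        # Lookup by value raises ValueError for unsupported extensions.
        return FileType(suffix)
    except ValueError:
        return None
```

Returning `None` for unsupported extensions is an assumption of this sketch; a watcher could equally raise or skip such files.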


@@ -0,0 +1,32 @@
from typing import Any
from bson import ObjectId
from pydantic_core import core_schema
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)


@@ -13,167 +13,146 @@ from pydantic import BaseModel, Field, field_validator, EmailStr
from pydantic_core import core_schema
from app.models.auth import UserRole
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
from app.models.types import PyObjectId
def validate_password_strength(password: str) -> str:
"""
Validate password meets security requirements.
Requirements:
- At least 8 characters long
- Contains at least one uppercase letter
- Contains at least one lowercase letter
- Contains at least one digit
- Contains at least one special character
Args:
password: The password string to validate
Returns:
str: The validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character")
return password
"""
Validate password meets security requirements.
Requirements:
- At least 8 characters long
- Contains at least one uppercase letter
- Contains at least one lowercase letter
- Contains at least one digit
- Contains at least one special character
Args:
password: The password string to validate
Returns:
str: The validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character")
return password
def validate_username_not_empty(username: str) -> str:
"""
Validate username is not empty or whitespace only.
Args:
username: The username string to validate
Returns:
str: The validated username
Raises:
ValueError: If username is empty or whitespace only
"""
if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only")
return username.strip()
"""
Validate username is not empty or whitespace only.
Args:
username: The username string to validate
Returns:
str: The validated username
Raises:
ValueError: If username is empty or whitespace only
"""
if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only")
return username.strip()
class UserCreate(BaseModel):
"""Model for creating a new user."""
username: str
email: EmailStr
password: str
role: UserRole = UserRole.USER
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
class UserCreateNoValidation(BaseModel):
"""Model for creating a new user without username, password, or email validation."""
username: str
email: str
password: str
role: UserRole = UserRole.USER
class UserCreate(UserCreateNoValidation):
"""Model for creating a new user."""
email: EmailStr
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
class UserUpdate(BaseModel):
"""Model for updating an existing user."""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
role: Optional[UserRole] = None
@field_validator('username')
@classmethod
def validate_username(cls, v):
if v is not None:
return validate_username_not_empty(v)
return v
@field_validator('password')
@classmethod
def validate_password(cls, v):
if v is not None:
return validate_password_strength(v)
return v
"""Model for updating an existing user."""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
role: Optional[UserRole] = None
is_active: Optional[bool] = None
@field_validator('username')
@classmethod
def validate_username(cls, v):
if v is not None:
return validate_username_not_empty(v)
return v
@field_validator('password')
@classmethod
def validate_password(cls, v):
if v is not None:
return validate_password_strength(v)
return v
class UserInDB(BaseModel):
"""Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str
email: str
password_hash: str
role: UserRole
is_active: bool = True
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
"""Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str
email: str
hashed_password: str
role: UserRole
is_active: bool = True
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
class UserResponse(BaseModel):
"""Model for user data in API responses (excludes password_hash)."""
id: PyObjectId = Field(alias="_id")
username: str
email: str
role: UserRole
is_active: bool
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
"""Model for user data in API responses (excludes hashed_password)."""
id: PyObjectId = Field(alias="_id")
username: str
email: str
role: UserRole
is_active: bool
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
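
Review note: the password rules in `validate_password_strength` are checked in order, so the first failing rule determines the error message. The sketch below repeats the function from this file and wraps it in a hypothetical helper, `password_problem`, that returns the message instead of raising. It also shows that the default admin password `"admin"` fails the length check, which is consistent with the initialization service using `UserCreateNoValidation`.

```python
import re
from typing import Optional


def validate_password_strength(password: str) -> str:
    """Copied from the model module above."""
    if len(password) < 8:
        raise ValueError("Password must be at least 8 characters long")
    if not re.search(r'[A-Z]', password):
        raise ValueError("Password must contain at least one uppercase letter")
    if not re.search(r'[a-z]', password):
        raise ValueError("Password must contain at least one lowercase letter")
    if not re.search(r'\d', password):
        raise ValueError("Password must contain at least one digit")
    if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
        raise ValueError("Password must contain at least one special character")
    return password


def password_problem(password: str) -> Optional[str]:
    """Hypothetical helper: return the first validation error, or None if valid."""
    try:
        validate_password_strength(password)
    except ValueError as exc:
        return str(exc)
    return None
```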


@@ -0,0 +1,58 @@
"""
Authentication service for password hashing and verification.
This module provides authentication-related functionality including
password hashing, verification, and JWT token management.
"""
from app.utils.security import hash_password, verify_password
class AuthService:
"""
Service class for authentication operations.
Handles password hashing, verification, and other authentication
related operations with proper security practices.
"""
@staticmethod
def hash_user_password(password: str) -> str:
"""
Hash a plaintext password for secure storage.
Args:
password (str): Plaintext password to hash
Returns:
str: Hashed password safe for database storage
Example:
>>> auth = AuthService()
>>> hashed = auth.hash_user_password("mypassword123")
>>> len(hashed) > 0
True
"""
return hash_password(password)
@staticmethod
def verify_user_password(password: str, hashed_password: str) -> bool:
"""
Verify a password against its hash.
Args:
password (str): Plaintext password to verify
hashed_password (str): Stored hashed password
Returns:
bool: True if password matches hash, False otherwise
Example:
>>> auth = AuthService()
>>> hashed = auth.hash_user_password("mypassword123")
>>> auth.verify_user_password("mypassword123", hashed)
True
>>> auth.verify_user_password("wrongpassword", hashed)
False
"""
return verify_password(password, hashed_password)


@@ -0,0 +1,134 @@
"""
Initialization service for application startup tasks.
This module handles application initialization tasks including
creating default admin user if none exists.
"""
import logging
from typing import Optional
from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
from app.models.auth import UserRole
from app.services.user_service import UserService
logger = logging.getLogger(__name__)
class InitializationService:
"""
Service for handling application initialization tasks.
This service manages startup operations like ensuring required
users exist and system is properly configured.
"""
def __init__(self, user_service: UserService):
"""
Initialize service with user service dependency.
Args:
user_service (UserService): Service for user operations
"""
self.user_service = user_service
def ensure_admin_user_exists(self) -> Optional[UserInDB]:
"""
Ensure default admin user exists in the system.
Creates a default admin user if no admin user exists in the system.
Uses default credentials that should be changed after first login.
Returns:
UserInDB or None: Created admin user if created, None if already exists
Raises:
Exception: If admin user creation fails
"""
logger.info("Checking if admin user exists...")
# Check if any admin user already exists
if self._admin_user_exists():
logger.info("Admin user already exists, skipping creation")
return None
logger.info("No admin user found, creating default admin user...")
try:
# Create default admin user
admin_data = UserCreateNoValidation(
username="admin",
email="admin@mydocmanager.local",
password="admin", # Should be changed after first login
role=UserRole.ADMIN
)
created_user = self.user_service.create_user(admin_data)
logger.info(f"Default admin user created successfully with ID: {created_user.id}")
logger.warning(
"Default admin user created with username 'admin' and password 'admin'. "
"Please change these credentials immediately for security!"
)
return created_user
except Exception as e:
logger.error(f"Failed to create default admin user: {str(e)}")
raise Exception(f"Admin user creation failed: {str(e)}")
def _admin_user_exists(self) -> bool:
"""
Check if any admin user exists in the system.
Returns:
bool: True if at least one admin user exists, False otherwise
"""
try:
# Get all users and check if any have admin role
users = self.user_service.list_users(limit=1000) # Reasonable limit for admin check
for user in users:
if user.role == UserRole.ADMIN and user.is_active:
return True
return False
except Exception as e:
logger.error(f"Error checking for admin users: {str(e)}")
# In case of error, assume admin exists to avoid creating duplicates
return True
def initialize_application(self) -> dict:
"""
Perform all application initialization tasks.
This method runs all necessary initialization procedures including
admin user creation and any other startup requirements.
Returns:
dict: Summary of initialization tasks performed
"""
logger.info("Starting application initialization...")
initialization_summary = {
"admin_user_created": False,
"initialization_success": False,
"errors": []
}
try:
# Ensure admin user exists
created_admin = self.ensure_admin_user_exists()
if created_admin:
initialization_summary["admin_user_created"] = True
initialization_summary["initialization_success"] = True
logger.info("Application initialization completed successfully")
except Exception as e:
error_msg = f"Application initialization failed: {str(e)}"
logger.error(error_msg)
initialization_summary["errors"].append(error_msg)
return initialization_summary


@@ -0,0 +1,181 @@
"""
User service for business logic operations.
This module provides user-related business logic including user creation,
retrieval, updates, and authentication operations with proper error handling.
"""
from typing import Optional, List
from pymongo.errors import DuplicateKeyError
from app.models.user import UserCreate, UserInDB, UserUpdate, UserResponse, UserCreateNoValidation
from app.models.auth import UserRole
from app.database.repositories.user_repository import UserRepository
from app.services.auth_service import AuthService
class UserService:
"""
Service class for user business logic operations.
This class handles user-related operations including creation,
authentication, and data management with proper validation.
"""
def __init__(self, user_repository: UserRepository):
"""
Initialize user service with repository dependency.
Args:
user_repository (UserRepository): Repository for user data operations
"""
self.user_repository = user_repository
self.auth_service = AuthService()
def create_user(self, user_data: UserCreate | UserCreateNoValidation) -> UserInDB:
"""
Create a new user with business logic validation.
Args:
user_data (UserCreate): User creation data
Returns:
UserInDB: Created user with database information
Raises:
ValueError: If user already exists or validation fails
"""
# Check if user already exists
if self.user_repository.user_exists(user_data.username):
raise ValueError(f"User with username '{user_data.username}' already exists")
# Check if email already exists
existing_user = self.user_repository.find_user_by_email(user_data.email)
if existing_user:
raise ValueError(f"User with email '{user_data.email}' already exists")
try:
return self.user_repository.create_user(user_data)
except DuplicateKeyError:
raise ValueError(f"User with username '{user_data.username}' already exists")
def get_user_by_username(self, username: str) -> Optional[UserInDB]:
"""
Retrieve user by username.
Args:
username (str): Username to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
return self.user_repository.find_user_by_username(username)
def get_user_by_id(self, user_id: str) -> Optional[UserInDB]:
"""
Retrieve user by ID.
Args:
user_id (str): User ID to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
return self.user_repository.find_user_by_id(user_id)
def authenticate_user(self, username: str, password: str) -> Optional[UserInDB]:
"""
Authenticate user with username and password.
Args:
username (str): Username for authentication
password (str): Password for authentication
Returns:
UserInDB or None: Authenticated user if valid, None otherwise
"""
user = self.user_repository.find_user_by_username(username)
if not user:
return None
if not user.is_active:
return None
if not self.auth_service.verify_user_password(password, user.hashed_password):
return None
return user
def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
"""
Update user information.
Args:
user_id (str): User ID to update
user_update (UserUpdate): Updated user data
Returns:
UserInDB or None: Updated user if successful, None otherwise
Raises:
ValueError: If username or email already exists for different user
"""
# Validate username uniqueness if being updated
if user_update.username is not None:
existing_user = self.user_repository.find_user_by_username(user_update.username)
if existing_user and str(existing_user.id) != user_id:
raise ValueError(f"Username '{user_update.username}' is already taken")
# Validate email uniqueness if being updated
if user_update.email is not None:
existing_user = self.user_repository.find_user_by_email(user_update.email)
if existing_user and str(existing_user.id) != user_id:
raise ValueError(f"Email '{user_update.email}' is already taken")
return self.user_repository.update_user(user_id, user_update)
def delete_user(self, user_id: str) -> bool:
"""
Delete user from system.
Args:
user_id (str): User ID to delete
Returns:
bool: True if user was deleted, False otherwise
"""
return self.user_repository.delete_user(user_id)
def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
"""
List users with pagination.
Args:
skip (int): Number of users to skip (default: 0)
limit (int): Maximum number of users to return (default: 100)
Returns:
List[UserInDB]: List of users
"""
return self.user_repository.list_users(skip=skip, limit=limit)
def count_users(self) -> int:
"""
Count total number of users.
Returns:
int: Total number of users in system
"""
return self.user_repository.count_users()
def user_exists(self, username: str) -> bool:
"""
Check if user exists by username.
Args:
username (str): Username to check
Returns:
bool: True if user exists, False otherwise
"""
return self.user_repository.user_exists(username)
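
Review note: `UserService` calls the repository synchronously, while the repository methods in this comparison are declared `async`. Assuming the service is wired to that async repository, an un-awaited call returns a coroutine object, which is always truthy. The sketch below illustrates the pitfall with hypothetical stand-ins (`AsyncRepo`, `create_user_sync`, `create_user_async`); it is not the project's code.

```python
import asyncio


class AsyncRepo:
    """Hypothetical stand-in for the async UserRepository."""

    async def user_exists(self, username: str) -> bool:
        return False  # no user is stored in this sketch


def create_user_sync(repo: AsyncRepo, username: str) -> str:
    # Un-awaited call: `pending` is a coroutine object, not a bool.
    pending = repo.user_exists(username)
    try:
        if pending:  # always truthy, so the "exists" branch is always taken
            raise ValueError(f"User with username '{username}' already exists")
        return username
    finally:
        pending.close()  # avoid a "coroutine was never awaited" warning


async def create_user_async(repo: AsyncRepo, username: str) -> str:
    # Awaiting yields the real boolean result.
    if await repo.user_exists(username):
        raise ValueError(f"User with username '{username}' already exists")
    return username
```

Under these assumptions, making the service methods `async` and awaiting each repository call would restore the intended behavior.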


@@ -1,6 +1,10 @@
fastapi==0.116.1
uvicorn==0.35.0
bcrypt==4.3.0
celery==5.5.3
redis==6.4.0
email-validator==2.3.0
fastapi==0.116.1
httptools==0.6.4
motor==3.7.1
pymongo==4.15.0
pydantic==2.11.9
pydantic==2.11.9
redis==6.4.0
uvicorn==0.35.0


@@ -22,7 +22,9 @@ def test_i_can_get_database_connection():
"""Test successful database connection creation."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__.return_value = mock_database
# Configure the mock to support dictionary-like access
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -36,6 +38,8 @@ def test_i_can_get_database_connection():
assert result == mock_database
mock_client.admin.command.assert_called_with('ping')
# Verify that __getitem__ was called with the database name
mock_client.__getitem__.assert_called_with("testdb")
def test_i_cannot_connect_to_invalid_mongodb_url():
@@ -78,7 +82,7 @@ def test_i_can_get_database_singleton():
"""Test that get_database returns the same instance (singleton pattern)."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__.return_value = mock_database
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -102,7 +106,7 @@ def test_i_can_close_database_connection():
"""Test closing database connection."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__.return_value = mock_database
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -127,7 +131,7 @@ def test_i_can_get_mongodb_client():
"""Test getting raw MongoDB client instance."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__.return_value = mock_database
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
@@ -169,17 +173,6 @@ def test_i_can_test_database_connection_success():
mock_database.command.assert_called_with('ping')
def test_i_cannot_test_database_connection_failure():
"""Test database connection health check - failure case."""
mock_database = Mock()
mock_database.command.side_effect = Exception("Connection error")
with patch('app.database.connection.get_database', return_value=mock_database):
result = test_database_connection()
assert result is False
def test_i_can_close_connection_when_no_client():
"""Test closing connection when no client exists (should not raise error)."""
# Reset global variables
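
Review note: the switch from `mock_client.__getitem__.return_value = ...` to `mock_client.__getitem__ = Mock(...)` in these tests matters because a plain `Mock` does not provide magic methods until one is assigned. A small standalone sketch using only `unittest.mock` follows; the database name `"testdb"` is taken from the assertion in the test above.

```python
from unittest.mock import MagicMock, Mock

mock_database = Mock()

# A plain Mock has no __getitem__ until one is assigned explicitly.
plain = Mock()
try:
    plain.__getitem__
    plain_supports_getitem = True
except AttributeError:
    plain_supports_getitem = False

# Assigning a Mock to the magic method enables dictionary-style access.
mock_client = Mock()
mock_client.__getitem__ = Mock(return_value=mock_database)
result = mock_client["testdb"]
mock_client.__getitem__.assert_called_with("testdb")

# MagicMock preconfigures magic methods, so return_value can be set directly.
magic_client = MagicMock()
magic_client.__getitem__.return_value = mock_database
magic_result = magic_client["testdb"]
```

Using `MagicMock` for the client would be an alternative to assigning `__getitem__` by hand; which one reads better is a matter of taste.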


@@ -0,0 +1,602 @@
"""
Test suite for FileDocumentRepository with async/await support.
This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from datetime import datetime
from typing import Dict, Any
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.document_repository import FileDocumentRepository
from app.models.document import FileDocument, FileType
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository()
repo.db = db
repo.collection = db.files
return repo
@pytest.fixture
def sample_file_document():
"""Sample FileDocument data for testing."""
return FileDocument(
filename="test_document.pdf",
filepath="/path/to/test_document.pdf",
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType("pdf"),
detected_at=datetime.now(),
)
@pytest.fixture
def sample_update_data():
"""Sample update data for testing."""
return {
"metadata": {"tags": ["updated", "document"]},
"file_type": FileType("txt"),
}
@pytest.fixture
def multiple_sample_documents():
"""Multiple FileDocument objects for list/search testing."""
base_time = datetime.now()
return [
FileDocument(
filename="document1.pdf",
filepath="/path/to/document1.pdf",
file_hash="hash1" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="similar_document.pdf",
filepath="/path/to/similar_document.pdf",
file_hash="hash2" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="completely_different.txt",
filepath="/path/to/completely_different.txt",
file_hash="hash3" + "0" * 58,
file_type=FileType("txt"),
detected_at=base_time,
)
]
class TestFileDocumentRepositoryInitialization:
"""Tests for repository initialization."""
@pytest.mark.asyncio
async def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
repo = FileDocumentRepository()
# Act & Assert (should not raise any exception)
assert repo.db is not None
assert repo.collection is not None
class TestFileDocumentRepositoryCreation:
"""Tests for file document creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_document(self, in_memory_repository, sample_file_document):
"""Test successful file document creation."""
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.filename == sample_file_document.filename
assert created_doc.filepath == sample_file_document.filepath
assert created_doc.file_hash == sample_file_document.file_hash
assert created_doc.file_type == sample_file_document.file_type
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_can_create_document_without_id(self, in_memory_repository, sample_file_document):
"""Test creating document with _id set to None (should be removed)."""
# Arrange
sample_file_document.id = None
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_cannot_create_duplicate_document(self, in_memory_repository, sample_file_document):
"""Test that creating a document with an already-used filepath raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_document(sample_file_document)
duplicate_doc = FileDocument(
filename="different_name.pdf",
filepath=sample_file_document.filepath,
file_hash="different_hash" + "0" * 58,
file_type=FileType("pdf"),
detected_at=datetime.now()
)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_document(duplicate_doc)
assert "already exists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(ValueError) as exc_info:
await in_memory_repository.create_document(sample_file_document)
assert "Failed to create file document" in str(exc_info.value)
class TestFileDocumentRepositoryFinding:
"""Tests for file document finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
"""Test finding document by valid ObjectId."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
# Assert
assert found_doc is not None
assert found_doc.id == created_doc.id
assert found_doc.filename == created_doc.filename
assert found_doc.file_hash == created_doc.file_hash
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_id("invalid_id")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_doc = await in_memory_repository.find_document_by_id(nonexistent_id)
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_hash(self, in_memory_repository, sample_file_document):
"""Test finding document by file hash."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_hash(sample_file_document.file_hash)
# Assert
assert found_doc is not None
assert found_doc.file_hash == created_doc.file_hash
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_hash(self, in_memory_repository):
"""Test that nonexistent hash returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_hash("nonexistent_hash")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
"""Test finding document by exact filepath."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_filepath(sample_file_document.filepath)
# Assert
assert found_doc is not None
assert found_doc.filepath == created_doc.filepath
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
"""Test that nonexistent filepath returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_filepath("/nonexistent/path.pdf")
# Assert
assert found_doc is None
class TestFileDocumentRepositoryFuzzySearch:
"""Tests for fuzzy search functionality by filename."""
@pytest.mark.asyncio
async def test_i_can_find_documents_by_exact_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with exact filename match."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1.pdf")
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_by_fuzzy_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with fuzzy matching using default threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document")
# Assert
assert len(found_docs) >= 2 # Should find document1.pdf and similar_document.pdf
filenames = [doc.filename for doc in found_docs]
assert "document1.pdf" in filenames
assert "similar_document.pdf" in filenames
@pytest.mark.asyncio
async def test_i_can_find_documents_with_custom_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with custom similarity threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act - Very high threshold should only match exact or very similar names
found_docs = await in_memory_repository.find_document_by_name("document1.pdf", similarity_threshold=0.9)
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_sorted_by_similarity(self, in_memory_repository, multiple_sample_documents):
"""Test that documents are sorted by similarity score (highest first)."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1", similarity_threshold=0.3)
# Assert
assert len(found_docs) >= 1
# First result should be the most similar (document1.pdf)
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_cannot_find_documents_below_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test that no documents are returned when similarity is below threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("xyz", similarity_threshold=0.6)
# Assert
assert len(found_docs) == 0
@pytest.mark.asyncio
async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during name search."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
found_docs = await in_memory_repository.find_document_by_name("test")
# Assert
assert found_docs == []
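The fuzzy-search tests above check three behaviors: a similarity threshold, exclusion of names that score below it, and ordering by score (highest first). The scoring function behind `find_document_by_name` is not visible in this diff, so the following is only a minimal sketch of that contract. It assumes `difflib.SequenceMatcher` as a stand-in metric; `rank_by_similarity` and its default threshold of 0.6 are hypothetical and not taken from the repository.

```python
from difflib import SequenceMatcher
from typing import List


def rank_by_similarity(query: str, filenames: List[str], threshold: float = 0.6) -> List[str]:
    """Return filenames whose similarity to `query` is >= threshold, best match first.

    Hypothetical helper: the real repository may use a different metric.
    """
    scored = []
    for name in filenames:
        # ratio() is in [0, 1]; 1.0 means the two strings are identical.
        score = SequenceMatcher(None, query.lower(), name.lower()).ratio()
        if score >= threshold:
            scored.append((score, name))
    # Highest similarity first, mirroring the "sorted by similarity" test.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [name for _, name in scored]
```

With this metric an exact filename always scores 1.0, so it survives any threshold and sorts first, while a short unrelated query such as `"xyz"` cannot reach a threshold of 0.6 against the much longer filenames used in the fixtures.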
class TestFileDocumentRepositoryListing:
"""Tests for document listing functionality."""
@pytest.mark.asyncio
async def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with default pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == len(multiple_sample_documents)
assert all(isinstance(doc, FileDocument) for doc in docs)
@pytest.mark.asyncio
async def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with custom pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs_page1 = await in_memory_repository.list_documents(skip=0, limit=2)
docs_page2 = await in_memory_repository.list_documents(skip=2, limit=2)
# Assert
assert len(docs_page1) == 2
assert len(docs_page2) == 1 # Only 3 total documents
# Ensure no overlap between pages
page1_ids = [doc.id for doc in docs_page1]
page2_ids = [doc.id for doc in docs_page2]
assert len(set(page1_ids).intersection(set(page2_ids))) == 0
@pytest.mark.asyncio
async def test_i_can_list_documents_sorted_by_date(self, in_memory_repository, sample_file_document):
"""Test that documents are sorted by detected_at in descending order."""
# Arrange
from datetime import timedelta
# Create documents with different timestamps
doc1 = sample_file_document.model_copy()
doc1.filename = "oldest.pdf"
doc1.file_hash = "hash1" + "0" * 58
doc1.detected_at = datetime.now() - timedelta(hours=2)
doc2 = sample_file_document.model_copy()
doc2.filename = "newest.pdf"
doc2.file_hash = "hash2" + "0" * 58
doc2.detected_at = datetime.now()
await in_memory_repository.create_document(doc1)
await in_memory_repository.create_document(doc2)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == 2
assert docs[0].filename == "newest.pdf" # Most recent first
assert docs[1].filename == "oldest.pdf"
@pytest.mark.asyncio
async def test_i_can_list_empty_documents(self, in_memory_repository):
"""Test listing documents from empty collection."""
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
@pytest.mark.asyncio
async def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during document listing."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
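The listing tests above expect results ordered by `detected_at` (newest first) and sliced by `skip` and `limit`. The repository presumably delegates this to the database, but that code is not shown here. As a rough illustration of the expected semantics only, a pure-Python sketch follows; `list_page`, the dictionary records, and the default `limit` of 100 are assumptions made for the example.

```python
from datetime import datetime
from typing import Any, Dict, List


def list_page(records: List[Dict[str, Any]], skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return one page of records, newest `detected_at` first.

    Hypothetical helper that mimics sort + skip + limit on an in-memory list.
    """
    ordered = sorted(records, key=lambda record: record["detected_at"], reverse=True)
    # Slicing past the end of the list simply yields a shorter (or empty) page.
    return ordered[skip:skip + limit]
```

Because every page is cut from the same ordering, consecutive pages (`skip=0, limit=2` and then `skip=2, limit=2`) cannot overlap, which is what the custom-pagination test asserts.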
class TestFileDocumentRepositoryUpdate:
"""Tests for document update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_document_successfully(self, in_memory_repository, sample_file_document,
sample_update_data):
"""Test successful document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert updated_doc is not None
assert updated_doc.tags == sample_update_data["tags"]
assert updated_doc.file_type == sample_update_data["file_type"]
assert updated_doc.id == created_doc.id
assert updated_doc.filename == created_doc.filename # Unchanged fields remain
@pytest.mark.asyncio
async def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
"""Test updating document with partial data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
partial_update = {"tags": ["new_tag"]}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.filename == created_doc.filename # Should remain unchanged
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
update_with_none = {"tags": ["new_tag"], "file_type": None}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out)
@pytest.mark.asyncio
async def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
"""Test updating document with empty data returns current document."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
empty_update = {}
# Act
result = await in_memory_repository.update_document(str(created_doc.id), empty_update)
# Assert
assert result is not None
assert result.filename == created_doc.filename
assert result.file_hash == created_doc.file_hash
assert result.tags == created_doc.tags
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_document("invalid_id", sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
"""Test that updating nonexistent document returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.update_document(nonexistent_id, sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_pymongo_error(self, in_memory_repository, sample_file_document,
sample_update_data, mocker):
"""Test handling of PyMongo errors during document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert result is None
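The update tests above imply two rules: keys whose value is `None` are ignored, and an update with nothing left to change is a no-op that returns the current document. How `update_document` implements this is not shown in the diff. The sketch below is one plausible way to build the update payload; `build_update` is a hypothetical helper, and the use of a MongoDB `$set` document is an assumption.

```python
from typing import Any, Dict, Optional


def build_update(update_data: Dict[str, Any]) -> Optional[Dict[str, Dict[str, Any]]]:
    """Drop None values and wrap the remainder in a `$set` document.

    Returns None when nothing is left to update, so the caller can skip the
    database write and return the stored document unchanged.
    """
    cleaned = {key: value for key, value in update_data.items() if value is not None}
    if not cleaned:
        return None
    return {"$set": cleaned}
```

Filtering on `is not None` rather than on truthiness keeps legitimate falsy values such as an empty tag list or `False` in the update.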
class TestFileDocumentRepositoryDeletion:
"""Tests for document deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
"""Test successful document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
deletion_result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert deletion_result is True
# Verify document is actually deleted
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_document("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
"""Test that deleting nonexistent document returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_document(nonexistent_id)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert result is False
class TestFileDocumentRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
"""Test counting documents."""
# Arrange
initial_count = await in_memory_repository.count_documents()
await in_memory_repository.create_document(sample_file_document)
# Act
final_count = await in_memory_repository.count_documents()
# Assert
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_i_can_count_zero_documents(self, in_memory_repository):
"""Test counting documents in empty collection."""
# Act
count = await in_memory_repository.count_documents()
# Assert
assert count == 0


@@ -262,14 +262,14 @@ class TestUserInDBModel:
def test_i_can_create_user_in_db_model(self):
"""Test creation of valid UserInDB model with all fields."""
user_id = ObjectId()
created_at = datetime.utcnow()
updated_at = datetime.utcnow()
created_at = datetime.now()
updated_at = datetime.now()
user_data = {
"id": user_id,
"username": "testuser",
"email": "test@example.com",
"password_hash": "$2b$12$hashedpassword",
"hashed_password": "$2b$12$hashedpassword",
"role": UserRole.USER,
"is_active": True,
"created_at": created_at,
@@ -281,28 +281,11 @@ class TestUserInDBModel:
assert user.id == user_id
assert user.username == "testuser"
assert user.email == "test@example.com"
assert user.password_hash == "$2b$12$hashedpassword"
assert user.hashed_password == "$2b$12$hashedpassword"
assert user.role == UserRole.USER
assert user.is_active is True
assert user.created_at == created_at
assert user.updated_at == updated_at
def test_i_can_create_inactive_user(self):
"""Test creation of inactive user."""
user_data = {
"id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"password_hash": "$2b$12$hashedpassword",
"role": UserRole.USER,
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
user = UserInDB(**user_data)
assert user.is_active is False
class TestUserResponseModel:
@@ -311,8 +294,8 @@ class TestUserResponseModel:
def test_i_can_create_user_response_model(self):
"""Test creation of valid UserResponse model without password."""
user_id = ObjectId()
created_at = datetime.utcnow()
updated_at = datetime.utcnow()
created_at = datetime.now()
updated_at = datetime.now()
user_data = {
"id": user_id,
@@ -350,14 +333,14 @@ class TestUserResponseModel:
def test_i_can_convert_user_in_db_to_response(self):
"""Test conversion from UserInDB to UserResponse model."""
user_id = ObjectId()
created_at = datetime.utcnow()
updated_at = datetime.utcnow()
created_at = datetime.now()
updated_at = datetime.now()
user_in_db = UserInDB(
id=user_id,
username="testuser",
email="test@example.com",
password_hash="$2b$12$hashedpassword",
hashed_password="$2b$12$hashedpassword",
role=UserRole.USER,
is_active=True,
created_at=created_at,


@@ -1,385 +1,301 @@
"""
Unit tests for user repository module.
Test suite for UserRepository with async/await support.
Tests all CRUD operations for users with MongoDB mocking
to ensure proper database interactions without requiring
actual MongoDB instance during tests.
This module contains comprehensive tests for all UserRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from unittest.mock import Mock, MagicMock
from datetime import datetime
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate, UserUpdate, UserInDB, UserRole
from app.models.user import UserCreate, UserUpdate, UserInDB
@pytest.fixture
def mock_database():
"""Create mock database with users collection."""
db = Mock()
collection = Mock()
db.users = collection
return db
@pytest.fixture
def user_repository(mock_database):
"""Create UserRepository instance with mocked database."""
return UserRepository(mock_database)
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory UserRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = UserRepository(db)
#await repo.initialize()
return repo
@pytest.fixture
def sample_user_create():
"""Create sample UserCreate object for testing."""
"""Sample UserCreate data for testing."""
return UserCreate(
username="testuser",
email="test@example.com",
hashed_password="hashed_password_123",
role=UserRole.USER,
is_active=True
password="#TestPassword123",
role="user"
)
@pytest.fixture
def sample_user_update():
"""Create sample UserUpdate object for testing."""
"""Sample UserUpdate data for testing."""
return UserUpdate(
username="updateduser",
email="updated@example.com",
role=UserRole.ADMIN,
is_active=False
role="admin"
)
def test_i_can_create_user(user_repository, mock_database, sample_user_create):
"""Test successful user creation."""
# Mock successful insertion
mock_result = Mock()
mock_result.inserted_id = ObjectId()
mock_database.users.insert_one.return_value = mock_result
class TestUserRepositoryCreation:
"""Tests for user creation functionality."""
result = user_repository.create_user(sample_user_create)
@pytest.mark.asyncio
async def test_i_can_create_user(self, in_memory_repository, sample_user_create):
"""Test successful user creation."""
# Act
created_user = await in_memory_repository.create_user(sample_user_create)
# Assert
assert created_user is not None
assert created_user.username == sample_user_create.username
assert created_user.email == sample_user_create.email
assert created_user.role == sample_user_create.role
assert created_user.is_active is True
assert created_user.id is not None
assert created_user.created_at is not None
assert created_user.updated_at is not None
assert created_user.hashed_password != sample_user_create.password # Should be hashed
assert isinstance(result, UserInDB)
assert result.username == sample_user_create.username
assert result.email == sample_user_create.email
assert result.hashed_password == sample_user_create.hashed_password
assert result.role == sample_user_create.role
assert result.is_active == sample_user_create.is_active
assert result.id is not None
assert isinstance(result.created_at, datetime)
assert isinstance(result.updated_at, datetime)
# Verify insert_one was called with correct data
mock_database.users.insert_one.assert_called_once()
call_args = mock_database.users.insert_one.call_args[0][0]
assert call_args["username"] == sample_user_create.username
assert call_args["email"] == sample_user_create.email
@pytest.mark.asyncio
async def test_i_cannot_create_user_with_duplicate_username(self, in_memory_repository, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_user(sample_user_create)
assert "already exists" in str(exc_info.value)
def test_i_cannot_create_duplicate_username(user_repository, mock_database, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Mock DuplicateKeyError from MongoDB
mock_database.users.insert_one.side_effect = DuplicateKeyError("duplicate key error")
class TestUserRepositoryFinding:
"""Tests for user finding functionality."""
with pytest.raises(DuplicateKeyError, match="User with username 'testuser' already exists"):
user_repository.create_user(sample_user_create)
@pytest.mark.asyncio
async def test_i_can_find_user_by_id(self, in_memory_repository, sample_user_create):
"""Test finding user by valid ID."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
# Assert
assert found_user is not None
assert found_user.id == created_user.id
assert found_user.username == created_user.username
assert found_user.email == created_user.email
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_user = await in_memory_repository.find_user_by_id("invalid_id")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_user = await in_memory_repository.find_user_by_id(nonexistent_id)
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_username(self, in_memory_repository, sample_user_create):
"""Test finding user by username."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_username(sample_user_create.username)
# Assert
assert found_user is not None
assert found_user.username == created_user.username
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_username(self, in_memory_repository):
"""Test that nonexistent username returns None."""
# Act
found_user = await in_memory_repository.find_user_by_username("nonexistent")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_email(self, in_memory_repository, sample_user_create):
"""Test finding user by email."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_email(str(sample_user_create.email))
# Assert
assert found_user is not None
assert found_user.email == created_user.email
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_email(self, in_memory_repository):
"""Test that nonexistent email returns None."""
# Act
found_user = await in_memory_repository.find_user_by_email("nonexistent@example.com")
# Assert
assert found_user is None
def test_i_can_find_user_by_username(user_repository, mock_database):
"""Test finding user by username."""
# Mock user document from database
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
class TestUserRepositoryUpdate:
"""Tests for user update functionality."""
result = user_repository.find_user_by_username("testuser")
@pytest.mark.asyncio
async def test_i_can_update_user(self, in_memory_repository, sample_user_create, sample_user_update):
"""Test successful user update."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
original_updated_at = created_user.updated_at
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), sample_user_update)
# Assert
assert updated_user is not None
assert updated_user.username == sample_user_update.username
assert updated_user.email == sample_user_update.email
assert updated_user.role == sample_user_update.role
assert updated_user.id == created_user.id
assert isinstance(result, UserInDB)
assert result.username == "testuser"
assert result.email == "test@example.com"
@pytest.mark.asyncio
async def test_i_cannot_update_user_with_invalid_id(self, in_memory_repository, sample_user_update):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_user("invalid_id", sample_user_update)
# Assert
assert result is None
mock_database.users.find_one.assert_called_once_with({"username": "testuser"})
@pytest.mark.asyncio
async def test_i_can_update_user_with_partial_data(self, in_memory_repository, sample_user_create):
"""Test updating user with partial data."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
partial_update = UserUpdate(username="newusername")
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), partial_update)
# Assert
assert updated_user is not None
assert updated_user.username == "newusername"
assert updated_user.email == created_user.email # Should remain unchanged
assert updated_user.role == created_user.role # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_user_with_empty_data(self, in_memory_repository, sample_user_create):
"""Test updating user with empty data returns current user."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
empty_update = UserUpdate()
# Act
result = await in_memory_repository.update_user(str(created_user.id), empty_update)
# Assert
assert result is not None
assert result.username == created_user.username
assert result.email == created_user.email
def test_i_cannot_find_nonexistent_user_by_username(user_repository, mock_database):
"""Test finding nonexistent user by username returns None."""
mock_database.users.find_one.return_value = None
class TestUserRepositoryDeletion:
"""Tests for user deletion functionality."""
result = user_repository.find_user_by_username("nonexistent")
@pytest.mark.asyncio
async def test_i_can_delete_user(self, in_memory_repository, sample_user_create):
"""Test successful user deletion."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
deletion_result = await in_memory_repository.delete_user(str(created_user.id))
# Assert
assert deletion_result is True
# Verify user is actually deleted
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
assert found_user is None
assert result is None
mock_database.users.find_one.assert_called_once_with({"username": "nonexistent"})
@pytest.mark.asyncio
async def test_i_cannot_delete_user_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_user("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_user(self, in_memory_repository):
"""Test that deleting nonexistent user returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_user(nonexistent_id)
# Assert
assert result is False
def test_i_can_find_user_by_id(user_repository, mock_database):
"""Test finding user by ID."""
user_id = ObjectId()
user_doc = {
"_id": user_id,
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
class TestUserRepositoryUtilities:
"""Tests for utility methods."""
result = user_repository.find_user_by_id(str(user_id))
@pytest.mark.asyncio
async def test_i_can_count_users(self, in_memory_repository, sample_user_create):
"""Test counting users."""
# Arrange
initial_count = await in_memory_repository.count_users()
await in_memory_repository.create_user(sample_user_create)
# Act
final_count = await in_memory_repository.count_users()
# Assert
assert final_count == initial_count + 1
assert isinstance(result, UserInDB)
assert result.id == user_id
assert result.username == "testuser"
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
@pytest.mark.asyncio
async def test_i_can_check_user_exists(self, in_memory_repository, sample_user_create):
"""Test checking if user exists."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
# Act
exists = await in_memory_repository.user_exists(sample_user_create.username)
not_exists = await in_memory_repository.user_exists("nonexistent")
# Assert
assert exists is True
assert not_exists is False
def test_i_cannot_find_user_with_invalid_id(user_repository, mock_database):
"""Test finding user with invalid ObjectId returns None."""
result = user_repository.find_user_by_id("invalid_id")
assert result is None
# find_one should not be called with invalid ID
mock_database.users.find_one.assert_not_called()
def test_i_cannot_find_nonexistent_user_by_id(user_repository, mock_database):
"""Test finding nonexistent user by ID returns None."""
user_id = ObjectId()
mock_database.users.find_one.return_value = None
result = user_repository.find_user_by_id(str(user_id))
assert result is None
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
def test_i_can_find_user_by_email(user_repository, mock_database):
"""Test finding user by email address."""
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = user_doc
result = user_repository.find_user_by_email("test@example.com")
assert isinstance(result, UserInDB)
assert result.email == "test@example.com"
mock_database.users.find_one.assert_called_once_with({"email": "test@example.com"})
def test_i_can_update_user(user_repository, mock_database, sample_user_update):
"""Test updating user information."""
user_id = ObjectId()
# Mock successful update
mock_update_result = Mock()
mock_update_result.matched_count = 1
mock_database.users.update_one.return_value = mock_update_result
# Mock find_one for returning updated user
updated_user_doc = {
"_id": user_id,
"username": "testuser",
"email": "updated@example.com",
"hashed_password": "hashed_password_123",
"role": "admin",
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
mock_database.users.find_one.return_value = updated_user_doc
result = user_repository.update_user(str(user_id), sample_user_update)
assert isinstance(result, UserInDB)
assert result.email == "updated@example.com"
assert result.role == UserRole.ADMIN
assert result.is_active is False
# Verify update_one was called with correct data
mock_database.users.update_one.assert_called_once()
call_args = mock_database.users.update_one.call_args
assert call_args[0][0] == {"_id": user_id} # Filter
update_data = call_args[0][1]["$set"] # Update data
assert update_data["email"] == "updated@example.com"
assert update_data["role"] == UserRole.ADMIN
assert update_data["is_active"] is False
assert "updated_at" in update_data
def test_i_cannot_update_nonexistent_user(user_repository, mock_database, sample_user_update):
"""Test updating nonexistent user returns None."""
user_id = ObjectId()
# Mock no match found
mock_update_result = Mock()
mock_update_result.matched_count = 0
mock_database.users.update_one.return_value = mock_update_result
result = user_repository.update_user(str(user_id), sample_user_update)
assert result is None
def test_i_cannot_update_user_with_invalid_id(user_repository, mock_database, sample_user_update):
"""Test updating user with invalid ID returns None."""
result = user_repository.update_user("invalid_id", sample_user_update)
assert result is None
# update_one should not be called with invalid ID
mock_database.users.update_one.assert_not_called()
def test_i_can_delete_user(user_repository, mock_database):
"""Test successful user deletion."""
user_id = ObjectId()
# Mock successful deletion
mock_delete_result = Mock()
mock_delete_result.deleted_count = 1
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is True
mock_database.users.delete_one.assert_called_once_with({"_id": user_id})
def test_i_cannot_delete_nonexistent_user(user_repository, mock_database):
"""Test deleting nonexistent user returns False."""
user_id = ObjectId()
# Mock no deletion occurred
mock_delete_result = Mock()
mock_delete_result.deleted_count = 0
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is False
def test_i_cannot_delete_user_with_invalid_id(user_repository, mock_database):
"""Test deleting user with invalid ID returns False."""
result = user_repository.delete_user("invalid_id")
assert result is False
# delete_one should not be called with invalid ID
mock_database.users.delete_one.assert_not_called()
def test_i_can_list_users(user_repository, mock_database):
"""Test listing users with pagination."""
# Mock cursor with user documents
user_docs = [
{
"_id": ObjectId(),
"username": "user1",
"email": "user1@example.com",
"hashed_password": "hash1",
"role": "user",
"is_active": True,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
},
{
"_id": ObjectId(),
"username": "user2",
"email": "user2@example.com",
"hashed_password": "hash2",
"role": "admin",
"is_active": False,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
]
mock_cursor = Mock()
mock_cursor.__iter__ = Mock(return_value=iter(user_docs))  # a plain Mock has no preconfigured __iter__; assign it explicitly
mock_cursor.skip.return_value = mock_cursor
mock_cursor.limit.return_value = mock_cursor
mock_database.users.find.return_value = mock_cursor
result = user_repository.list_users(skip=10, limit=50)
assert len(result) == 2
assert all(isinstance(user, UserInDB) for user in result)
assert result[0].username == "user1"
assert result[1].username == "user2"
mock_database.users.find.assert_called_once()
mock_cursor.skip.assert_called_once_with(10)
mock_cursor.limit.assert_called_once_with(50)
def test_i_can_count_users(user_repository, mock_database):
"""Test counting total users."""
mock_database.users.count_documents.return_value = 42
result = user_repository.count_users()
assert result == 42
mock_database.users.count_documents.assert_called_once_with({})
def test_i_can_check_user_exists(user_repository, mock_database):
"""Test checking if user exists by username."""
mock_database.users.count_documents.return_value = 1
result = user_repository.user_exists("testuser")
assert result is True
mock_database.users.count_documents.assert_called_once_with({"username": "testuser"})
def test_i_can_check_user_does_not_exist(user_repository, mock_database):
"""Test checking if user does not exist by username."""
mock_database.users.count_documents.return_value = 0
result = user_repository.user_exists("nonexistent")
assert result is False
mock_database.users.count_documents.assert_called_once_with({"username": "nonexistent"})
def test_i_can_create_indexes_on_initialization(mock_database):
"""Test that indexes are created when repository is initialized."""
# Mock create_index to not raise exception
mock_database.users.create_index.return_value = None
repository = UserRepository(mock_database)
mock_database.users.create_index.assert_called_once_with("username", unique=True)
def test_i_can_handle_index_creation_error(mock_database):
"""Test that index creation errors are handled gracefully."""
# Mock create_index to raise exception (index already exists)
mock_database.users.create_index.side_effect = Exception("Index already exists")
# Should not raise exception
repository = UserRepository(mock_database)
assert repository is not None
mock_database.users.create_index.assert_called_once_with("username", unique=True)
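The pagination test above relies on a mocked cursor that is both chainable (`skip`, `limit`) and iterable. The following is a minimal, standalone sketch of that pattern using only `unittest.mock`. The helper names `make_cursor_mock` and `list_usernames` are hypothetical and are not part of this repository; `list_usernames` only stands in for a repository method such as `list_users`, whose real implementation is not shown in this diff.

```python
from unittest.mock import MagicMock


def make_cursor_mock(documents):
    """Build a stand-in for a chainable, iterable database cursor (hypothetical helper)."""
    cursor = MagicMock()
    # skip() and limit() return the cursor itself so the calls can be chained.
    cursor.skip.return_value = cursor
    cursor.limit.return_value = cursor
    # MagicMock preconfigures __iter__; giving it a list (not an iterator)
    # lets the cursor be iterated more than once.
    cursor.__iter__.return_value = documents
    return cursor


def list_usernames(collection, skip=0, limit=100):
    """Hypothetical stand-in for a repository method that pages through a collection."""
    cursor = collection.find({}).skip(skip).limit(limit)
    return [doc["username"] for doc in cursor]


# Wire the mocked cursor into a mocked collection, as the test above does.
collection = MagicMock()
cursor = make_cursor_mock([{"username": "user1"}, {"username": "user2"}])
collection.find.return_value = cursor

usernames = list_usernames(collection, skip=10, limit=50)

# The same assertions style as in the test file.
cursor.skip.assert_called_once_with(10)
cursor.limit.assert_called_once_with(50)
```

`MagicMock` is used here rather than `Mock` because only `MagicMock` comes with magic methods such as `__iter__` already configured; with a plain `Mock`, the method has to be assigned explicitly before it can be used.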