Compare commits

2 Commits

Author SHA1 Message Date
34f7854b3c Working on tasks 2025-09-21 22:51:34 +02:00
98c43feadf Added JobRepository and JobServices 2025-09-21 19:11:57 +02:00
21 changed files with 2233 additions and 170 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
volumes
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]

View File

@@ -95,8 +95,8 @@ MyDocManager/
│ │ ├── requirements.txt
│ │ ├── app/
│ │ │ ├── main.py
│ │ │ ├── file_watcher.py
│ │ │ ├── celery_app.py
│ │ │ ├── file_watcher.py # FileWatcher class with observer thread
│ │ │ ├── celery_app.py # Celery Configuration
│ │ │ ├── config/
│ │ │ │ ├── __init__.py
│ │ │ │ └── settings.py # JWT, MongoDB config
@@ -342,6 +342,71 @@ class ProcessingJob(BaseModel):
4. TODO : Watchdog file monitoring implementation
5. TODO : FastAPI integration and startup coordination
## Job Management Layer
### Repository Pattern Implementation
The job management system follows the repository pattern for clean separation between data access and business logic.
#### JobRepository
Handles direct MongoDB operations for processing jobs:
**CRUD Operations:**
- `create_job()` - Create new processing job with automatic `created_at` timestamp
- `get_job_by_id()` - Retrieve job by ObjectId
- `update_job_status()` - Update job status with automatic timestamp management
- `delete_job()` - Remove job from database
- `get_jobs_by_file_id()` - Get all jobs for specific file
- `get_jobs_by_status()` - Get jobs filtered by processing status
**Automatic Timestamp Management:**
- `created_at`: Set automatically during job creation
- `started_at`: Set automatically when status changes to PROCESSING
- `completed_at`: Set automatically when status changes to COMPLETED or FAILED
#### JobService
Provides business logic layer with strict status transition validation:
**Status Transition Methods:**
- `mark_job_as_started()` - PENDING → PROCESSING
- `mark_job_as_completed()` - PROCESSING → COMPLETED
- `mark_job_as_failed()` - PROCESSING → FAILED
**Validation Rules:**
- Strict status transitions (invalid transitions raise exceptions)
- Job existence verification before any operation
- Automatic timestamp management through repository layer
#### Custom Exceptions
**JobNotFoundError**: Raised when job ID doesn't exist
**InvalidStatusTransitionError**: Raised for invalid status transitions
**JobRepositoryError**: Raised for MongoDB operation failures
#### Valid Status Transitions
```
PENDING → PROCESSING (via mark_job_as_started)
PROCESSING → COMPLETED (via mark_job_as_completed)
PROCESSING → FAILED (via mark_job_as_failed)
```
All other transitions are forbidden and will raise `InvalidStatusTransitionError`.
### File Structure
```
src/file-processor/app/
├── database/repositories/
│ └── job_repository.py # JobRepository class
├── services/
│ └── job_service.py # JobService class
└── exceptions/
└── job_exceptions.py # Custom exceptions
```
### Processing Pipeline Features
- **Duplicate Detection**: SHA256 hashing prevents reprocessing same files

View File

@@ -34,9 +34,10 @@ services:
environment:
- REDIS_URL=redis://redis:6379/0
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
- PYTHONPATH=/app:/tasks # Added /tasks to Python path
volumes:
- ./src/file-processor:/app
- ./src/worker/tasks:/app/tasks # <- Added: shared access to worker tasks
- ./volumes/watched_files:/watched_files
- ./volumes/objects:/objects
depends_on:
@@ -57,14 +58,15 @@ services:
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
volumes:
- ./src/worker/tasks:/app
- ./src/worker:/app
- ./volumes/watched_files:/watched_files
depends_on:
- redis
- mongodb
networks:
- mydocmanager-network
command: celery -A main worker --loglevel=info
command: celery -A tasks.main worker --loglevel=info
#command: celery -A main --loglevel=info # pour la production
volumes:
mongodb-data:

View File

@@ -45,6 +45,7 @@ tzdata==2025.2
uvicorn==0.35.0
uvloop==0.21.0
vine==5.1.0
watchdog==6.0.0
watchfiles==1.1.0
wcwidth==0.2.13
websockets==15.0.1

View File

@@ -3,6 +3,12 @@ FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install libmagic
RUN apt-get update && apt-get install -y --no-install-recommends \
libmagic1 \
file \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

View File

@@ -6,7 +6,6 @@ using simple os.getenv() approach without external validation libraries.
"""
import os
from typing import Optional
def get_mongodb_url() -> str:
@@ -51,15 +50,6 @@ def get_jwt_secret_key() -> str:
raise ValueError("JWT_SECRET environment variable must be set in production")
return secret
def get_objects_folder() -> str:
"""
Get Vault path from environment variables.
Returns:
str: Vault path
"""
return os.getenv("OBJECTS_FOLDER", "/objects")
def get_jwt_algorithm() -> str:
"""
@@ -91,4 +81,19 @@ def is_development_environment() -> bool:
Returns:
bool: True if development environment
"""
return os.getenv("ENVIRONMENT", "development").lower() == "development"
return os.getenv("ENVIRONMENT", "development").lower() == "development"
def get_objects_folder() -> str:
"""
Get Vault path from environment variables.
Returns:
str: Vault path
"""
return os.getenv("OBJECTS_FOLDER", "/objects")
def watch_directory() -> str:
"""Directory to monitor for new files"""
return os.getenv("WATCH_DIRECTORY", "/watched_files")

View File

@@ -0,0 +1,229 @@
"""
Repository for managing processing jobs in MongoDB.
This module provides data access layer for ProcessingJob operations
with automatic timestamp management and error handling.
"""
from datetime import datetime
from typing import List, Optional
from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
from pymongo.errors import PyMongoError
from app.exceptions.job_exceptions import JobRepositoryError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId
class JobRepository:
"""
Repository for processing job data access operations.
Provides CRUD operations for ProcessingJob documents with automatic
timestamp management and proper error handling.
"""
def __init__(self, database: AsyncIOMotorDatabase):
"""Initialize repository with MongoDB collection reference."""
self.db = database
self.collection: AsyncIOMotorCollection = self.db.processing_jobs
async def _ensure_indexes(self):
"""
Ensure required database indexes exist.
Creates unique index on username field to prevent duplicates.
"""
try:
await self.collection.create_index("document_id", unique=True)
except PyMongoError:
# Index might already exist, ignore error
pass
async def initialize(self):
"""
Initialize repository by ensuring required indexes exist.
Should be called after repository instantiation to setup database indexes.
"""
await self._ensure_indexes()
return self
async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
"""
Create a new processing job.
Args:
file_id: Reference to the file document
task_id: Optional Celery task UUID
Returns:
The created ProcessingJob
Raises:
JobRepositoryError: If database operation fails
"""
try:
job_data = {
"document_id": document_id,
"status": ProcessingStatus.PENDING,
"task_id": task_id,
"created_at": datetime.now(),
"started_at": None,
"completed_at": None,
"error_message": None
}
result = await self.collection.insert_one(job_data)
job_data["_id"] = result.inserted_id
return ProcessingJob(**job_data)
except PyMongoError as e:
raise JobRepositoryError("create_job", e)
async def find_job_by_id(self, job_id: PyObjectId) -> Optional[ProcessingJob]:
"""
Retrieve a job by its ID.
Args:
job_id: The job ObjectId
Returns:
The ProcessingJob document
Raises:
JobNotFoundError: If job doesn't exist
JobRepositoryError: If database operation fails
"""
try:
job_data = await self.collection.find_one({"_id": job_id})
if job_data:
return ProcessingJob(**job_data)
return None
except PyMongoError as e:
raise JobRepositoryError("get_job_by_id", e)
async def update_job_status(
self,
job_id: PyObjectId,
status: ProcessingStatus,
error_message: Optional[str] = None
) -> Optional[ProcessingJob]:
"""
Update job status with automatic timestamp management.
Args:
job_id: The job ObjectId
status: New processing status
error_message: Optional error message for failed jobs
Returns:
The updated ProcessingJob
Raises:
JobNotFoundError: If job doesn't exist
JobRepositoryError: If database operation fails
"""
try:
# Prepare update data
update_data = {"status": status}
# Set appropriate timestamp based on status
current_time = datetime.now()
if status == ProcessingStatus.PROCESSING:
update_data["started_at"] = current_time
elif status in (ProcessingStatus.COMPLETED, ProcessingStatus.FAILED):
update_data["completed_at"] = current_time
# Add error message if provided
if error_message is not None:
update_data["error_message"] = error_message
result = await self.collection.find_one_and_update(
{"_id": job_id},
{"$set": update_data},
return_document=True
)
if result:
return ProcessingJob(**result)
return None
except PyMongoError as e:
raise JobRepositoryError("update_job_status", e)
async def delete_job(self, job_id: PyObjectId) -> bool:
"""
Delete a job from the database.
Args:
job_id: The job ObjectId
Returns:
True if job was deleted, False if not found
Raises:
JobRepositoryError: If database operation fails
"""
try:
result = await self.collection.delete_one({"_id": job_id})
return result.deleted_count > 0
except PyMongoError as e:
raise JobRepositoryError("delete_job", e)
async def find_jobs_by_document_id(self, document_id: PyObjectId) -> List[ProcessingJob]:
"""
Retrieve all jobs for a specific file.
Args:
document_id: The file ObjectId
Returns:
List of ProcessingJob documents
Raises:
JobRepositoryError: If database operation fails
"""
try:
cursor = self.collection.find({"document_id": document_id})
jobs = []
async for job_data in cursor:
jobs.append(ProcessingJob(**job_data))
return jobs
except PyMongoError as e:
raise JobRepositoryError("get_jobs_by_file_id", e)
async def get_jobs_by_status(self, status: ProcessingStatus) -> List[ProcessingJob]:
"""
Retrieve all jobs with a specific status.
Args:
status: The processing status to filter by
Returns:
List of ProcessingJob documents
Raises:
JobRepositoryError: If database operation fails
"""
try:
cursor = self.collection.find({"status": status})
jobs = []
async for job_data in cursor:
jobs.append(ProcessingJob(**job_data))
return jobs
except PyMongoError as e:
raise JobRepositoryError("get_jobs_by_status", e)

View File

@@ -0,0 +1,38 @@
"""
Custom exceptions for job management operations.
This module defines specific exceptions for job processing lifecycle
and repository operations to provide clear error handling.
"""
from app.models.job import ProcessingStatus
class InvalidStatusTransitionError(Exception):
"""
Raised when an invalid status transition is attempted.
This exception indicates that an attempt was made to change a job's
status to an invalid target status given the current status.
"""
def __init__(self, current_status: ProcessingStatus, target_status: ProcessingStatus):
self.current_status = current_status
self.target_status = target_status
super().__init__(
f"Invalid status transition from '{current_status}' to '{target_status}'"
)
class JobRepositoryError(Exception):
"""
Raised when a MongoDB operation fails in the job repository.
This exception wraps database-related errors that occur during
job repository operations.
"""
def __init__(self, operation: str, original_error: Exception):
self.operation = operation
self.original_error = original_error
super().__init__(f"Repository operation '{operation}' failed: {str(original_error)}")

View File

@@ -0,0 +1,241 @@
"""
File watcher implementation with Watchdog observer and ProcessingJob management.
This module provides real-time file monitoring for document processing.
When a file is created in the watched directory, it:
1. Creates a document record via DocumentService
2. Dispatches a Celery task for processing
3. Creates a ProcessingJob to track the task lifecycle
"""
import logging
import threading
from pathlib import Path
from typing import Optional
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
from watchdog.observers import Observer
from app.services.document_service import DocumentService
from app.services.job_service import JobService
logger = logging.getLogger(__name__)
class DocumentFileEventHandler(FileSystemEventHandler):
"""
Event handler for document file creation events.
Processes newly created files by creating document records,
dispatching Celery tasks, and managing processing jobs.
"""
SUPPORTED_EXTENSIONS = {'.txt', '.pdf', '.docx'}
def __init__(self, document_service: DocumentService, job_service: JobService):
"""
Initialize the event handler.
Args:
document_service: Service for document management
job_service: Service for processing job management
"""
super().__init__()
self.document_service = document_service
self.job_service = job_service
def on_created(self, event: FileCreatedEvent) -> None:
"""
Handle file creation events.
Args:
event: File system event containing file path information
"""
if event.is_directory:
return
filepath = event.src_path
file_extension = Path(filepath).suffix.lower()
if file_extension not in self.SUPPORTED_EXTENSIONS:
logger.info(f"Ignoring unsupported file type: {filepath}")
return
logger.info(f"Processing new file: {filepath}")
try:
from tasks.document_processing import process_document
celery_result = process_document.delay(filepath)
celery_task_id = celery_result.id
logger.info(f"Dispatched Celery task with ID: {celery_task_id}")
except Exception as e:
logger.error(f"Failed to process file {filepath}: {str(e)}")
# Note: We don't re-raise the exception to keep the watcher running
class FileWatcher:
"""
File system watcher for automatic document processing.
Monitors a directory for new files and triggers processing pipeline
using a dedicated observer thread.
"""
def __init__(
self,
watch_directory: str,
document_service: DocumentService,
job_service: JobService,
recursive: bool = True
):
"""
Initialize the file watcher.
Args:
watch_directory: Directory path to monitor
document_service: Service for document management
job_service: Service for processing job management
recursive: Whether to watch subdirectories recursively
"""
self.watch_directory = Path(watch_directory)
self.recursive = recursive
self.observer: Optional[Observer] = None
self._observer_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# Validate watch directory
if not self.watch_directory.exists():
raise ValueError(f"Watch directory does not exist: {watch_directory}")
if not self.watch_directory.is_dir():
raise ValueError(f"Watch path is not a directory: {watch_directory}")
# Create event handler
self.event_handler = DocumentFileEventHandler(
document_service=document_service,
job_service=job_service
)
logger.info(f"FileWatcher initialized for directory: {self.watch_directory}")
def start(self) -> None:
"""
Start the file watcher in a separate thread.
Raises:
RuntimeError: If the watcher is already running
"""
if self.is_running():
raise RuntimeError("FileWatcher is already running")
self.observer = Observer()
self.observer.schedule(
self.event_handler,
str(self.watch_directory),
recursive=self.recursive
)
# Start observer in separate thread
self._observer_thread = threading.Thread(
target=self._run_observer,
name="FileWatcher-Observer"
)
self._stop_event.clear()
self._observer_thread.start()
logger.info("FileWatcher started successfully")
def stop(self, timeout: float = 5.0) -> None:
"""
Stop the file watcher gracefully.
Args:
timeout: Maximum time to wait for graceful shutdown
"""
if not self.is_running():
logger.warning("FileWatcher is not running")
return
logger.info("Stopping FileWatcher...")
# Signal stop and wait for observer thread
self._stop_event.set()
if self.observer:
self.observer.stop()
if self._observer_thread and self._observer_thread.is_alive():
self._observer_thread.join(timeout=timeout)
if self._observer_thread.is_alive():
logger.warning("FileWatcher thread did not stop gracefully within timeout")
else:
logger.info("FileWatcher stopped gracefully")
# Clean up
self.observer = None
self._observer_thread = None
def is_running(self) -> bool:
"""
Check if the file watcher is currently running.
Returns:
True if the watcher is running, False otherwise
"""
return (
self.observer is not None
and self._observer_thread is not None
and self._observer_thread.is_alive()
)
def _run_observer(self) -> None:
"""
Internal method to run the observer in a separate thread.
This method should not be called directly.
"""
if not self.observer:
logger.error("Observer not initialized")
return
try:
self.observer.start()
logger.info("Observer thread started")
# Keep the observer running until stop is requested
while not self._stop_event.is_set():
self._stop_event.wait(timeout=1.0)
logger.info("Observer thread stopping...")
except Exception as e:
logger.error(f"Observer thread error: {str(e)}")
finally:
if self.observer:
self.observer.join()
logger.info("Observer thread stopped")
def create_file_watcher(
watch_directory: str,
document_service: DocumentService,
job_service: JobService
) -> FileWatcher:
"""
Factory function to create a FileWatcher instance.
Args:
watch_directory: Directory path to monitor
document_service: Service for document management
job_service: Service for processing job management
Returns:
Configured FileWatcher instance
"""
return FileWatcher(
watch_directory=watch_directory,
document_service=document_service,
job_service=job_service
)

View File

@@ -1,205 +1,174 @@
"""
FastAPI application for MyDocManager file processor service.
FastAPI application with integrated FileWatcher for document processing.
This service provides API endpoints for health checks and task dispatching.
This module provides the main FastAPI application with:
- JWT authentication
- User management APIs
- Real-time file monitoring via FileWatcher
- Document processing via Celery tasks
"""
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import redis
from celery import Celery
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.database.connection import test_database_connection, get_database
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate
from app.config import settings
from app.database.connection import get_database
from app.file_watcher import create_file_watcher, FileWatcher
from app.services.document_service import DocumentService
from app.services.init_service import InitializationService
from app.services.job_service import JobService
from app.services.user_service import UserService
# from api.routes.auth import router as auth_router
# from api.routes.users import router as users_router
# from api.routes.documents import router as documents_router
# from api.routes.jobs import router as jobs_router
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global file watcher instance
file_watcher: FileWatcher = None
@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan manager for startup and shutdown tasks.
Handles initialization tasks that need to run when the application starts,
including admin user creation and other setup procedures.
FastAPI lifespan context manager.
Handles application startup and shutdown events including:
- Database connection
- Default admin user creation
- FileWatcher startup/shutdown
"""
# Startup tasks
global file_watcher
# Startup
logger.info("Starting MyDocManager application...")
try:
# Initialize database connection
database = get_database()
logger.info("Database connection established")
# Initialize repositories and services
user_service = await UserService(database).initialize()
document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
job_service = JobService(database=database)
user_service = UserService(database=database)
logger.info("Service created")
# Create default admin user
init_service = InitializationService(user_service)
await init_service.initialize_application()
logger.info("Default admin user initialization completed")
# Run initialization tasks
initialization_result = await init_service.initialize_application()
# Create and start file watcher
file_watcher = create_file_watcher(
watch_directory=settings.watch_directory(),
document_service=document_service,
job_service=job_service
)
file_watcher.start()
logger.info(f"FileWatcher started for directory: {settings.watch_directory()}")
if initialization_result["initialization_success"]:
logger.info("Application startup completed successfully")
if initialization_result["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in initialization_result["errors"]:
logger.error(f" - {error}")
logger.info("Application startup completed successfully")
yield
except Exception as e:
raise e
logger.error(f"Critical error during application startup: {str(e)}")
# You might want to decide if the app should continue or exit here
# For now, we log the error but continue
logger.error(f"Application startup failed: {str(e)}")
raise
yield # Application is running
# Shutdown tasks (if needed)
logger.info("Shutting down MyDocManager application...")
finally:
# Shutdown
logger.info("Shutting down MyDocManager application...")
if file_watcher and file_watcher.is_running():
file_watcher.stop()
logger.info("FileWatcher stopped")
logger.info("Application shutdown completed")
# Initialize FastAPI app
# Create FastAPI application
app = FastAPI(
title="MyDocManager File Processor",
description="File processing and task dispatch service",
version="1.0.0",
title="MyDocManager",
description="Real-time document processing application with authentication",
version="0.1.0",
lifespan=lifespan
)
# Environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
# Initialize Redis client
try:
redis_client = redis.from_url(REDIS_URL)
except Exception as e:
redis_client = None
print(f"Warning: Could not connect to Redis: {e}")
# Initialize Celery
celery_app = Celery(
"file_processor",
broker=REDIS_URL,
backend=REDIS_URL
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # React frontend
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pydantic models
class TestTaskRequest(BaseModel):
"""Request model for test task."""
message: str
def get_user_service() -> UserService:
"""
Dependency to get user service instance.
This should be properly implemented with database connection management
in your actual application.
"""
database = get_database()
user_repository = UserRepository(database)
return UserService(user_repository)
# Your API routes would use the service like this:
@app.post("/api/users")
async def create_user(
user_data: UserCreate,
user_service: UserService = Depends(get_user_service)
):
return user_service.create_user(user_data)
# Include routers
# app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
# app.include_router(users_router, prefix="/users", tags=["User Management"])
# app.include_router(documents_router, prefix="/documents", tags=["Documents"])
# app.include_router(jobs_router, prefix="/jobs", tags=["Processing Jobs"])
@app.get("/health")
async def health_check():
"""
Health check endpoint.
Returns:
dict: Service health status with dependencies
Dictionary containing application health status
"""
health_status = {
return {
"status": "healthy",
"service": "file-processor",
"dependencies": {
"redis": "unknown",
"mongodb": "unknown"
},
"service": "MyDocManager",
"version": "1.0.0",
"file_watcher_running": file_watcher.is_running() if file_watcher else False
}
# Check Redis connection
if redis_client:
try:
redis_client.ping()
health_status["dependencies"]["redis"] = "connected"
except Exception:
health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded"
# check MongoDB connection
if test_database_connection():
health_status["dependencies"]["mongodb"] = "connected"
else:
health_status["dependencies"]["mongodb"] = "disconnected"
return health_status
@app.post("/test-task")
async def dispatch_test_task(request: TestTaskRequest):
"""
Dispatch a test task to Celery worker.
Args:
request: Test task request containing message
Returns:
dict: Task dispatch information
Raises:
HTTPException: If task dispatch fails
"""
try:
# Send task to worker
task = celery_app.send_task(
"main.test_task",
args=[request.message]
)
return {
"status": "dispatched",
"task_id": task.id,
"message": f"Test task dispatched with message: {request.message}"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to dispatch task: {str(e)}"
)
@app.get("/")
async def root():
"""
Root endpoint.
Root endpoint with basic application information.
Returns:
dict: Basic service information
Dictionary containing welcome message and available endpoints
"""
return {
"service": "MyDocManager File Processor",
"version": "1.0.0",
"status": "running"
"message": "Welcome to MyDocManager",
"description": "Real-time document processing application",
"docs": "/docs",
"health": "/health"
}
@app.get("/watcher/status")
async def watcher_status():
"""
Get file watcher status.
Returns:
Dictionary containing file watcher status information
"""
if not file_watcher:
return {
"status": "not_initialized",
"running": False
}
return {
"status": "initialized",
"running": file_watcher.is_running(),
"watch_directory": str(file_watcher.watch_directory),
"recursive": file_watcher.recursive
}

View File

@@ -25,7 +25,7 @@ class ProcessingJob(BaseModel):
"""
id: Optional[PyObjectId] = Field(default=None, alias="_id")
file_id: PyObjectId = Field(..., description="Reference to file document")
document_id: PyObjectId = Field(..., description="Reference to file document")
status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status")
task_id: Optional[str] = Field(default=None, description="Celery task UUID")
created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created")

View File

@@ -95,6 +95,28 @@ class DocumentService:
"""
return magic.from_buffer(file_bytes, mime=True)
@staticmethod
def _read_file_bytes(file_path: str | Path) -> bytes:
"""
Read file content as bytes asynchronously.
Args:
file_path (str | Path): Path of the file to read
Returns:
bytes: Content of the file
Raises:
FileNotFoundError: If the file does not exist
OSError: If any I/O error occurs
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
return path.read_bytes()
def _get_document_path(self, file_hash):
"""
@@ -117,7 +139,7 @@ class DocumentService:
async def create_document(
self,
file_path: str,
file_bytes: bytes,
file_bytes: bytes | None = None,
encoding: str = "utf-8"
) -> FileDocument:
"""
@@ -140,6 +162,7 @@ class DocumentService:
PyMongoError: If database operation fails
"""
# Calculate automatic attributes
file_bytes = file_bytes or self._read_file_bytes(file_path)
file_hash = self._calculate_file_hash(file_bytes)
file_type = self._detect_file_type(file_path)
mime_type = self._detect_mime_type(file_bytes)

View File

@@ -8,8 +8,8 @@ creating default admin user if none exists.
import logging
from typing import Optional
from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
from app.models.auth import UserRole
from app.models.user import UserInDB, UserCreateNoValidation
from app.services.user_service import UserService
logger = logging.getLogger(__name__)
@@ -31,7 +31,6 @@ class InitializationService:
user_service (UserService): Service for user operations
"""
self.user_service = user_service
async def ensure_admin_user_exists(self) -> Optional[UserInDB]:
"""
@@ -131,4 +130,23 @@ class InitializationService:
logger.error(error_msg)
initialization_summary["errors"].append(error_msg)
return initialization_summary
self.log_initialization_result(initialization_summary)
return initialization_summary
@staticmethod
def log_initialization_result(summary: dict) -> None:
"""
Log the result of the initialization process.
Args:
summary (dict): Summary of initialization tasks performed
"""
if summary["initialization_success"]:
logger.info("Application startup completed successfully")
if summary["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in summary["errors"]:
logger.error(f" - {error}")

View File

@@ -0,0 +1,182 @@
"""
Service layer for job processing business logic.
This module provides high-level operations for managing processing jobs
with strict status transition validation and business rules enforcement.
"""
from typing import Optional
from app.database.repositories.job_repository import JobRepository
from app.exceptions.job_exceptions import InvalidStatusTransitionError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId
class JobService:
"""
Service for processing job business logic operations.
Provides high-level job management with strict status transition
validation and business rule enforcement.
"""
def __init__(self, database):
"""
Initialize service with job repository.
Args:
repository: Optional JobRepository instance (creates default if None)
"""
self.db = database
self.repository = JobRepository(database)
async def initialize(self):
await self.repository.initialize()
return self
async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
"""
Create a new processing job.
Args:
file_id: Reference to the file document
task_id: Optional Celery task UUID
Returns:
The created ProcessingJob
Raises:
JobRepositoryError: If database operation fails
"""
return await self.repository.create_job(file_id, task_id)
async def get_job_by_id(self, job_id: PyObjectId) -> ProcessingJob:
"""
Retrieve a job by its ID.
Args:
job_id: The job ObjectId
Returns:
The ProcessingJob document
Raises:
JobNotFoundError: If job doesn't exist
JobRepositoryError: If database operation fails
"""
return await self.repository.find_job_by_id(job_id)
async def mark_job_as_started(self, job_id: PyObjectId) -> ProcessingJob:
"""
Mark a job as started (PENDING → PROCESSING).
Args:
job_id: The job ObjectId
Returns:
The updated ProcessingJob
Raises:
JobNotFoundError: If job doesn't exist
InvalidStatusTransitionError: If job is not in PENDING status
JobRepositoryError: If database operation fails
"""
# Get current job to validate transition
current_job = await self.repository.find_job_by_id(job_id)
# Validate status transition
if current_job.status != ProcessingStatus.PENDING:
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.PROCESSING)
# Update status
return await self.repository.update_job_status(job_id, ProcessingStatus.PROCESSING)
async def mark_job_as_completed(self, job_id: PyObjectId) -> ProcessingJob:
"""
Mark a job as completed (PROCESSING → COMPLETED).
Args:
job_id: The job ObjectId
Returns:
The updated ProcessingJob
Raises:
JobNotFoundError: If job doesn't exist
InvalidStatusTransitionError: If job is not in PROCESSING status
JobRepositoryError: If database operation fails
"""
# Get current job to validate transition
current_job = await self.repository.find_job_by_id(job_id)
# Validate status transition
if current_job.status != ProcessingStatus.PROCESSING:
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.COMPLETED)
# Update status
return await self.repository.update_job_status(job_id, ProcessingStatus.COMPLETED)
async def mark_job_as_failed(
self,
job_id: PyObjectId,
error_message: Optional[str] = None
) -> ProcessingJob:
"""
Mark a job as failed (PROCESSING → FAILED).
Args:
job_id: The job ObjectId
error_message: Optional error description
Returns:
The updated ProcessingJob
Raises:
JobNotFoundError: If job doesn't exist
InvalidStatusTransitionError: If job is not in PROCESSING status
JobRepositoryError: If database operation fails
"""
# Get current job to validate transition
current_job = await self.repository.find_job_by_id(job_id)
# Validate status transition
if current_job.status != ProcessingStatus.PROCESSING:
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.FAILED)
# Update status with error message
return await self.repository.update_job_status(
job_id,
ProcessingStatus.FAILED,
error_message
)
async def delete_job(self, job_id: PyObjectId) -> bool:
"""
Delete a job from the database.
Args:
job_id: The job ObjectId
Returns:
True if job was deleted, False if not found
Raises:
JobRepositoryError: If database operation fails
"""
return await self.repository.delete_job(job_id)
async def get_jobs_by_status(self, status: ProcessingStatus) -> list[ProcessingJob]:
"""
Retrieve all jobs with a specific status.
Args:
status: The processing status to filter by
Returns:
List of ProcessingJob documents
Raises:
JobRepositoryError: If database operation fails
"""
return await self.repository.get_jobs_by_status(status)

View File

@@ -8,4 +8,5 @@ pymongo==4.15.0
pydantic==2.11.9
redis==6.4.0
uvicorn==0.35.0
python-magic==0.4.27
python-magic==0.4.27
watchdog==6.0.0

View File

@@ -8,7 +8,7 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY tasks/ .
COPY . .
# Command will be overridden by docker-compose
CMD ["celery", "-A", "main", "worker", "--loglevel=info"]

View File

@@ -0,0 +1,179 @@
"""
Celery tasks for document processing with ProcessingJob status management.
This module contains Celery tasks that handle document content extraction
and update processing job statuses throughout the task lifecycle.
"""
import logging
from typing import Any, Dict
from tasks.main import app as celery_app
logger = logging.getLogger(__name__)
# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]:
# """
# Process a document file and extract its content.
#
# This task:
# 1. Updates the processing job status to PROCESSING
# 2. Performs document content extraction
# 3. Updates job status to COMPLETED or FAILED based on result
#
# Args:
# self : Celery task instance
# job_service : Instance of JobService
# document_service : Instance of DocumentService
# filepath: Full path to the document file to process
#
# Returns:
# Dictionary containing processing results
#
# Raises:
# Exception: Any processing error (will trigger retry)
# """
# task_id = self.request.id
# logger.info(f"Starting document processing task {task_id} for file: {filepath}")
#
# try:
# # Step 1: Mark job as started
# await job_service.mark_job_as_started(task_id=task_id)
# logger.info(f"Job {task_id} marked as PROCESSING")
#
# # Step 2: Process the document (extract content, OCR, etc.)
# document = await self.document_service.create_document(filepath)
# logger.info(f"Created document record with ID: {document.id}")
#
# result = document_service.extract_document_content(filepath)
# logger.info(f"Document content extracted successfully for task {task_id}")
#
# # Step 3: Mark job as completed
# await job_service.mark_job_as_completed(task_id=task_id)
# logger.info(f"Job {task_id} marked as COMPLETED")
#
# return {
# "task_id": task_id,
# "filepath": filepath,
# "status": "completed",
# "content_length": len(result.get("content", "")),
# "extraction_method": result.get("extraction_method"),
# "processing_time": result.get("processing_time")
# }
#
# except Exception as e:
# error_message = f"Document processing failed: {str(e)}"
# logger.error(f"Task {task_id} failed: {error_message}")
#
# try:
# # Mark job as failed
# job_service.mark_job_as_failed(task_id=task_id, error_message=error_message)
# logger.info(f"Job {task_id} marked as FAILED")
# except Exception as job_error:
# logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
#
# # Re-raise the exception to trigger Celery retry mechanism
# raise
@celery_app.task(name="tasks.document_processing.process_document",
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60})
def process_document(self, filepath: str) -> Dict[str, Any]:
"""
Process a document file and extract its content.
This task:
1. Updates the processing job status to PROCESSING
2. Performs document content extraction
3. Updates job status to COMPLETED or FAILED based on result
Args:
self : Celery task instance
job_service : Instance of JobService
document_service : Instance of DocumentService
filepath: Full path to the document file to process
Returns:
Dictionary containing processing results
Raises:
Exception: Any processing error (will trigger retry)
"""
task_id = self.request.id
logger.info(f"Starting document processing task {task_id} for file: {filepath}")
@celery_app.task(bind=True)
def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]:
"""
Clean up old processing jobs from the database.
This maintenance task removes completed and failed jobs older than
the specified number of days.
Args:
days_old: Number of days after which to clean up jobs
Returns:
Dictionary containing cleanup statistics
"""
task_id = self.request.id
logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days")
job_service = JobService()
try:
# Perform cleanup
cleanup_result = job_service.cleanup_old_jobs(days_old=days_old)
logger.info(
f"Cleanup task {task_id} completed: "
f"deleted {cleanup_result['deleted_count']} jobs"
)
return {
"task_id": task_id,
"status": "completed",
"deleted_count": cleanup_result["deleted_count"],
"days_old": days_old
}
except Exception as e:
error_message = f"Cleanup task failed: {str(e)}"
logger.error(f"Cleanup task {task_id} failed: {error_message}")
raise
@celery_app.task(bind=True)
def get_processing_statistics(self) -> Dict[str, Any]:
"""
Generate processing statistics for monitoring.
Returns:
Dictionary containing current processing statistics
"""
task_id = self.request.id
logger.info(f"Generating processing statistics for task {task_id}")
job_service = JobService()
try:
stats = job_service.get_processing_statistics()
logger.info(f"Statistics generated for task {task_id}")
return {
"task_id": task_id,
"status": "completed",
"statistics": stats,
"timestamp": stats.get("generated_at")
}
except Exception as e:
error_message = f"Statistics generation failed: {str(e)}"
logger.error(f"Statistics task {task_id} failed: {error_message}")
raise

View File

@@ -6,6 +6,7 @@ This module contains all Celery tasks for processing documents.
import os
import time
from celery import Celery
# Environment variables
@@ -110,4 +111,4 @@ def process_document_task(self, file_path: str):
if __name__ == "__main__":
app.start()
app.start()

View File

@@ -0,0 +1,523 @@
"""
Test suite for JobRepository with async/await support.
This module contains comprehensive tests for all JobRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
from datetime import datetime
import pytest
import pytest_asyncio
from bson import ObjectId
from mongomock_motor import AsyncMongoMockClient
from pymongo.errors import PyMongoError
from app.database.repositories.job_repository import JobRepository
from app.exceptions.job_exceptions import JobRepositoryError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory JobRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = JobRepository(db)
await repo.initialize()
return repo
@pytest.fixture
def sample_document_id():
"""Sample document ObjectId for testing."""
return PyObjectId()
@pytest.fixture
def sample_task_id():
"""Sample Celery task ID for testing."""
return "celery-task-12345-abcde"
@pytest.fixture
def multiple_sample_jobs():
"""Multiple ProcessingJob objects for testing."""
doc_id_1 = ObjectId()
doc_id_2 = ObjectId()
base_time = datetime.utcnow()
return [
ProcessingJob(
document_id=doc_id_1,
status=ProcessingStatus.PENDING,
task_id="task-1",
created_at=base_time,
started_at=None,
completed_at=None,
error_message=None
),
ProcessingJob(
document_id=doc_id_2,
status=ProcessingStatus.PROCESSING,
task_id="task-2",
created_at=base_time,
started_at=base_time,
completed_at=None,
error_message=None
),
ProcessingJob(
document_id=doc_id_1,
status=ProcessingStatus.COMPLETED,
task_id="task-3",
created_at=base_time,
started_at=base_time,
completed_at=base_time,
error_message=None
)
]
class TestJobRepositoryInitialization:
"""Tests for repository initialization."""
@pytest.mark.asyncio
async def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
client = AsyncMongoMockClient()
db = client.test_database
repo = JobRepository(db)
# Act
initialized_repo = await repo.initialize()
# Assert
assert initialized_repo is repo
assert repo.db is not None
assert repo.collection is not None
class TestJobRepositoryCreation:
"""Tests for job creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_job_with_task_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test successful job creation with task ID."""
# Act
created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
# Assert
assert created_job is not None
assert created_job.document_id == sample_document_id
assert created_job.task_id == sample_task_id
assert created_job.status == ProcessingStatus.PENDING
assert created_job.created_at is not None
assert created_job.started_at is None
assert created_job.completed_at is None
assert created_job.error_message is None
assert created_job.id is not None
assert isinstance(created_job.id, ObjectId)
@pytest.mark.asyncio
async def test_i_can_create_job_without_task_id(self, in_memory_repository, sample_document_id):
"""Test successful job creation without task ID."""
# Act
created_job = await in_memory_repository.create_job(sample_document_id)
# Assert
assert created_job is not None
assert created_job.document_id == sample_document_id
assert created_job.task_id is None
assert created_job.status == ProcessingStatus.PENDING
assert created_job.created_at is not None
assert created_job.started_at is None
assert created_job.completed_at is None
assert created_job.error_message is None
assert created_job.id is not None
assert isinstance(created_job.id, ObjectId)
@pytest.mark.asyncio
async def test_i_cannot_create_duplicate_job_for_document(self, in_memory_repository, sample_document_id,
sample_task_id):
"""Test that creating job with duplicate document_id raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.create_job(sample_document_id, "different-task-id")
assert "create_job" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_cannot_create_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.create_job(sample_document_id)
assert "create_job" in str(exc_info.value)
class TestJobRepositoryFinding:
"""Tests for job finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_job_by_valid_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test finding job by valid ObjectId."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act
found_job = await in_memory_repository.find_job_by_id(created_job.id)
# Assert
assert found_job is not None
assert found_job.id == created_job.id
assert found_job.document_id == created_job.document_id
assert found_job.task_id == created_job.task_id
assert found_job.status == created_job.status
@pytest.mark.asyncio
async def test_i_cannot_find_job_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent ObjectId returns None."""
# Arrange
nonexistent_id = PyObjectId()
# Act
found_job = await in_memory_repository.find_job_by_id(nonexistent_id)
# Assert
assert found_job is None
@pytest.mark.asyncio
async def test_i_cannot_find_job_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during job finding."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.find_job_by_id(PyObjectId())
assert "get_job_by_id" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_can_find_jobs_by_document_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test finding jobs by document ID."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act
found_jobs = await in_memory_repository.find_jobs_by_document_id(sample_document_id)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].document_id == sample_document_id
@pytest.mark.asyncio
async def test_i_can_find_empty_jobs_list_for_nonexistent_document(self, in_memory_repository):
"""Test that nonexistent document ID returns empty list."""
# Arrange
nonexistent_id = ObjectId()
# Act
found_jobs = await in_memory_repository.find_jobs_by_document_id(nonexistent_id)
# Assert
assert found_jobs == []
@pytest.mark.asyncio
async def test_i_cannot_find_jobs_by_document_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during finding jobs by document ID."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.find_jobs_by_document_id(PyObjectId())
assert "get_jobs_by_file_id" in str(exc_info.value)
@pytest.mark.asyncio
@pytest.mark.parametrize("status", [
ProcessingStatus.PENDING,
ProcessingStatus.PROCESSING,
ProcessingStatus.COMPLETED
])
async def test_i_can_find_jobs_by_pending_status(self, in_memory_repository, sample_document_id, status):
"""Test finding jobs by PENDING status."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
await in_memory_repository.update_job_status(created_job.id, status)
# Act
found_jobs = await in_memory_repository.get_jobs_by_status(status)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].status == status
@pytest.mark.asyncio
async def test_i_can_find_jobs_by_failed_status(self, in_memory_repository, sample_document_id):
"""Test finding jobs by FAILED status."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED, "Test error")
# Act
found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].status == ProcessingStatus.FAILED
assert found_jobs[0].error_message == "Test error"
@pytest.mark.asyncio
async def test_i_can_find_empty_jobs_list_for_unused_status(self, in_memory_repository):
"""Test that unused status returns empty list."""
# Act
found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)
# Assert
assert found_jobs == []
@pytest.mark.asyncio
async def test_i_cannot_find_jobs_by_status_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during finding jobs by status."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)
assert "get_jobs_by_status" in str(exc_info.value)
class TestJobRepositoryStatusUpdate:
"""Tests for job status update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_job_status_to_processing(self, in_memory_repository, sample_document_id):
"""Test updating job status to PROCESSING with started_at timestamp."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
# Act
updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.PROCESSING
assert updated_job.started_at is not None
assert updated_job.completed_at is None
assert updated_job.error_message is None
@pytest.mark.asyncio
async def test_i_can_update_job_status_to_completed(self, in_memory_repository, sample_document_id):
"""Test updating job status to COMPLETED with completed_at timestamp."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)
# Act
updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.COMPLETED
assert updated_job.started_at is not None
assert updated_job.completed_at is not None
assert updated_job.error_message is None
@pytest.mark.asyncio
async def test_i_can_update_job_status_to_failed_with_error(self, in_memory_repository, sample_document_id):
"""Test updating job status to FAILED with error message and completed_at timestamp."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
error_message = "Processing failed due to invalid format"
# Act
updated_job = await in_memory_repository.update_job_status(
created_job.id, ProcessingStatus.FAILED, error_message
)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.completed_at is not None
assert updated_job.error_message == error_message
@pytest.mark.asyncio
async def test_i_can_update_job_status_to_failed_without_error(self, in_memory_repository, sample_document_id):
"""Test updating job status to FAILED without error message."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
# Act
updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.completed_at is not None
assert updated_job.error_message is None
@pytest.mark.asyncio
async def test_i_cannot_update_nonexistent_job_status(self, in_memory_repository):
"""Test that updating nonexistent job returns None."""
# Arrange
nonexistent_id = ObjectId()
# Act
result = await in_memory_repository.update_job_status(nonexistent_id, ProcessingStatus.COMPLETED)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_job_status_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job status update."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)
assert "update_job_status" in str(exc_info.value)
class TestJobRepositoryDeletion:
"""Tests for job deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_job(self, in_memory_repository, sample_document_id):
"""Test successful job deletion."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
# Act
deletion_result = await in_memory_repository.delete_job(created_job.id)
# Assert
assert deletion_result is True
# Verify job is actually deleted
found_job = await in_memory_repository.find_job_by_id(created_job.id)
assert found_job is None
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_job(self, in_memory_repository):
"""Test that deleting nonexistent job returns False."""
# Arrange
nonexistent_id = ObjectId()
# Act
result = await in_memory_repository.delete_job(nonexistent_id)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job deletion."""
# Arrange
created_job = await in_memory_repository.create_job(sample_document_id)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
await in_memory_repository.delete_job(created_job.id)
assert "delete_job" in str(exc_info.value)
class TestJobRepositoryComplexScenarios:
"""Tests for complex job repository scenarios."""
@pytest.mark.asyncio
async def test_i_can_handle_complete_job_lifecycle(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test complete job lifecycle from creation to completion."""
# Create job
job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
assert job.started_at is None
assert job.completed_at is None
# Start processing
job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)
assert job.status == ProcessingStatus.PROCESSING
assert job.started_at is not None
assert job.completed_at is None
# Complete job
job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.COMPLETED)
assert job.status == ProcessingStatus.COMPLETED
assert job.started_at is not None
assert job.completed_at is not None
assert job.error_message is None
@pytest.mark.asyncio
async def test_i_can_handle_job_failure_scenario(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test job failure scenario with error message."""
# Create and start job
job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)
# Fail job with error
error_msg = "File format not supported"
job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.FAILED, error_msg)
# Assert failure state
assert job.status == ProcessingStatus.FAILED
assert job.started_at is not None
assert job.completed_at is not None
assert job.error_message == error_msg
@pytest.mark.asyncio
async def test_i_can_handle_multiple_documents_with_different_statuses(self, in_memory_repository):
"""Test managing multiple jobs for different documents with various statuses."""
# Create jobs for different documents
doc1 = PyObjectId()
doc2 = PyObjectId()
doc3 = PyObjectId()
job1 = await in_memory_repository.create_job(doc1, "task-1")
job2 = await in_memory_repository.create_job(doc2, "task-2")
job3 = await in_memory_repository.create_job(doc3, "task-3")
# Update to different statuses
await in_memory_repository.update_job_status(job1.id, ProcessingStatus.PROCESSING)
await in_memory_repository.update_job_status(job2.id, ProcessingStatus.COMPLETED)
await in_memory_repository.update_job_status(job3.id, ProcessingStatus.FAILED, "Error occurred")
# Verify status queries
pending_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)
processing_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.PROCESSING)
completed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)
failed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)
assert len(pending_jobs) == 0
assert len(processing_jobs) == 1
assert len(completed_jobs) == 1
assert len(failed_jobs) == 1
assert processing_jobs[0].id == job1.id
assert completed_jobs[0].id == job2.id
assert failed_jobs[0].id == job3.id

View File

@@ -0,0 +1,578 @@
"""
Unit tests for JobService using in-memory MongoDB.
Tests the business logic operations with real MongoDB operations
using mongomock for better integration testing.
"""
import pytest
import pytest_asyncio
from bson import ObjectId
from mongomock_motor import AsyncMongoMockClient
from app.exceptions.job_exceptions import InvalidStatusTransitionError
from app.models.job import ProcessingStatus
from app.models.types import PyObjectId
from app.services.job_service import JobService
@pytest_asyncio.fixture
async def in_memory_database():
"""Create an in-memory database for testing."""
client = AsyncMongoMockClient()
return client.test_database
@pytest_asyncio.fixture
async def job_service(in_memory_database):
"""Create JobService with in-memory repositories."""
service = await JobService(in_memory_database).initialize()
return service
@pytest.fixture
def sample_document_id():
"""Sample file ObjectId."""
return PyObjectId()
@pytest.fixture
def sample_task_id():
"""Sample Celery task UUID."""
return "550e8400-e29b-41d4-a716-446655440000"
class TestCreateJob:
"""Tests for create_job method."""
@pytest.mark.asyncio
async def test_i_can_create_job_with_task_id(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test creating job with task ID."""
# Execute
result = await job_service.create_job(sample_document_id, sample_task_id)
# Verify job creation
assert result is not None
assert result.document_id == sample_document_id
assert result.task_id == sample_task_id
assert result.status == ProcessingStatus.PENDING
assert result.created_at is not None
assert result.started_at is None
assert result.error_message is None
# Verify job exists in database
job_in_db = await job_service.get_job_by_id(result.id)
assert job_in_db is not None
assert job_in_db.id == result.id
assert job_in_db.document_id == sample_document_id
assert job_in_db.task_id == sample_task_id
assert job_in_db.status == ProcessingStatus.PENDING
@pytest.mark.asyncio
async def test_i_can_create_job_without_task_id(
self,
job_service,
sample_document_id
):
"""Test creating job without task ID."""
# Execute
result = await job_service.create_job(sample_document_id)
# Verify job creation
assert result is not None
assert result.document_id == sample_document_id
assert result.task_id is None
assert result.status == ProcessingStatus.PENDING
assert result.created_at is not None
assert result.started_at is None
assert result.error_message is None
class TestGetJobMethods:
"""Tests for job retrieval methods."""
@pytest.mark.asyncio
async def test_i_can_get_job_by_id(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test retrieving job by ID."""
# Create a job first
created_job = await job_service.create_job(sample_document_id, sample_task_id)
# Execute
result = await job_service.get_job_by_id(created_job.id)
# Verify
assert result is not None
assert result.id == created_job.id
assert result.document_id == created_job.document_id
assert result.task_id == created_job.task_id
assert result.status == created_job.status
@pytest.mark.asyncio
async def test_i_can_get_jobs_by_status(
self,
job_service,
sample_document_id
):
"""Test retrieving jobs by status."""
# Create jobs with different statuses
pending_job = await job_service.create_job(sample_document_id, "pending-task")
processing_job = await job_service.create_job(ObjectId(), "processing-task")
await job_service.mark_job_as_started(processing_job.id)
completed_job = await job_service.create_job(ObjectId(), "completed-task")
await job_service.mark_job_as_started(completed_job.id)
await job_service.mark_job_as_completed(completed_job.id)
# Execute - get pending jobs
pending_results = await job_service.get_jobs_by_status(ProcessingStatus.PENDING)
# Verify
assert len(pending_results) == 1
assert pending_results[0].id == pending_job.id
assert pending_results[0].status == ProcessingStatus.PENDING
# Execute - get processing jobs
processing_results = await job_service.get_jobs_by_status(ProcessingStatus.PROCESSING)
assert len(processing_results) == 1
assert processing_results[0].status == ProcessingStatus.PROCESSING
# Execute - get completed jobs
completed_results = await job_service.get_jobs_by_status(ProcessingStatus.COMPLETED)
assert len(completed_results) == 1
assert completed_results[0].status == ProcessingStatus.COMPLETED
class TestUpdateStatus:
"""Tests for mark_job_as_started method."""
@pytest.mark.asyncio
async def test_i_can_mark_pending_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking pending job as started (PENDING → PROCESSING)."""
# Create a pending job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
assert created_job.status == ProcessingStatus.PENDING
# Execute
result = await job_service.mark_job_as_started(created_job.id)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.PROCESSING
# Verify in database
updated_job = await job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.PROCESSING
@pytest.mark.asyncio
async def test_i_cannot_mark_processing_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that processing job cannot be marked as started."""
# Create and start a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PROCESSING
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
@pytest.mark.asyncio
async def test_i_cannot_mark_completed_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as started."""
# Create, start, and complete a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_completed(created_job.id)
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
@pytest.mark.asyncio
async def test_i_cannot_mark_failed_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as started."""
# Create, start, and fail a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_failed(created_job.id, "Test error")
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
@pytest.mark.asyncio
async def test_i_can_mark_processing_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as completed (PROCESSING → COMPLETED)."""
# Create and start a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
started_job = await job_service.mark_job_as_started(created_job.id)
# Execute
result = await job_service.mark_job_as_completed(created_job.id)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.COMPLETED
# Verify in database
updated_job = await job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.COMPLETED
@pytest.mark.asyncio
async def test_i_cannot_mark_pending_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that pending job cannot be marked as completed."""
# Create a pending job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
# Try to complete it directly
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PENDING
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
@pytest.mark.asyncio
async def test_i_cannot_mark_completed_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as completed again."""
# Create, start, and complete a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_completed(created_job.id)
# Try to complete it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
@pytest.mark.asyncio
async def test_i_cannot_mark_failed_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as completed."""
# Create, start, and fail a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_failed(created_job.id, "Test error")
# Try to complete it
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
@pytest.mark.asyncio
async def test_i_can_mark_processing_job_as_failed_with_error_message(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as failed with error message."""
# Create and start a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
started_job = await job_service.mark_job_as_started(created_job.id)
error_message = "Processing failed due to invalid file format"
# Execute
result = await job_service.mark_job_as_failed(created_job.id, error_message)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.FAILED
assert result.error_message == error_message
# Verify in database
updated_job = await job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.error_message == error_message
@pytest.mark.asyncio
async def test_i_can_mark_processing_job_as_failed_without_error_message(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as failed without error message."""
# Create and start a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
# Execute without error message
result = await job_service.mark_job_as_failed(created_job.id)
# Verify status transition
assert result is not None
assert result.status == ProcessingStatus.FAILED
assert result.error_message is None
@pytest.mark.asyncio
async def test_i_cannot_mark_pending_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that pending job cannot be marked as failed."""
# Create a pending job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
# Try to fail it directly
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_failed(created_job.id, "Test error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PENDING
assert exc_info.value.target_status == ProcessingStatus.FAILED
@pytest.mark.asyncio
async def test_i_cannot_mark_completed_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as failed."""
# Create, start, and complete a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_completed(created_job.id)
# Try to fail it
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_failed(created_job.id, "Test error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.FAILED
@pytest.mark.asyncio
async def test_i_cannot_mark_failed_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as failed again."""
# Create, start, and fail a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
await job_service.mark_job_as_started(created_job.id)
await job_service.mark_job_as_failed(created_job.id, "First error")
# Try to fail it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
await job_service.mark_job_as_failed(created_job.id, "Second error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.FAILED
class TestDeleteJob:
"""Tests for delete_job method."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_job(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test deleting an existing job."""
# Create a job
created_job = await job_service.create_job(sample_document_id, sample_task_id)
# Verify job exists
job_before_delete = await job_service.get_job_by_id(created_job.id)
assert job_before_delete is not None
# Execute deletion
result = await job_service.delete_job(created_job.id)
# Verify deletion
assert result is True
# Verify job no longer exists
deleted_job = await job_service.get_job_by_id(created_job.id)
assert deleted_job is None
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_job(
self,
job_service
):
"""Test deleting a nonexistent job returns False."""
# Execute deletion with random ObjectId
result = await job_service.delete_job(ObjectId())
# Verify
assert result is False
class TestStatusTransitionValidation:
"""Tests for status transition validation across different scenarios."""
@pytest.mark.asyncio
async def test_valid_job_lifecycle_flow(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test complete valid job lifecycle: PENDING → PROCESSING → COMPLETED."""
# Create job (PENDING)
job = await job_service.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
# Start job (PENDING → PROCESSING)
started_job = await job_service.mark_job_as_started(job.id)
assert started_job.status == ProcessingStatus.PROCESSING
# Complete job (PROCESSING → COMPLETED)
completed_job = await job_service.mark_job_as_completed(job.id)
assert completed_job.status == ProcessingStatus.COMPLETED
@pytest.mark.asyncio
async def test_valid_job_failure_flow(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test valid job failure: PENDING → PROCESSING → FAILED."""
# Create job (PENDING)
job = await job_service.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
# Start job (PENDING → PROCESSING)
started_job = await job_service.mark_job_as_started(job.id)
assert started_job.status == ProcessingStatus.PROCESSING
# Fail job (PROCESSING → FAILED)
failed_job = await job_service.mark_job_as_failed(job.id, "Test failure")
assert failed_job.status == ProcessingStatus.FAILED
assert failed_job.error_message == "Test failure"
class TestEdgeCases:
"""Tests for edge cases and error conditions."""
#
# @pytest.mark.asyncio
# async def test_multiple_jobs_for_same_file(
# self,
# job_service,
# sample_document_id
# ):
# """Test handling multiple jobs for the same file."""
# # Create multiple jobs for same file
# job1 = await job_service.create_job(sample_document_id, "task-1")
# job2 = await job_service.create_job(sample_document_id, "task-2")
# job3 = await job_service.create_job(sample_document_id, "task-3")
#
# # Verify all jobs exist and are independent
# jobs_for_file = await job_service.get_jobs_by_file_id(sample_document_id)
# assert len(jobs_for_file) == 3
#
# job_ids = [job.id for job in jobs_for_file]
# assert job1.id in job_ids
# assert job2.id in job_ids
# assert job3.id in job_ids
#
# # Verify status transitions work independently
# await job_service.mark_job_as_started(job1.id)
# await job_service.mark_job_as_completed(job1.id)
#
# # Other jobs should still be pending
# updated_job2 = await job_service.get_job_by_id(job2.id)
# updated_job3 = await job_service.get_job_by_id(job3.id)
#
# assert updated_job2.status == ProcessingStatus.PENDING
# assert updated_job3.status == ProcessingStatus.PENDING
@pytest.mark.asyncio
async def test_job_operations_with_empty_database(
self,
job_service
):
"""Test job operations when database is empty."""
# Try to get nonexistent job
result = await job_service.get_job_by_id(ObjectId())
assert result is None
# Try to get jobs by status when none exist
pending_jobs = await job_service.get_jobs_by_status(ProcessingStatus.PENDING)
assert pending_jobs == []
# Try to delete nonexistent job
delete_result = await job_service.delete_job(ObjectId())
assert delete_result is False