Compare commits
2 commits: 9564cfadd5 ... 34f7854b3c

Commits:
- 34f7854b3c
- 98c43feadf
.gitignore (vendored, 2 lines changed)

@@ -1,3 +1,5 @@
+volumes
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[codz]
Readme.md (69 lines changed)

@@ -95,8 +95,8 @@ MyDocManager/
 │ │ ├── requirements.txt
 │ │ ├── app/
 │ │ │ ├── main.py
-│ │ │ ├── file_watcher.py
+│ │ │ ├── file_watcher.py # FileWatcher class with observer thread
-│ │ │ ├── celery_app.py
+│ │ │ ├── celery_app.py # Celery Configuration
 │ │ │ ├── config/
 │ │ │ │ ├── __init__.py
 │ │ │ │ └── settings.py # JWT, MongoDB config

@@ -342,6 +342,71 @@ class ProcessingJob(BaseModel):
 4. TODO : Watchdog file monitoring implementation
 5. TODO : FastAPI integration and startup coordination

+## Job Management Layer
+
+### Repository Pattern Implementation
+
+The job management system follows the repository pattern for clean separation between data access and business logic.
+
+#### JobRepository
+
+Handles direct MongoDB operations for processing jobs:
+
+**CRUD Operations:**
+- `create_job()` - Create new processing job with automatic `created_at` timestamp
+- `get_job_by_id()` - Retrieve job by ObjectId
+- `update_job_status()` - Update job status with automatic timestamp management
+- `delete_job()` - Remove job from database
+- `get_jobs_by_file_id()` - Get all jobs for specific file
+- `get_jobs_by_status()` - Get jobs filtered by processing status
+
+**Automatic Timestamp Management:**
+- `created_at`: Set automatically during job creation
+- `started_at`: Set automatically when status changes to PROCESSING
+- `completed_at`: Set automatically when status changes to COMPLETED or FAILED
+
+#### JobService
+
+Provides business logic layer with strict status transition validation:
+
+**Status Transition Methods:**
+- `mark_job_as_started()` - PENDING → PROCESSING
+- `mark_job_as_completed()` - PROCESSING → COMPLETED
+- `mark_job_as_failed()` - PROCESSING → FAILED
+
+**Validation Rules:**
+- Strict status transitions (invalid transitions raise exceptions)
+- Job existence verification before any operation
+- Automatic timestamp management through repository layer
+
+#### Custom Exceptions
+
+**JobNotFoundError**: Raised when job ID doesn't exist
+**InvalidStatusTransitionError**: Raised for invalid status transitions
+**JobRepositoryError**: Raised for MongoDB operation failures
+
+#### Valid Status Transitions
+
+```
+PENDING → PROCESSING (via mark_job_as_started)
+PROCESSING → COMPLETED (via mark_job_as_completed)
+PROCESSING → FAILED (via mark_job_as_failed)
+```
+
+All other transitions are forbidden and will raise `InvalidStatusTransitionError`.
+
+### File Structure
+
+```
+src/file-processor/app/
+├── database/repositories/
+│   └── job_repository.py    # JobRepository class
+├── services/
+│   └── job_service.py       # JobService class
+└── exceptions/
+    └── job_exceptions.py    # Custom exceptions
+```
+
 ### Processing Pipeline Features

 - **Duplicate Detection**: SHA256 hashing prevents reprocessing same files
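The transition table documented above maps naturally onto a small validation helper. The sketch below is illustrative only (the project's enforcement lives in `JobService`, shown further down); the string values of the enum members are assumptions, since the diff does not show the `ProcessingStatus` definition.

```
from enum import Enum


class ProcessingStatus(str, Enum):
    # Values assumed for illustration; the real enum lives in app/models/job.py
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed transitions, exactly as documented in the Readme above.
VALID_TRANSITIONS = {
    ProcessingStatus.PENDING: {ProcessingStatus.PROCESSING},
    ProcessingStatus.PROCESSING: {ProcessingStatus.COMPLETED, ProcessingStatus.FAILED},
}


def validate_transition(current: ProcessingStatus, target: ProcessingStatus) -> None:
    """Raise if the requested transition is not in the allowed set."""
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"Invalid status transition from '{current}' to '{target}'")
```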
@@ -34,9 +34,10 @@ services:
     environment:
       - REDIS_URL=redis://redis:6379/0
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
-      - PYTHONPATH=/app
+      - PYTHONPATH=/app:/tasks  # Added /tasks to Python path
     volumes:
       - ./src/file-processor:/app
+      - ./src/worker/tasks:/app/tasks  # <- Added: shared access to worker tasks
       - ./volumes/watched_files:/watched_files
       - ./volumes/objects:/objects
     depends_on:

@@ -57,14 +58,15 @@ services:
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
       - PYTHONPATH=/app
     volumes:
-      - ./src/worker/tasks:/app
+      - ./src/worker:/app
       - ./volumes/watched_files:/watched_files
     depends_on:
      - redis
      - mongodb
     networks:
      - mydocmanager-network
-    command: celery -A main worker --loglevel=info
+    command: celery -A tasks.main worker --loglevel=info
+    #command: celery -A main --loglevel=info  # for production

 volumes:
   mongodb-data:
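With `./src/worker/tasks` mounted at `/app/tasks` inside the file-processor container, the API side can import the worker's task definitions and dispatch them through the shared Redis broker, which is exactly what the new `file_watcher.py` does. A minimal sketch, run inside that container (the file path is an example):

```
# Dispatch a worker task from the file-processor container.
# The import resolves because ./src/worker/tasks is mounted at /app/tasks.
from tasks.document_processing import process_document

result = process_document.delay("/watched_files/example.pdf")  # queued via Redis
print(f"Dispatched Celery task {result.id}")
```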
@@ -45,6 +45,7 @@ tzdata==2025.2
 uvicorn==0.35.0
 uvloop==0.21.0
 vine==5.1.0
+watchdog==6.0.0
 watchfiles==1.1.0
 wcwidth==0.2.13
 websockets==15.0.1
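`watchdog==6.0.0` backs the new `FileWatcher` introduced in this commit. Its core pattern is an event handler registered on an `Observer`; a condensed sketch of the idea (the full implementation is in `file_watcher.py` below):

```
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class LogNewFiles(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            print(f"New file: {event.src_path}")


observer = Observer()
observer.schedule(LogNewFiles(), "/watched_files", recursive=True)
observer.start()  # runs in its own thread; call observer.stop() then observer.join() to shut down
```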
@@ -3,6 +3,12 @@ FROM python:3.12-slim
 # Set working directory
 WORKDIR /app

+# Install libmagic
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libmagic1 \
+    file \
+    && rm -rf /var/lib/apt/lists/*
+
 # Copy requirements and install dependencies
 COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
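`libmagic1` is the system library behind `python-magic`, which `DocumentService` uses for MIME detection (see the `document_service.py` hunk further down); without it the import fails at container runtime. A quick smoke test:

```
import magic  # python-magic; requires the libmagic1 system package installed above

with open("requirements.txt", "rb") as fh:
    print(magic.from_buffer(fh.read(), mime=True))  # e.g. "text/plain"
```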
@@ -6,7 +6,6 @@ using simple os.getenv() approach without external validation libraries.
 """

 import os
-from typing import Optional


 def get_mongodb_url() -> str:

@@ -51,15 +50,6 @@ def get_jwt_secret_key() -> str:
         raise ValueError("JWT_SECRET environment variable must be set in production")
     return secret

-def get_objects_folder() -> str:
-    """
-    Get Vault path from environment variables.
-
-    Returns:
-        str: Vault path
-    """
-    return os.getenv("OBJECTS_FOLDER", "/objects")
-
-
 def get_jwt_algorithm() -> str:
     """

@@ -91,4 +81,19 @@ def is_development_environment() -> bool:
     Returns:
         bool: True if development environment
     """
     return os.getenv("ENVIRONMENT", "development").lower() == "development"
+
+
+def get_objects_folder() -> str:
+    """
+    Get Vault path from environment variables.
+
+    Returns:
+        str: Vault path
+    """
+    return os.getenv("OBJECTS_FOLDER", "/objects")
+
+
+def watch_directory() -> str:
+    """Directory to monitor for new files"""
+    return os.getenv("WATCH_DIRECTORY", "/watched_files")
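The relocated `get_objects_folder()` and the new `watch_directory()` are plain `os.getenv()` accessors, so they can be overridden per environment (or in tests) without extra machinery. A usage sketch, using the same import path that the new `main.py` uses:

```
import os

from app.config import settings  # as imported in main.py

os.environ["WATCH_DIRECTORY"] = "/tmp/watched"  # local override for a dev run
print(settings.watch_directory())               # -> /tmp/watched
print(settings.get_objects_folder())            # -> /objects (default)
```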
src/file-processor/app/database/repositories/job_repository.py (new file, 229 lines)

@@ -0,0 +1,229 @@
|
|||||||
|
"""
|
||||||
|
Repository for managing processing jobs in MongoDB.
|
||||||
|
|
||||||
|
This module provides data access layer for ProcessingJob operations
|
||||||
|
with automatic timestamp management and error handling.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
|
||||||
|
from pymongo.errors import PyMongoError
|
||||||
|
|
||||||
|
from app.exceptions.job_exceptions import JobRepositoryError
|
||||||
|
from app.models.job import ProcessingJob, ProcessingStatus
|
||||||
|
from app.models.types import PyObjectId
|
||||||
|
|
||||||
|
|
||||||
|
class JobRepository:
|
||||||
|
"""
|
||||||
|
Repository for processing job data access operations.
|
||||||
|
|
||||||
|
Provides CRUD operations for ProcessingJob documents with automatic
|
||||||
|
timestamp management and proper error handling.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, database: AsyncIOMotorDatabase):
|
||||||
|
"""Initialize repository with MongoDB collection reference."""
|
||||||
|
self.db = database
|
||||||
|
self.collection: AsyncIOMotorCollection = self.db.processing_jobs
|
||||||
|
|
||||||
|
async def _ensure_indexes(self):
|
||||||
|
"""
|
||||||
|
Ensure required database indexes exist.
|
||||||
|
|
||||||
|
Creates unique index on username field to prevent duplicates.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
await self.collection.create_index("document_id", unique=True)
|
||||||
|
except PyMongoError:
|
||||||
|
# Index might already exist, ignore error
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def initialize(self):
|
||||||
|
"""
|
||||||
|
Initialize repository by ensuring required indexes exist.
|
||||||
|
|
||||||
|
Should be called after repository instantiation to setup database indexes.
|
||||||
|
"""
|
||||||
|
await self._ensure_indexes()
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Create a new processing job.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_id: Reference to the file document
|
||||||
|
task_id: Optional Celery task UUID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The created ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
job_data = {
|
||||||
|
"document_id": document_id,
|
||||||
|
"status": ProcessingStatus.PENDING,
|
||||||
|
"task_id": task_id,
|
||||||
|
"created_at": datetime.now(),
|
||||||
|
"started_at": None,
|
||||||
|
"completed_at": None,
|
||||||
|
"error_message": None
|
||||||
|
}
|
||||||
|
|
||||||
|
result = await self.collection.insert_one(job_data)
|
||||||
|
job_data["_id"] = result.inserted_id
|
||||||
|
|
||||||
|
return ProcessingJob(**job_data)
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("create_job", e)
|
||||||
|
|
||||||
|
async def find_job_by_id(self, job_id: PyObjectId) -> Optional[ProcessingJob]:
|
||||||
|
"""
|
||||||
|
Retrieve a job by its ID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The ProcessingJob document
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
job_data = await self.collection.find_one({"_id": job_id})
|
||||||
|
if job_data:
|
||||||
|
return ProcessingJob(**job_data)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("get_job_by_id", e)
|
||||||
|
|
||||||
|
async def update_job_status(
|
||||||
|
self,
|
||||||
|
job_id: PyObjectId,
|
||||||
|
status: ProcessingStatus,
|
||||||
|
error_message: Optional[str] = None
|
||||||
|
) -> Optional[ProcessingJob]:
|
||||||
|
"""
|
||||||
|
Update job status with automatic timestamp management.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
status: New processing status
|
||||||
|
error_message: Optional error message for failed jobs
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The updated ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Prepare update data
|
||||||
|
update_data = {"status": status}
|
||||||
|
|
||||||
|
# Set appropriate timestamp based on status
|
||||||
|
current_time = datetime.now()
|
||||||
|
if status == ProcessingStatus.PROCESSING:
|
||||||
|
update_data["started_at"] = current_time
|
||||||
|
elif status in (ProcessingStatus.COMPLETED, ProcessingStatus.FAILED):
|
||||||
|
update_data["completed_at"] = current_time
|
||||||
|
|
||||||
|
# Add error message if provided
|
||||||
|
if error_message is not None:
|
||||||
|
update_data["error_message"] = error_message
|
||||||
|
|
||||||
|
result = await self.collection.find_one_and_update(
|
||||||
|
{"_id": job_id},
|
||||||
|
{"$set": update_data},
|
||||||
|
return_document=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if result:
|
||||||
|
return ProcessingJob(**result)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("update_job_status", e)
|
||||||
|
|
||||||
|
async def delete_job(self, job_id: PyObjectId) -> bool:
|
||||||
|
"""
|
||||||
|
Delete a job from the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if job was deleted, False if not found
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = await self.collection.delete_one({"_id": job_id})
|
||||||
|
|
||||||
|
return result.deleted_count > 0
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("delete_job", e)
|
||||||
|
|
||||||
|
async def find_jobs_by_document_id(self, document_id: PyObjectId) -> List[ProcessingJob]:
|
||||||
|
"""
|
||||||
|
Retrieve all jobs for a specific file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
document_id: The file ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of ProcessingJob documents
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
cursor = self.collection.find({"document_id": document_id})
|
||||||
|
|
||||||
|
jobs = []
|
||||||
|
async for job_data in cursor:
|
||||||
|
jobs.append(ProcessingJob(**job_data))
|
||||||
|
|
||||||
|
return jobs
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("get_jobs_by_file_id", e)
|
||||||
|
|
||||||
|
async def get_jobs_by_status(self, status: ProcessingStatus) -> List[ProcessingJob]:
|
||||||
|
"""
|
||||||
|
Retrieve all jobs with a specific status.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status: The processing status to filter by
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of ProcessingJob documents
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
cursor = self.collection.find({"status": status})
|
||||||
|
|
||||||
|
jobs = []
|
||||||
|
async for job_data in cursor:
|
||||||
|
jobs.append(ProcessingJob(**job_data))
|
||||||
|
|
||||||
|
return jobs
|
||||||
|
|
||||||
|
except PyMongoError as e:
|
||||||
|
raise JobRepositoryError("get_jobs_by_status", e)
|
||||||
src/file-processor/app/exceptions/__init__.py (new file, empty)

src/file-processor/app/exceptions/job_exceptions.py (new file, 38 lines)

@@ -0,0 +1,38 @@
+"""
+Custom exceptions for job management operations.
+
+This module defines specific exceptions for job processing lifecycle
+and repository operations to provide clear error handling.
+"""
+
+from app.models.job import ProcessingStatus
+
+
+class InvalidStatusTransitionError(Exception):
+    """
+    Raised when an invalid status transition is attempted.
+
+    This exception indicates that an attempt was made to change a job's
+    status to an invalid target status given the current status.
+    """
+
+    def __init__(self, current_status: ProcessingStatus, target_status: ProcessingStatus):
+        self.current_status = current_status
+        self.target_status = target_status
+        super().__init__(
+            f"Invalid status transition from '{current_status}' to '{target_status}'"
+        )
+
+
+class JobRepositoryError(Exception):
+    """
+    Raised when a MongoDB operation fails in the job repository.
+
+    This exception wraps database-related errors that occur during
+    job repository operations.
+    """
+
+    def __init__(self, operation: str, original_error: Exception):
+        self.operation = operation
+        self.original_error = original_error
+        super().__init__(f"Repository operation '{operation}' failed: {str(original_error)}")
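Both exceptions carry structured context (`current_status`/`target_status`, `operation`/`original_error`), which keeps error handling at the call site explicit. An illustrative handler around the service methods introduced in this commit:

```
from app.exceptions.job_exceptions import InvalidStatusTransitionError, JobRepositoryError


async def start_job_safely(job_service, job_id):
    """Sketch: tolerate bad transitions, surface database failures."""
    try:
        return await job_service.mark_job_as_started(job_id)
    except InvalidStatusTransitionError as exc:
        # e.g. the job is already PROCESSING or COMPLETED
        print(f"Skipped: cannot go from {exc.current_status} to {exc.target_status}")
        return None
    except JobRepositoryError as exc:
        print(f"MongoDB operation '{exc.operation}' failed: {exc.original_error}")
        raise
```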
src/file-processor/app/file_watcher.py (new file, 241 lines)

@@ -0,0 +1,241 @@
|
|||||||
|
"""
|
||||||
|
File watcher implementation with Watchdog observer and ProcessingJob management.
|
||||||
|
|
||||||
|
This module provides real-time file monitoring for document processing.
|
||||||
|
When a file is created in the watched directory, it:
|
||||||
|
1. Creates a document record via DocumentService
|
||||||
|
2. Dispatches a Celery task for processing
|
||||||
|
3. Creates a ProcessingJob to track the task lifecycle
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
|
||||||
|
from watchdog.observers import Observer
|
||||||
|
|
||||||
|
from app.services.document_service import DocumentService
|
||||||
|
from app.services.job_service import JobService
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DocumentFileEventHandler(FileSystemEventHandler):
|
||||||
|
"""
|
||||||
|
Event handler for document file creation events.
|
||||||
|
|
||||||
|
Processes newly created files by creating document records,
|
||||||
|
dispatching Celery tasks, and managing processing jobs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SUPPORTED_EXTENSIONS = {'.txt', '.pdf', '.docx'}
|
||||||
|
|
||||||
|
def __init__(self, document_service: DocumentService, job_service: JobService):
|
||||||
|
"""
|
||||||
|
Initialize the event handler.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
document_service: Service for document management
|
||||||
|
job_service: Service for processing job management
|
||||||
|
"""
|
||||||
|
super().__init__()
|
||||||
|
self.document_service = document_service
|
||||||
|
self.job_service = job_service
|
||||||
|
|
||||||
|
def on_created(self, event: FileCreatedEvent) -> None:
|
||||||
|
"""
|
||||||
|
Handle file creation events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event: File system event containing file path information
|
||||||
|
"""
|
||||||
|
if event.is_directory:
|
||||||
|
return
|
||||||
|
|
||||||
|
filepath = event.src_path
|
||||||
|
file_extension = Path(filepath).suffix.lower()
|
||||||
|
|
||||||
|
if file_extension not in self.SUPPORTED_EXTENSIONS:
|
||||||
|
logger.info(f"Ignoring unsupported file type: {filepath}")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(f"Processing new file: {filepath}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from tasks.document_processing import process_document
|
||||||
|
celery_result = process_document.delay(filepath)
|
||||||
|
celery_task_id = celery_result.id
|
||||||
|
logger.info(f"Dispatched Celery task with ID: {celery_task_id}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to process file {filepath}: {str(e)}")
|
||||||
|
# Note: We don't re-raise the exception to keep the watcher running
|
||||||
|
|
||||||
|
|
||||||
|
class FileWatcher:
|
||||||
|
"""
|
||||||
|
File system watcher for automatic document processing.
|
||||||
|
|
||||||
|
Monitors a directory for new files and triggers processing pipeline
|
||||||
|
using a dedicated observer thread.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
watch_directory: str,
|
||||||
|
document_service: DocumentService,
|
||||||
|
job_service: JobService,
|
||||||
|
recursive: bool = True
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the file watcher.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
watch_directory: Directory path to monitor
|
||||||
|
document_service: Service for document management
|
||||||
|
job_service: Service for processing job management
|
||||||
|
recursive: Whether to watch subdirectories recursively
|
||||||
|
"""
|
||||||
|
self.watch_directory = Path(watch_directory)
|
||||||
|
self.recursive = recursive
|
||||||
|
self.observer: Optional[Observer] = None
|
||||||
|
self._observer_thread: Optional[threading.Thread] = None
|
||||||
|
self._stop_event = threading.Event()
|
||||||
|
|
||||||
|
# Validate watch directory
|
||||||
|
if not self.watch_directory.exists():
|
||||||
|
raise ValueError(f"Watch directory does not exist: {watch_directory}")
|
||||||
|
|
||||||
|
if not self.watch_directory.is_dir():
|
||||||
|
raise ValueError(f"Watch path is not a directory: {watch_directory}")
|
||||||
|
|
||||||
|
# Create event handler
|
||||||
|
self.event_handler = DocumentFileEventHandler(
|
||||||
|
document_service=document_service,
|
||||||
|
job_service=job_service
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"FileWatcher initialized for directory: {self.watch_directory}")
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""
|
||||||
|
Start the file watcher in a separate thread.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If the watcher is already running
|
||||||
|
"""
|
||||||
|
if self.is_running():
|
||||||
|
raise RuntimeError("FileWatcher is already running")
|
||||||
|
|
||||||
|
self.observer = Observer()
|
||||||
|
self.observer.schedule(
|
||||||
|
self.event_handler,
|
||||||
|
str(self.watch_directory),
|
||||||
|
recursive=self.recursive
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start observer in separate thread
|
||||||
|
self._observer_thread = threading.Thread(
|
||||||
|
target=self._run_observer,
|
||||||
|
name="FileWatcher-Observer"
|
||||||
|
)
|
||||||
|
self._stop_event.clear()
|
||||||
|
self._observer_thread.start()
|
||||||
|
|
||||||
|
logger.info("FileWatcher started successfully")
|
||||||
|
|
||||||
|
def stop(self, timeout: float = 5.0) -> None:
|
||||||
|
"""
|
||||||
|
Stop the file watcher gracefully.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: Maximum time to wait for graceful shutdown
|
||||||
|
"""
|
||||||
|
if not self.is_running():
|
||||||
|
logger.warning("FileWatcher is not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("Stopping FileWatcher...")
|
||||||
|
|
||||||
|
# Signal stop and wait for observer thread
|
||||||
|
self._stop_event.set()
|
||||||
|
|
||||||
|
if self.observer:
|
||||||
|
self.observer.stop()
|
||||||
|
|
||||||
|
if self._observer_thread and self._observer_thread.is_alive():
|
||||||
|
self._observer_thread.join(timeout=timeout)
|
||||||
|
|
||||||
|
if self._observer_thread.is_alive():
|
||||||
|
logger.warning("FileWatcher thread did not stop gracefully within timeout")
|
||||||
|
else:
|
||||||
|
logger.info("FileWatcher stopped gracefully")
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
self.observer = None
|
||||||
|
self._observer_thread = None
|
||||||
|
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the file watcher is currently running.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the watcher is running, False otherwise
|
||||||
|
"""
|
||||||
|
return (
|
||||||
|
self.observer is not None
|
||||||
|
and self._observer_thread is not None
|
||||||
|
and self._observer_thread.is_alive()
|
||||||
|
)
|
||||||
|
|
||||||
|
def _run_observer(self) -> None:
|
||||||
|
"""
|
||||||
|
Internal method to run the observer in a separate thread.
|
||||||
|
|
||||||
|
This method should not be called directly.
|
||||||
|
"""
|
||||||
|
if not self.observer:
|
||||||
|
logger.error("Observer not initialized")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.observer.start()
|
||||||
|
logger.info("Observer thread started")
|
||||||
|
|
||||||
|
# Keep the observer running until stop is requested
|
||||||
|
while not self._stop_event.is_set():
|
||||||
|
self._stop_event.wait(timeout=1.0)
|
||||||
|
|
||||||
|
logger.info("Observer thread stopping...")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Observer thread error: {str(e)}")
|
||||||
|
finally:
|
||||||
|
if self.observer:
|
||||||
|
self.observer.join()
|
||||||
|
logger.info("Observer thread stopped")
|
||||||
|
|
||||||
|
|
||||||
|
def create_file_watcher(
|
||||||
|
watch_directory: str,
|
||||||
|
document_service: DocumentService,
|
||||||
|
job_service: JobService
|
||||||
|
) -> FileWatcher:
|
||||||
|
"""
|
||||||
|
Factory function to create a FileWatcher instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
watch_directory: Directory path to monitor
|
||||||
|
document_service: Service for document management
|
||||||
|
job_service: Service for processing job management
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured FileWatcher instance
|
||||||
|
"""
|
||||||
|
return FileWatcher(
|
||||||
|
watch_directory=watch_directory,
|
||||||
|
document_service=document_service,
|
||||||
|
job_service=job_service
|
||||||
|
)
|
||||||
@@ -1,205 +1,174 @@
|
|||||||
"""
|
"""
|
||||||
FastAPI application for MyDocManager file processor service.
|
FastAPI application with integrated FileWatcher for document processing.
|
||||||
|
|
||||||
This service provides API endpoints for health checks and task dispatching.
|
This module provides the main FastAPI application with:
|
||||||
|
- JWT authentication
|
||||||
|
- User management APIs
|
||||||
|
- Real-time file monitoring via FileWatcher
|
||||||
|
- Document processing via Celery tasks
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
import redis
|
from fastapi import FastAPI
|
||||||
from celery import Celery
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi import FastAPI, HTTPException, Depends
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from app.database.connection import test_database_connection, get_database
|
from app.config import settings
|
||||||
from app.database.repositories.user_repository import UserRepository
|
from app.database.connection import get_database
|
||||||
from app.models.user import UserCreate
|
from app.file_watcher import create_file_watcher, FileWatcher
|
||||||
|
from app.services.document_service import DocumentService
|
||||||
from app.services.init_service import InitializationService
|
from app.services.init_service import InitializationService
|
||||||
|
from app.services.job_service import JobService
|
||||||
from app.services.user_service import UserService
|
from app.services.user_service import UserService
|
||||||
|
|
||||||
|
# from api.routes.auth import router as auth_router
|
||||||
|
# from api.routes.users import router as users_router
|
||||||
|
# from api.routes.documents import router as documents_router
|
||||||
|
# from api.routes.jobs import router as jobs_router
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Global file watcher instance
|
||||||
|
file_watcher: FileWatcher = None
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||||
"""
|
"""
|
||||||
Application lifespan manager for startup and shutdown tasks.
|
FastAPI lifespan context manager.
|
||||||
|
|
||||||
Handles initialization tasks that need to run when the application starts,
|
Handles application startup and shutdown events including:
|
||||||
including admin user creation and other setup procedures.
|
- Database connection
|
||||||
|
- Default admin user creation
|
||||||
|
- FileWatcher startup/shutdown
|
||||||
"""
|
"""
|
||||||
# Startup tasks
|
global file_watcher
|
||||||
|
|
||||||
|
# Startup
|
||||||
logger.info("Starting MyDocManager application...")
|
logger.info("Starting MyDocManager application...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Initialize database connection
|
# Initialize database connection
|
||||||
database = get_database()
|
database = get_database()
|
||||||
|
logger.info("Database connection established")
|
||||||
|
|
||||||
# Initialize repositories and services
|
document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
|
||||||
user_service = await UserService(database).initialize()
|
job_service = JobService(database=database)
|
||||||
|
user_service = UserService(database=database)
|
||||||
|
logger.info("Service created")
|
||||||
|
|
||||||
|
# Create default admin user
|
||||||
init_service = InitializationService(user_service)
|
init_service = InitializationService(user_service)
|
||||||
|
await init_service.initialize_application()
|
||||||
|
logger.info("Default admin user initialization completed")
|
||||||
|
|
||||||
# Run initialization tasks
|
# Create and start file watcher
|
||||||
initialization_result = await init_service.initialize_application()
|
file_watcher = create_file_watcher(
|
||||||
|
watch_directory=settings.watch_directory(),
|
||||||
|
document_service=document_service,
|
||||||
|
job_service=job_service
|
||||||
|
)
|
||||||
|
file_watcher.start()
|
||||||
|
logger.info(f"FileWatcher started for directory: {settings.watch_directory()}")
|
||||||
|
|
||||||
if initialization_result["initialization_success"]:
|
logger.info("Application startup completed successfully")
|
||||||
logger.info("Application startup completed successfully")
|
|
||||||
if initialization_result["admin_user_created"]:
|
yield
|
||||||
logger.info("Default admin user was created during startup")
|
|
||||||
else:
|
|
||||||
logger.error("Application startup completed with errors:")
|
|
||||||
for error in initialization_result["errors"]:
|
|
||||||
logger.error(f" - {error}")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
logger.error(f"Application startup failed: {str(e)}")
|
||||||
logger.error(f"Critical error during application startup: {str(e)}")
|
raise
|
||||||
# You might want to decide if the app should continue or exit here
|
|
||||||
# For now, we log the error but continue
|
|
||||||
|
|
||||||
yield # Application is running
|
finally:
|
||||||
|
# Shutdown
|
||||||
# Shutdown tasks (if needed)
|
logger.info("Shutting down MyDocManager application...")
|
||||||
logger.info("Shutting down MyDocManager application...")
|
|
||||||
|
if file_watcher and file_watcher.is_running():
|
||||||
|
file_watcher.stop()
|
||||||
|
logger.info("FileWatcher stopped")
|
||||||
|
|
||||||
|
logger.info("Application shutdown completed")
|
||||||
|
|
||||||
|
|
||||||
# Initialize FastAPI app
|
# Create FastAPI application
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title="MyDocManager File Processor",
|
title="MyDocManager",
|
||||||
description="File processing and task dispatch service",
|
description="Real-time document processing application with authentication",
|
||||||
version="1.0.0",
|
version="0.1.0",
|
||||||
lifespan=lifespan
|
lifespan=lifespan
|
||||||
)
|
)
|
||||||
|
|
||||||
# Environment variables
|
# Configure CORS
|
||||||
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
|
app.add_middleware(
|
||||||
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
|
CORSMiddleware,
|
||||||
|
allow_origins=["http://localhost:3000"], # React frontend
|
||||||
# Initialize Redis client
|
allow_credentials=True,
|
||||||
try:
|
allow_methods=["*"],
|
||||||
redis_client = redis.from_url(REDIS_URL)
|
allow_headers=["*"],
|
||||||
except Exception as e:
|
|
||||||
redis_client = None
|
|
||||||
print(f"Warning: Could not connect to Redis: {e}")
|
|
||||||
|
|
||||||
# Initialize Celery
|
|
||||||
celery_app = Celery(
|
|
||||||
"file_processor",
|
|
||||||
broker=REDIS_URL,
|
|
||||||
backend=REDIS_URL
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# Pydantic models
|
# Include routers
|
||||||
class TestTaskRequest(BaseModel):
|
# app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
|
||||||
"""Request model for test task."""
|
# app.include_router(users_router, prefix="/users", tags=["User Management"])
|
||||||
message: str
|
# app.include_router(documents_router, prefix="/documents", tags=["Documents"])
|
||||||
|
# app.include_router(jobs_router, prefix="/jobs", tags=["Processing Jobs"])
|
||||||
|
|
||||||
def get_user_service() -> UserService:
|
|
||||||
"""
|
|
||||||
Dependency to get user service instance.
|
|
||||||
|
|
||||||
This should be properly implemented with database connection management
|
|
||||||
in your actual application.
|
|
||||||
"""
|
|
||||||
database = get_database()
|
|
||||||
user_repository = UserRepository(database)
|
|
||||||
return UserService(user_repository)
|
|
||||||
|
|
||||||
|
|
||||||
# Your API routes would use the service like this:
|
|
||||||
@app.post("/api/users")
|
|
||||||
async def create_user(
|
|
||||||
user_data: UserCreate,
|
|
||||||
user_service: UserService = Depends(get_user_service)
|
|
||||||
):
|
|
||||||
return user_service.create_user(user_data)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health_check():
|
async def health_check():
|
||||||
"""
|
"""
|
||||||
Health check endpoint.
|
Health check endpoint.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: Service health status with dependencies
|
Dictionary containing application health status
|
||||||
"""
|
"""
|
||||||
health_status = {
|
return {
|
||||||
"status": "healthy",
|
"status": "healthy",
|
||||||
"service": "file-processor",
|
"service": "MyDocManager",
|
||||||
"dependencies": {
|
"version": "1.0.0",
|
||||||
"redis": "unknown",
|
"file_watcher_running": file_watcher.is_running() if file_watcher else False
|
||||||
"mongodb": "unknown"
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Check Redis connection
|
|
||||||
if redis_client:
|
|
||||||
try:
|
|
||||||
redis_client.ping()
|
|
||||||
health_status["dependencies"]["redis"] = "connected"
|
|
||||||
except Exception:
|
|
||||||
health_status["dependencies"]["redis"] = "disconnected"
|
|
||||||
health_status["status"] = "degraded"
|
|
||||||
|
|
||||||
# check MongoDB connection
|
|
||||||
if test_database_connection():
|
|
||||||
health_status["dependencies"]["mongodb"] = "connected"
|
|
||||||
else:
|
|
||||||
health_status["dependencies"]["mongodb"] = "disconnected"
|
|
||||||
|
|
||||||
return health_status
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/test-task")
|
|
||||||
async def dispatch_test_task(request: TestTaskRequest):
|
|
||||||
"""
|
|
||||||
Dispatch a test task to Celery worker.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
request: Test task request containing message
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Task dispatch information
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HTTPException: If task dispatch fails
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Send task to worker
|
|
||||||
task = celery_app.send_task(
|
|
||||||
"main.test_task",
|
|
||||||
args=[request.message]
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"status": "dispatched",
|
|
||||||
"task_id": task.id,
|
|
||||||
"message": f"Test task dispatched with message: {request.message}"
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=500,
|
|
||||||
detail=f"Failed to dispatch task: {str(e)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
async def root():
|
async def root():
|
||||||
"""
|
"""
|
||||||
Root endpoint.
|
Root endpoint with basic application information.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: Basic service information
|
Dictionary containing welcome message and available endpoints
|
||||||
"""
|
"""
|
||||||
return {
|
return {
|
||||||
"service": "MyDocManager File Processor",
|
"message": "Welcome to MyDocManager",
|
||||||
"version": "1.0.0",
|
"description": "Real-time document processing application",
|
||||||
"status": "running"
|
"docs": "/docs",
|
||||||
|
"health": "/health"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/watcher/status")
|
||||||
|
async def watcher_status():
|
||||||
|
"""
|
||||||
|
Get file watcher status.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing file watcher status information
|
||||||
|
"""
|
||||||
|
if not file_watcher:
|
||||||
|
return {
|
||||||
|
"status": "not_initialized",
|
||||||
|
"running": False
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status": "initialized",
|
||||||
|
"running": file_watcher.is_running(),
|
||||||
|
"watch_directory": str(file_watcher.watch_directory),
|
||||||
|
"recursive": file_watcher.recursive
|
||||||
}
|
}
|
||||||
@@ -25,7 +25,7 @@ class ProcessingJob(BaseModel):
     """

     id: Optional[PyObjectId] = Field(default=None, alias="_id")
-    file_id: PyObjectId = Field(..., description="Reference to file document")
+    document_id: PyObjectId = Field(..., description="Reference to file document")
     status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status")
     task_id: Optional[str] = Field(default=None, description="Celery task UUID")
     created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created")
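The `file_id` → `document_id` rename is reflected everywhere the model is constructed (repository, service, watcher). For illustration, construction by field name versus by Mongo alias might look as follows, assuming `PyObjectId` accepts raw `ObjectId` values as the repository code suggests:

```
from bson import ObjectId

from app.models.job import ProcessingJob, ProcessingStatus

# As created by JobRepository.create_job(): status defaults to PENDING.
job = ProcessingJob(document_id=ObjectId(), task_id=None)
assert job.status == ProcessingStatus.PENDING

# Documents read back from MongoDB carry "_id", which maps to the "id" alias.
raw = {"_id": ObjectId(), "document_id": ObjectId(), "status": ProcessingStatus.PENDING}
restored = ProcessingJob(**raw)
print(restored.id, restored.document_id)
```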
@@ -95,6 +95,28 @@ class DocumentService:
         """
         return magic.from_buffer(file_bytes, mime=True)

+    @staticmethod
+    def _read_file_bytes(file_path: str | Path) -> bytes:
+        """
+        Read file content as bytes.
+
+        Args:
+            file_path (str | Path): Path of the file to read
+
+        Returns:
+            bytes: Content of the file
+
+        Raises:
+            FileNotFoundError: If the file does not exist
+            OSError: If any I/O error occurs
+        """
+        path = Path(file_path)
+
+        if not path.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+
+        return path.read_bytes()
+
     def _get_document_path(self, file_hash):
         """

@@ -117,7 +139,7 @@ class DocumentService:
     async def create_document(
         self,
         file_path: str,
-        file_bytes: bytes,
+        file_bytes: bytes | None = None,
         encoding: str = "utf-8"
     ) -> FileDocument:
         """

@@ -140,6 +162,7 @@ class DocumentService:
             PyMongoError: If database operation fails
         """
         # Calculate automatic attributes
+        file_bytes = file_bytes or self._read_file_bytes(file_path)
         file_hash = self._calculate_file_hash(file_bytes)
         file_type = self._detect_file_type(file_path)
         mime_type = self._detect_mime_type(file_bytes)
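Making `file_bytes` optional means callers that only know a path (such as the file watcher) can delegate reading to the service, while callers that already hold the content keep working. A sketch of both call styles, with the service instance injected:

```
from pathlib import Path


async def ingest(document_service, path: str):
    # Path-only call: bytes are read inside create_document() via _read_file_bytes().
    doc = await document_service.create_document(file_path=path)

    # Previous call style still works when the caller already has the bytes.
    doc = await document_service.create_document(
        file_path=path,
        file_bytes=Path(path).read_bytes(),
    )
    return doc
```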
@@ -8,8 +8,8 @@ creating default admin user if none exists.
 import logging
 from typing import Optional

-from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
 from app.models.auth import UserRole
+from app.models.user import UserInDB, UserCreateNoValidation
 from app.services.user_service import UserService

 logger = logging.getLogger(__name__)

@@ -31,7 +31,6 @@ class InitializationService:
         user_service (UserService): Service for user operations
         """
         self.user_service = user_service

-
     async def ensure_admin_user_exists(self) -> Optional[UserInDB]:
         """

@@ -131,4 +130,23 @@ class InitializationService:
             logger.error(error_msg)
             initialization_summary["errors"].append(error_msg)

-        return initialization_summary
+        self.log_initialization_result(initialization_summary)
+
+        return initialization_summary
+
+    @staticmethod
+    def log_initialization_result(summary: dict) -> None:
+        """
+        Log the result of the initialization process.
+
+        Args:
+            summary (dict): Summary of initialization tasks performed
+        """
+        if summary["initialization_success"]:
+            logger.info("Application startup completed successfully")
+            if summary["admin_user_created"]:
+                logger.info("Default admin user was created during startup")
+        else:
+            logger.error("Application startup completed with errors:")
+            for error in summary["errors"]:
+                logger.error(f" - {error}")
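`log_initialization_result()` only inspects three keys of the summary, so the expected shape is easy to pin down; an illustrative summary dictionary as `initialize_application()` appears to build it:

```
from app.services.init_service import InitializationService

initialization_summary = {
    "initialization_success": True,  # overall outcome of startup tasks
    "admin_user_created": False,     # whether a default admin had to be created
    "errors": [],                    # collected error messages
}

InitializationService.log_initialization_result(initialization_summary)
# -> logs "Application startup completed successfully"
```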
src/file-processor/app/services/job_service.py (new file, 182 lines)

@@ -0,0 +1,182 @@
|
|||||||
|
"""
|
||||||
|
Service layer for job processing business logic.
|
||||||
|
|
||||||
|
This module provides high-level operations for managing processing jobs
|
||||||
|
with strict status transition validation and business rules enforcement.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from app.database.repositories.job_repository import JobRepository
|
||||||
|
from app.exceptions.job_exceptions import InvalidStatusTransitionError
|
||||||
|
from app.models.job import ProcessingJob, ProcessingStatus
|
||||||
|
from app.models.types import PyObjectId
|
||||||
|
|
||||||
|
|
||||||
|
class JobService:
|
||||||
|
"""
|
||||||
|
Service for processing job business logic operations.
|
||||||
|
|
||||||
|
Provides high-level job management with strict status transition
|
||||||
|
validation and business rule enforcement.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, database):
|
||||||
|
"""
|
||||||
|
Initialize service with job repository.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
repository: Optional JobRepository instance (creates default if None)
|
||||||
|
"""
|
||||||
|
self.db = database
|
||||||
|
self.repository = JobRepository(database)
|
||||||
|
|
||||||
|
async def initialize(self):
|
||||||
|
await self.repository.initialize()
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Create a new processing job.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_id: Reference to the file document
|
||||||
|
task_id: Optional Celery task UUID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The created ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
return await self.repository.create_job(file_id, task_id)
|
||||||
|
|
||||||
|
async def get_job_by_id(self, job_id: PyObjectId) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Retrieve a job by its ID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The ProcessingJob document
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
return await self.repository.find_job_by_id(job_id)
|
||||||
|
|
||||||
|
async def mark_job_as_started(self, job_id: PyObjectId) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Mark a job as started (PENDING → PROCESSING).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The updated ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
InvalidStatusTransitionError: If job is not in PENDING status
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
# Get current job to validate transition
|
||||||
|
current_job = await self.repository.find_job_by_id(job_id)
|
||||||
|
|
||||||
|
# Validate status transition
|
||||||
|
if current_job.status != ProcessingStatus.PENDING:
|
||||||
|
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.PROCESSING)
|
||||||
|
|
||||||
|
# Update status
|
||||||
|
return await self.repository.update_job_status(job_id, ProcessingStatus.PROCESSING)
|
||||||
|
|
||||||
|
async def mark_job_as_completed(self, job_id: PyObjectId) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Mark a job as completed (PROCESSING → COMPLETED).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The updated ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
InvalidStatusTransitionError: If job is not in PROCESSING status
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
# Get current job to validate transition
|
||||||
|
current_job = await self.repository.find_job_by_id(job_id)
|
||||||
|
|
||||||
|
# Validate status transition
|
||||||
|
if current_job.status != ProcessingStatus.PROCESSING:
|
||||||
|
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.COMPLETED)
|
||||||
|
|
||||||
|
# Update status
|
||||||
|
return await self.repository.update_job_status(job_id, ProcessingStatus.COMPLETED)
|
||||||
|
|
||||||
|
async def mark_job_as_failed(
|
||||||
|
self,
|
||||||
|
job_id: PyObjectId,
|
||||||
|
error_message: Optional[str] = None
|
||||||
|
) -> ProcessingJob:
|
||||||
|
"""
|
||||||
|
Mark a job as failed (PROCESSING → FAILED).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
error_message: Optional error description
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The updated ProcessingJob
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobNotFoundError: If job doesn't exist
|
||||||
|
InvalidStatusTransitionError: If job is not in PROCESSING status
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
# Get current job to validate transition
|
||||||
|
current_job = await self.repository.find_job_by_id(job_id)
|
||||||
|
|
||||||
|
# Validate status transition
|
||||||
|
if current_job.status != ProcessingStatus.PROCESSING:
|
||||||
|
raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.FAILED)
|
||||||
|
|
||||||
|
# Update status with error message
|
||||||
|
return await self.repository.update_job_status(
|
||||||
|
job_id,
|
||||||
|
ProcessingStatus.FAILED,
|
||||||
|
error_message
|
||||||
|
)
|
||||||
|
|
||||||
|
async def delete_job(self, job_id: PyObjectId) -> bool:
|
||||||
|
"""
|
||||||
|
Delete a job from the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
job_id: The job ObjectId
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if job was deleted, False if not found
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
return await self.repository.delete_job(job_id)
|
||||||
|
|
||||||
|
async def get_jobs_by_status(self, status: ProcessingStatus) -> list[ProcessingJob]:
|
||||||
|
"""
|
||||||
|
Retrieve all jobs with a specific status.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status: The processing status to filter by
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of ProcessingJob documents
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
JobRepositoryError: If database operation fails
|
||||||
|
"""
|
||||||
|
return await self.repository.get_jobs_by_status(status)
|
||||||
@@ -8,4 +8,5 @@ pymongo==4.15.0
 pydantic==2.11.9
 redis==6.4.0
 uvicorn==0.35.0
 python-magic==0.4.27
+watchdog==6.0.0
@@ -8,7 +8,7 @@ COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt

 # Copy application code
-COPY tasks/ .
+COPY . .

 # Command will be overridden by docker-compose
 CMD ["celery", "-A", "main", "worker", "--loglevel=info"]
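Copying the whole worker tree (instead of only `tasks/`) keeps `tasks` importable as a package inside the image, which is what the new `celery -A tasks.main` command in docker-compose expects, and it matches how the task modules themselves resolve the app:

```
# With COPY . . the image contains /app/tasks/..., so "tasks" is a package and
# `celery -A tasks.main worker` can locate the Celery app object:
from tasks.main import app as celery_app

print(celery_app.main)  # application name defined in tasks/main.py
```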
src/worker/tasks/document_processing.py (new file, 179 lines)

@@ -0,0 +1,179 @@
|
|||||||
|
"""
|
||||||
|
Celery tasks for document processing with ProcessingJob status management.
|
||||||
|
|
||||||
|
This module contains Celery tasks that handle document content extraction
|
||||||
|
and update processing job statuses throughout the task lifecycle.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
from tasks.main import app as celery_app
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
|
||||||
|
# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]:
|
||||||
|
# """
|
||||||
|
# Process a document file and extract its content.
|
||||||
|
#
|
||||||
|
# This task:
|
||||||
|
# 1. Updates the processing job status to PROCESSING
|
||||||
|
# 2. Performs document content extraction
|
||||||
|
# 3. Updates job status to COMPLETED or FAILED based on result
|
||||||
|
#
|
||||||
|
# Args:
|
||||||
|
# self : Celery task instance
|
||||||
|
# job_service : Instance of JobService
|
||||||
|
# document_service : Instance of DocumentService
|
||||||
|
# filepath: Full path to the document file to process
|
||||||
|
#
|
||||||
|
# Returns:
|
||||||
|
# Dictionary containing processing results
|
||||||
|
#
|
||||||
|
# Raises:
|
||||||
|
# Exception: Any processing error (will trigger retry)
|
||||||
|
# """
|
||||||
|
# task_id = self.request.id
|
||||||
|
# logger.info(f"Starting document processing task {task_id} for file: {filepath}")
|
||||||
|
#
|
||||||
|
# try:
|
||||||
|
# # Step 1: Mark job as started
|
||||||
|
# await job_service.mark_job_as_started(task_id=task_id)
|
||||||
|
# logger.info(f"Job {task_id} marked as PROCESSING")
|
||||||
|
#
|
||||||
|
# # Step 2: Process the document (extract content, OCR, etc.)
|
||||||
|
# document = await self.document_service.create_document(filepath)
|
||||||
|
# logger.info(f"Created document record with ID: {document.id}")
|
||||||
|
#
|
||||||
|
# result = document_service.extract_document_content(filepath)
|
||||||
|
# logger.info(f"Document content extracted successfully for task {task_id}")
|
||||||
|
#
|
||||||
|
# # Step 3: Mark job as completed
|
||||||
|
# await job_service.mark_job_as_completed(task_id=task_id)
|
||||||
|
# logger.info(f"Job {task_id} marked as COMPLETED")
|
||||||
|
#
|
||||||
|
# return {
|
||||||
|
# "task_id": task_id,
|
||||||
|
# "filepath": filepath,
|
||||||
|
# "status": "completed",
|
||||||
|
# "content_length": len(result.get("content", "")),
|
||||||
|
# "extraction_method": result.get("extraction_method"),
|
||||||
|
# "processing_time": result.get("processing_time")
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# except Exception as e:
|
||||||
|
# error_message = f"Document processing failed: {str(e)}"
|
||||||
|
# logger.error(f"Task {task_id} failed: {error_message}")
|
||||||
|
#
|
||||||
|
# try:
|
||||||
|
# # Mark job as failed
|
||||||
|
# job_service.mark_job_as_failed(task_id=task_id, error_message=error_message)
|
||||||
|
# logger.info(f"Job {task_id} marked as FAILED")
|
||||||
|
# except Exception as job_error:
|
||||||
|
# logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
|
||||||
|
#
|
||||||
|
# # Re-raise the exception to trigger Celery retry mechanism
|
||||||
|
# raise
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task(name="tasks.document_processing.process_document",
|
||||||
|
bind=True,
|
||||||
|
autoretry_for=(Exception,),
|
||||||
|
retry_kwargs={'max_retries': 3, 'countdown': 60})
|
||||||
|
def process_document(self, filepath: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Process a document file and extract its content.
|
||||||
|
|
||||||
|
This task:
|
||||||
|
1. Updates the processing job status to PROCESSING
|
||||||
|
2. Performs document content extraction
|
||||||
|
3. Updates job status to COMPLETED or FAILED based on result
|
||||||
|
|
||||||
|
Args:
|
||||||
|
self : Celery task instance
|
||||||
|
job_service : Instance of JobService
|
||||||
|
document_service : Instance of DocumentService
|
||||||
|
filepath: Full path to the document file to process
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing processing results
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: Any processing error (will trigger retry)
|
||||||
|
"""
|
||||||
|
task_id = self.request.id
|
||||||
|
logger.info(f"Starting document processing task {task_id} for file: {filepath}")
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task(bind=True)
def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]:
    """
    Clean up old processing jobs from the database.

    This maintenance task removes completed and failed jobs older than
    the specified number of days.

    Args:
        days_old: Number of days after which to clean up jobs

    Returns:
        Dictionary containing cleanup statistics
    """
    task_id = self.request.id
    logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days")

    job_service = JobService()

    try:
        # Perform cleanup
        cleanup_result = job_service.cleanup_old_jobs(days_old=days_old)

        logger.info(
            f"Cleanup task {task_id} completed: "
            f"deleted {cleanup_result['deleted_count']} jobs"
        )

        return {
            "task_id": task_id,
            "status": "completed",
            "deleted_count": cleanup_result["deleted_count"],
            "days_old": days_old
        }

    except Exception as e:
        error_message = f"Cleanup task failed: {str(e)}"
        logger.error(f"Cleanup task {task_id} failed: {error_message}")
        raise

@celery_app.task(bind=True)
def get_processing_statistics(self) -> Dict[str, Any]:
    """
    Generate processing statistics for monitoring.

    Returns:
        Dictionary containing current processing statistics
    """
    task_id = self.request.id
    logger.info(f"Generating processing statistics for task {task_id}")

    job_service = JobService()

    try:
        stats = job_service.get_processing_statistics()

        logger.info(f"Statistics generated for task {task_id}")

        return {
            "task_id": task_id,
            "status": "completed",
            "statistics": stats,
            "timestamp": stats.get("generated_at")
        }

    except Exception as e:
        error_message = f"Statistics generation failed: {str(e)}"
        logger.error(f"Statistics task {task_id} failed: {error_message}")
        raise

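`cleanup_old_processing_jobs` and `get_processing_statistics` only earn their keep if something triggers them periodically. A hedged sketch of a Celery beat schedule for the two tasks follows; the task names assume the same `tasks.document_processing` prefix given explicitly to `process_document` (by default Celery derives names from the module path), and the cadence is illustrative, not part of this change.

```python
# Illustrative only: periodic scheduling for the two maintenance tasks above.
# Task names and cadence are assumptions; adjust to the real registered names.
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-old-processing-jobs": {
        "task": "tasks.document_processing.cleanup_old_processing_jobs",
        "schedule": crontab(hour=3, minute=0),  # nightly at 03:00
        "kwargs": {"days_old": 30},
    },
    "processing-statistics": {
        "task": "tasks.document_processing.get_processing_statistics",
        "schedule": crontab(minute=0),  # hourly
    },
}
```
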
@@ -6,6 +6,7 @@ This module contains all Celery tasks for processing documents.

import os
import time

from celery import Celery

# Environment variables
@@ -110,4 +111,4 @@ def process_document_task(self, file_path: str):


if __name__ == "__main__":
    app.start()

523
tests/repositories/test_job_repository.py
Normal file
@@ -0,0 +1,523 @@
"""
Test suite for JobRepository with async/await support.

This module contains comprehensive tests for all JobRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""

from datetime import datetime

import pytest
import pytest_asyncio
from bson import ObjectId
from mongomock_motor import AsyncMongoMockClient
from pymongo.errors import PyMongoError

from app.database.repositories.job_repository import JobRepository
from app.exceptions.job_exceptions import JobRepositoryError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId


@pytest_asyncio.fixture
async def in_memory_repository():
    """Create an in-memory JobRepository for testing."""
    client = AsyncMongoMockClient()
    db = client.test_database
    repo = JobRepository(db)
    await repo.initialize()
    return repo


@pytest.fixture
def sample_document_id():
    """Sample document ObjectId for testing."""
    return PyObjectId()


@pytest.fixture
def sample_task_id():
    """Sample Celery task ID for testing."""
    return "celery-task-12345-abcde"


@pytest.fixture
def multiple_sample_jobs():
    """Multiple ProcessingJob objects for testing."""
    doc_id_1 = ObjectId()
    doc_id_2 = ObjectId()
    base_time = datetime.utcnow()

    return [
        ProcessingJob(
            document_id=doc_id_1,
            status=ProcessingStatus.PENDING,
            task_id="task-1",
            created_at=base_time,
            started_at=None,
            completed_at=None,
            error_message=None
        ),
        ProcessingJob(
            document_id=doc_id_2,
            status=ProcessingStatus.PROCESSING,
            task_id="task-2",
            created_at=base_time,
            started_at=base_time,
            completed_at=None,
            error_message=None
        ),
        ProcessingJob(
            document_id=doc_id_1,
            status=ProcessingStatus.COMPLETED,
            task_id="task-3",
            created_at=base_time,
            started_at=base_time,
            completed_at=base_time,
            error_message=None
        )
    ]


class TestJobRepositoryInitialization:
    """Tests for repository initialization."""

    @pytest.mark.asyncio
    async def test_i_can_initialize_repository(self):
        """Test repository initialization."""
        # Arrange
        client = AsyncMongoMockClient()
        db = client.test_database
        repo = JobRepository(db)

        # Act
        initialized_repo = await repo.initialize()

        # Assert
        assert initialized_repo is repo
        assert repo.db is not None
        assert repo.collection is not None


class TestJobRepositoryCreation:
    """Tests for job creation functionality."""

    @pytest.mark.asyncio
    async def test_i_can_create_job_with_task_id(self, in_memory_repository, sample_document_id, sample_task_id):
        """Test successful job creation with task ID."""
        # Act
        created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)

        # Assert
        assert created_job is not None
        assert created_job.document_id == sample_document_id
        assert created_job.task_id == sample_task_id
        assert created_job.status == ProcessingStatus.PENDING
        assert created_job.created_at is not None
        assert created_job.started_at is None
        assert created_job.completed_at is None
        assert created_job.error_message is None
        assert created_job.id is not None
        assert isinstance(created_job.id, ObjectId)

    @pytest.mark.asyncio
    async def test_i_can_create_job_without_task_id(self, in_memory_repository, sample_document_id):
        """Test successful job creation without task ID."""
        # Act
        created_job = await in_memory_repository.create_job(sample_document_id)

        # Assert
        assert created_job is not None
        assert created_job.document_id == sample_document_id
        assert created_job.task_id is None
        assert created_job.status == ProcessingStatus.PENDING
        assert created_job.created_at is not None
        assert created_job.started_at is None
        assert created_job.completed_at is None
        assert created_job.error_message is None
        assert created_job.id is not None
        assert isinstance(created_job.id, ObjectId)

    @pytest.mark.asyncio
    async def test_i_cannot_create_duplicate_job_for_document(self, in_memory_repository, sample_document_id,
                                                              sample_task_id):
        """Test that creating a job with a duplicate document_id raises JobRepositoryError."""
        # Arrange
        await in_memory_repository.create_job(sample_document_id, sample_task_id)

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.create_job(sample_document_id, "different-task-id")

        assert "create_job" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_i_cannot_create_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
        """Test handling of PyMongo errors during job creation."""
        # Arrange
        mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.create_job(sample_document_id)

        assert "create_job" in str(exc_info.value)


class TestJobRepositoryFinding:
    """Tests for job finding functionality."""

    @pytest.mark.asyncio
    async def test_i_can_find_job_by_valid_id(self, in_memory_repository, sample_document_id, sample_task_id):
        """Test finding job by valid ObjectId."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)

        # Act
        found_job = await in_memory_repository.find_job_by_id(created_job.id)

        # Assert
        assert found_job is not None
        assert found_job.id == created_job.id
        assert found_job.document_id == created_job.document_id
        assert found_job.task_id == created_job.task_id
        assert found_job.status == created_job.status

    @pytest.mark.asyncio
    async def test_i_cannot_find_job_by_nonexistent_id(self, in_memory_repository):
        """Test that nonexistent ObjectId returns None."""
        # Arrange
        nonexistent_id = PyObjectId()

        # Act
        found_job = await in_memory_repository.find_job_by_id(nonexistent_id)

        # Assert
        assert found_job is None

    @pytest.mark.asyncio
    async def test_i_cannot_find_job_with_pymongo_error(self, in_memory_repository, mocker):
        """Test handling of PyMongo errors during job finding."""
        # Arrange
        mocker.patch.object(in_memory_repository.collection, 'find_one', side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.find_job_by_id(PyObjectId())

        assert "get_job_by_id" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_i_can_find_jobs_by_document_id(self, in_memory_repository, sample_document_id, sample_task_id):
        """Test finding jobs by document ID."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id)

        # Act
        found_jobs = await in_memory_repository.find_jobs_by_document_id(sample_document_id)

        # Assert
        assert len(found_jobs) == 1
        assert found_jobs[0].id == created_job.id
        assert found_jobs[0].document_id == sample_document_id

    @pytest.mark.asyncio
    async def test_i_can_find_empty_jobs_list_for_nonexistent_document(self, in_memory_repository):
        """Test that nonexistent document ID returns empty list."""
        # Arrange
        nonexistent_id = ObjectId()

        # Act
        found_jobs = await in_memory_repository.find_jobs_by_document_id(nonexistent_id)

        # Assert
        assert found_jobs == []

    @pytest.mark.asyncio
    async def test_i_cannot_find_jobs_by_document_with_pymongo_error(self, in_memory_repository, mocker):
        """Test handling of PyMongo errors during finding jobs by document ID."""
        # Arrange
        mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.find_jobs_by_document_id(PyObjectId())

        assert "get_jobs_by_file_id" in str(exc_info.value)

    @pytest.mark.asyncio
    @pytest.mark.parametrize("status", [
        ProcessingStatus.PENDING,
        ProcessingStatus.PROCESSING,
        ProcessingStatus.COMPLETED
    ])
    async def test_i_can_find_jobs_by_pending_status(self, in_memory_repository, sample_document_id, status):
        """Test finding jobs filtered by status (parametrized over PENDING, PROCESSING, COMPLETED)."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        await in_memory_repository.update_job_status(created_job.id, status)

        # Act
        found_jobs = await in_memory_repository.get_jobs_by_status(status)

        # Assert
        assert len(found_jobs) == 1
        assert found_jobs[0].id == created_job.id
        assert found_jobs[0].status == status

    @pytest.mark.asyncio
    async def test_i_can_find_jobs_by_failed_status(self, in_memory_repository, sample_document_id):
        """Test finding jobs by FAILED status."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED, "Test error")

        # Act
        found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)

        # Assert
        assert len(found_jobs) == 1
        assert found_jobs[0].id == created_job.id
        assert found_jobs[0].status == ProcessingStatus.FAILED
        assert found_jobs[0].error_message == "Test error"

    @pytest.mark.asyncio
    async def test_i_can_find_empty_jobs_list_for_unused_status(self, in_memory_repository):
        """Test that unused status returns empty list."""
        # Act
        found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)

        # Assert
        assert found_jobs == []

    @pytest.mark.asyncio
    async def test_i_cannot_find_jobs_by_status_with_pymongo_error(self, in_memory_repository, mocker):
        """Test handling of PyMongo errors during finding jobs by status."""
        # Arrange
        mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)

        assert "get_jobs_by_status" in str(exc_info.value)


class TestJobRepositoryStatusUpdate:
    """Tests for job status update functionality."""

    @pytest.mark.asyncio
    async def test_i_can_update_job_status_to_processing(self, in_memory_repository, sample_document_id):
        """Test updating job status to PROCESSING with started_at timestamp."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)

        # Act
        updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)

        # Assert
        assert updated_job is not None
        assert updated_job.id == created_job.id
        assert updated_job.status == ProcessingStatus.PROCESSING
        assert updated_job.started_at is not None
        assert updated_job.completed_at is None
        assert updated_job.error_message is None

    @pytest.mark.asyncio
    async def test_i_can_update_job_status_to_completed(self, in_memory_repository, sample_document_id):
        """Test updating job status to COMPLETED with completed_at timestamp."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)

        # Act
        updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)

        # Assert
        assert updated_job is not None
        assert updated_job.id == created_job.id
        assert updated_job.status == ProcessingStatus.COMPLETED
        assert updated_job.started_at is not None
        assert updated_job.completed_at is not None
        assert updated_job.error_message is None

    @pytest.mark.asyncio
    async def test_i_can_update_job_status_to_failed_with_error(self, in_memory_repository, sample_document_id):
        """Test updating job status to FAILED with error message and completed_at timestamp."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        error_message = "Processing failed due to invalid format"

        # Act
        updated_job = await in_memory_repository.update_job_status(
            created_job.id, ProcessingStatus.FAILED, error_message
        )

        # Assert
        assert updated_job is not None
        assert updated_job.id == created_job.id
        assert updated_job.status == ProcessingStatus.FAILED
        assert updated_job.completed_at is not None
        assert updated_job.error_message == error_message

    @pytest.mark.asyncio
    async def test_i_can_update_job_status_to_failed_without_error(self, in_memory_repository, sample_document_id):
        """Test updating job status to FAILED without error message."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)

        # Act
        updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED)

        # Assert
        assert updated_job is not None
        assert updated_job.id == created_job.id
        assert updated_job.status == ProcessingStatus.FAILED
        assert updated_job.completed_at is not None
        assert updated_job.error_message is None

    @pytest.mark.asyncio
    async def test_i_cannot_update_nonexistent_job_status(self, in_memory_repository):
        """Test that updating nonexistent job returns None."""
        # Arrange
        nonexistent_id = ObjectId()

        # Act
        result = await in_memory_repository.update_job_status(nonexistent_id, ProcessingStatus.COMPLETED)

        # Assert
        assert result is None

    @pytest.mark.asyncio
    async def test_i_cannot_update_job_status_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
        """Test handling of PyMongo errors during job status update."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
                            side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)

        assert "update_job_status" in str(exc_info.value)


class TestJobRepositoryDeletion:
    """Tests for job deletion functionality."""

    @pytest.mark.asyncio
    async def test_i_can_delete_existing_job(self, in_memory_repository, sample_document_id):
        """Test successful job deletion."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)

        # Act
        deletion_result = await in_memory_repository.delete_job(created_job.id)

        # Assert
        assert deletion_result is True

        # Verify job is actually deleted
        found_job = await in_memory_repository.find_job_by_id(created_job.id)
        assert found_job is None

    @pytest.mark.asyncio
    async def test_i_cannot_delete_nonexistent_job(self, in_memory_repository):
        """Test that deleting nonexistent job returns False."""
        # Arrange
        nonexistent_id = ObjectId()

        # Act
        result = await in_memory_repository.delete_job(nonexistent_id)

        # Assert
        assert result is False

    @pytest.mark.asyncio
    async def test_i_cannot_delete_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
        """Test handling of PyMongo errors during job deletion."""
        # Arrange
        created_job = await in_memory_repository.create_job(sample_document_id)
        mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))

        # Act & Assert
        with pytest.raises(JobRepositoryError) as exc_info:
            await in_memory_repository.delete_job(created_job.id)

        assert "delete_job" in str(exc_info.value)


class TestJobRepositoryComplexScenarios:
    """Tests for complex job repository scenarios."""

    @pytest.mark.asyncio
    async def test_i_can_handle_complete_job_lifecycle(self, in_memory_repository, sample_document_id, sample_task_id):
        """Test complete job lifecycle from creation to completion."""
        # Create job
        job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
        assert job.status == ProcessingStatus.PENDING
        assert job.started_at is None
        assert job.completed_at is None

        # Start processing
        job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)
        assert job.status == ProcessingStatus.PROCESSING
        assert job.started_at is not None
        assert job.completed_at is None

        # Complete job
        job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.COMPLETED)
        assert job.status == ProcessingStatus.COMPLETED
        assert job.started_at is not None
        assert job.completed_at is not None
        assert job.error_message is None

    @pytest.mark.asyncio
    async def test_i_can_handle_job_failure_scenario(self, in_memory_repository, sample_document_id, sample_task_id):
        """Test job failure scenario with error message."""
        # Create and start job
        job = await in_memory_repository.create_job(sample_document_id, sample_task_id)
        job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)

        # Fail job with error
        error_msg = "File format not supported"
        job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.FAILED, error_msg)

        # Assert failure state
        assert job.status == ProcessingStatus.FAILED
        assert job.started_at is not None
        assert job.completed_at is not None
        assert job.error_message == error_msg

    @pytest.mark.asyncio
    async def test_i_can_handle_multiple_documents_with_different_statuses(self, in_memory_repository):
        """Test managing multiple jobs for different documents with various statuses."""
        # Create jobs for different documents
        doc1 = PyObjectId()
        doc2 = PyObjectId()
        doc3 = PyObjectId()

        job1 = await in_memory_repository.create_job(doc1, "task-1")
        job2 = await in_memory_repository.create_job(doc2, "task-2")
        job3 = await in_memory_repository.create_job(doc3, "task-3")

        # Update to different statuses
        await in_memory_repository.update_job_status(job1.id, ProcessingStatus.PROCESSING)
        await in_memory_repository.update_job_status(job2.id, ProcessingStatus.COMPLETED)
        await in_memory_repository.update_job_status(job3.id, ProcessingStatus.FAILED, "Error occurred")

        # Verify status queries
        pending_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)
        processing_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.PROCESSING)
        completed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)
        failed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)

        assert len(pending_jobs) == 0
        assert len(processing_jobs) == 1
        assert len(completed_jobs) == 1
        assert len(failed_jobs) == 1

        assert processing_jobs[0].id == job1.id
        assert completed_jobs[0].id == job2.id
        assert failed_jobs[0].id == job3.id

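Both suites rely on the asyncio marker and the `mocker` fixture, so they need pytest-asyncio, pytest-mock, and mongomock-motor installed alongside pytest. A small hedged convenience runner, equivalent to `pytest tests/repositories tests/services -v`; the script name and location are illustrative:

```python
# Convenience runner for the two new suites. Assumes pytest, pytest-asyncio,
# pytest-mock (provides the `mocker` fixture) and mongomock-motor are installed.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main([
        "tests/repositories/test_job_repository.py",
        "tests/services/test_job_service.py",
        "-v",
    ]))
```
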
578
tests/services/test_job_service.py
Normal file
@@ -0,0 +1,578 @@
"""
Unit tests for JobService using in-memory MongoDB.

Tests the business logic operations with real MongoDB operations
using mongomock for better integration testing.
"""

import pytest
import pytest_asyncio
from bson import ObjectId
from mongomock_motor import AsyncMongoMockClient

from app.exceptions.job_exceptions import InvalidStatusTransitionError
from app.models.job import ProcessingStatus
from app.models.types import PyObjectId
from app.services.job_service import JobService


@pytest_asyncio.fixture
async def in_memory_database():
    """Create an in-memory database for testing."""
    client = AsyncMongoMockClient()
    return client.test_database


@pytest_asyncio.fixture
async def job_service(in_memory_database):
    """Create JobService with in-memory repositories."""
    service = await JobService(in_memory_database).initialize()
    return service


@pytest.fixture
def sample_document_id():
    """Sample file ObjectId."""
    return PyObjectId()


@pytest.fixture
def sample_task_id():
    """Sample Celery task UUID."""
    return "550e8400-e29b-41d4-a716-446655440000"


class TestCreateJob:
    """Tests for create_job method."""

    @pytest.mark.asyncio
    async def test_i_can_create_job_with_task_id(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test creating job with task ID."""
        # Execute
        result = await job_service.create_job(sample_document_id, sample_task_id)

        # Verify job creation
        assert result is not None
        assert result.document_id == sample_document_id
        assert result.task_id == sample_task_id
        assert result.status == ProcessingStatus.PENDING
        assert result.created_at is not None
        assert result.started_at is None
        assert result.error_message is None

        # Verify job exists in database
        job_in_db = await job_service.get_job_by_id(result.id)
        assert job_in_db is not None
        assert job_in_db.id == result.id
        assert job_in_db.document_id == sample_document_id
        assert job_in_db.task_id == sample_task_id
        assert job_in_db.status == ProcessingStatus.PENDING

    @pytest.mark.asyncio
    async def test_i_can_create_job_without_task_id(
        self,
        job_service,
        sample_document_id
    ):
        """Test creating job without task ID."""
        # Execute
        result = await job_service.create_job(sample_document_id)

        # Verify job creation
        assert result is not None
        assert result.document_id == sample_document_id
        assert result.task_id is None
        assert result.status == ProcessingStatus.PENDING
        assert result.created_at is not None
        assert result.started_at is None
        assert result.error_message is None


class TestGetJobMethods:
    """Tests for job retrieval methods."""

    @pytest.mark.asyncio
    async def test_i_can_get_job_by_id(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test retrieving job by ID."""
        # Create a job first
        created_job = await job_service.create_job(sample_document_id, sample_task_id)

        # Execute
        result = await job_service.get_job_by_id(created_job.id)

        # Verify
        assert result is not None
        assert result.id == created_job.id
        assert result.document_id == created_job.document_id
        assert result.task_id == created_job.task_id
        assert result.status == created_job.status

    @pytest.mark.asyncio
    async def test_i_can_get_jobs_by_status(
        self,
        job_service,
        sample_document_id
    ):
        """Test retrieving jobs by status."""
        # Create jobs with different statuses
        pending_job = await job_service.create_job(sample_document_id, "pending-task")

        processing_job = await job_service.create_job(ObjectId(), "processing-task")
        await job_service.mark_job_as_started(processing_job.id)

        completed_job = await job_service.create_job(ObjectId(), "completed-task")
        await job_service.mark_job_as_started(completed_job.id)
        await job_service.mark_job_as_completed(completed_job.id)

        # Execute - get pending jobs
        pending_results = await job_service.get_jobs_by_status(ProcessingStatus.PENDING)

        # Verify
        assert len(pending_results) == 1
        assert pending_results[0].id == pending_job.id
        assert pending_results[0].status == ProcessingStatus.PENDING

        # Execute - get processing jobs
        processing_results = await job_service.get_jobs_by_status(ProcessingStatus.PROCESSING)
        assert len(processing_results) == 1
        assert processing_results[0].status == ProcessingStatus.PROCESSING

        # Execute - get completed jobs
        completed_results = await job_service.get_jobs_by_status(ProcessingStatus.COMPLETED)
        assert len(completed_results) == 1
        assert completed_results[0].status == ProcessingStatus.COMPLETED


class TestUpdateStatus:
    """Tests for the job status transition methods."""

    @pytest.mark.asyncio
    async def test_i_can_mark_pending_job_as_started(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test marking pending job as started (PENDING → PROCESSING)."""
        # Create a pending job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        assert created_job.status == ProcessingStatus.PENDING

        # Execute
        result = await job_service.mark_job_as_started(created_job.id)

        # Verify status transition
        assert result is not None
        assert result.id == created_job.id
        assert result.status == ProcessingStatus.PROCESSING

        # Verify in database
        updated_job = await job_service.get_job_by_id(created_job.id)
        assert updated_job.status == ProcessingStatus.PROCESSING

    @pytest.mark.asyncio
    async def test_i_cannot_mark_processing_job_as_started(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that processing job cannot be marked as started."""
        # Create and start a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)

        # Try to start it again
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_started(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.PROCESSING
        assert exc_info.value.target_status == ProcessingStatus.PROCESSING

    @pytest.mark.asyncio
    async def test_i_cannot_mark_completed_job_as_started(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that completed job cannot be marked as started."""
        # Create, start, and complete a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_completed(created_job.id)

        # Try to start it again
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_started(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.COMPLETED
        assert exc_info.value.target_status == ProcessingStatus.PROCESSING

    @pytest.mark.asyncio
    async def test_i_cannot_mark_failed_job_as_started(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that failed job cannot be marked as started."""
        # Create, start, and fail a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_failed(created_job.id, "Test error")

        # Try to start it again
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_started(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.FAILED
        assert exc_info.value.target_status == ProcessingStatus.PROCESSING

    @pytest.mark.asyncio
    async def test_i_can_mark_processing_job_as_completed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test marking processing job as completed (PROCESSING → COMPLETED)."""
        # Create and start a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        started_job = await job_service.mark_job_as_started(created_job.id)

        # Execute
        result = await job_service.mark_job_as_completed(created_job.id)

        # Verify status transition
        assert result is not None
        assert result.id == created_job.id
        assert result.status == ProcessingStatus.COMPLETED

        # Verify in database
        updated_job = await job_service.get_job_by_id(created_job.id)
        assert updated_job.status == ProcessingStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_i_cannot_mark_pending_job_as_completed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that pending job cannot be marked as completed."""
        # Create a pending job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)

        # Try to complete it directly
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_completed(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.PENDING
        assert exc_info.value.target_status == ProcessingStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_i_cannot_mark_completed_job_as_completed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that completed job cannot be marked as completed again."""
        # Create, start, and complete a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_completed(created_job.id)

        # Try to complete it again
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_completed(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.COMPLETED
        assert exc_info.value.target_status == ProcessingStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_i_cannot_mark_failed_job_as_completed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that failed job cannot be marked as completed."""
        # Create, start, and fail a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_failed(created_job.id, "Test error")

        # Try to complete it
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_completed(created_job.id)

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.FAILED
        assert exc_info.value.target_status == ProcessingStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_i_can_mark_processing_job_as_failed_with_error_message(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test marking processing job as failed with error message."""
        # Create and start a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        started_job = await job_service.mark_job_as_started(created_job.id)

        error_message = "Processing failed due to invalid file format"

        # Execute
        result = await job_service.mark_job_as_failed(created_job.id, error_message)

        # Verify status transition
        assert result is not None
        assert result.id == created_job.id
        assert result.status == ProcessingStatus.FAILED
        assert result.error_message == error_message

        # Verify in database
        updated_job = await job_service.get_job_by_id(created_job.id)
        assert updated_job.status == ProcessingStatus.FAILED
        assert updated_job.error_message == error_message

    @pytest.mark.asyncio
    async def test_i_can_mark_processing_job_as_failed_without_error_message(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test marking processing job as failed without error message."""
        # Create and start a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)

        # Execute without error message
        result = await job_service.mark_job_as_failed(created_job.id)

        # Verify status transition
        assert result is not None
        assert result.status == ProcessingStatus.FAILED
        assert result.error_message is None

    @pytest.mark.asyncio
    async def test_i_cannot_mark_pending_job_as_failed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that pending job cannot be marked as failed."""
        # Create a pending job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)

        # Try to fail it directly
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_failed(created_job.id, "Test error")

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.PENDING
        assert exc_info.value.target_status == ProcessingStatus.FAILED

    @pytest.mark.asyncio
    async def test_i_cannot_mark_completed_job_as_failed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that completed job cannot be marked as failed."""
        # Create, start, and complete a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_completed(created_job.id)

        # Try to fail it
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_failed(created_job.id, "Test error")

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.COMPLETED
        assert exc_info.value.target_status == ProcessingStatus.FAILED

    @pytest.mark.asyncio
    async def test_i_cannot_mark_failed_job_as_failed(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test that failed job cannot be marked as failed again."""
        # Create, start, and fail a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)
        await job_service.mark_job_as_started(created_job.id)
        await job_service.mark_job_as_failed(created_job.id, "First error")

        # Try to fail it again
        with pytest.raises(InvalidStatusTransitionError) as exc_info:
            await job_service.mark_job_as_failed(created_job.id, "Second error")

        # Verify exception details
        assert exc_info.value.current_status == ProcessingStatus.FAILED
        assert exc_info.value.target_status == ProcessingStatus.FAILED


class TestDeleteJob:
    """Tests for delete_job method."""

    @pytest.mark.asyncio
    async def test_i_can_delete_existing_job(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test deleting an existing job."""
        # Create a job
        created_job = await job_service.create_job(sample_document_id, sample_task_id)

        # Verify job exists
        job_before_delete = await job_service.get_job_by_id(created_job.id)
        assert job_before_delete is not None

        # Execute deletion
        result = await job_service.delete_job(created_job.id)

        # Verify deletion
        assert result is True

        # Verify job no longer exists
        deleted_job = await job_service.get_job_by_id(created_job.id)
        assert deleted_job is None

    @pytest.mark.asyncio
    async def test_i_cannot_delete_nonexistent_job(
        self,
        job_service
    ):
        """Test deleting a nonexistent job returns False."""
        # Execute deletion with random ObjectId
        result = await job_service.delete_job(ObjectId())

        # Verify
        assert result is False


class TestStatusTransitionValidation:
    """Tests for status transition validation across different scenarios."""

    @pytest.mark.asyncio
    async def test_valid_job_lifecycle_flow(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test complete valid job lifecycle: PENDING → PROCESSING → COMPLETED."""
        # Create job (PENDING)
        job = await job_service.create_job(sample_document_id, sample_task_id)
        assert job.status == ProcessingStatus.PENDING

        # Start job (PENDING → PROCESSING)
        started_job = await job_service.mark_job_as_started(job.id)
        assert started_job.status == ProcessingStatus.PROCESSING

        # Complete job (PROCESSING → COMPLETED)
        completed_job = await job_service.mark_job_as_completed(job.id)
        assert completed_job.status == ProcessingStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_valid_job_failure_flow(
        self,
        job_service,
        sample_document_id,
        sample_task_id
    ):
        """Test valid job failure: PENDING → PROCESSING → FAILED."""
        # Create job (PENDING)
        job = await job_service.create_job(sample_document_id, sample_task_id)
        assert job.status == ProcessingStatus.PENDING

        # Start job (PENDING → PROCESSING)
        started_job = await job_service.mark_job_as_started(job.id)
        assert started_job.status == ProcessingStatus.PROCESSING

        # Fail job (PROCESSING → FAILED)
        failed_job = await job_service.mark_job_as_failed(job.id, "Test failure")
        assert failed_job.status == ProcessingStatus.FAILED
        assert failed_job.error_message == "Test failure"


class TestEdgeCases:
    """Tests for edge cases and error conditions."""
    #
    # @pytest.mark.asyncio
    # async def test_multiple_jobs_for_same_file(
    #     self,
    #     job_service,
    #     sample_document_id
    # ):
    #     """Test handling multiple jobs for the same file."""
    #     # Create multiple jobs for same file
    #     job1 = await job_service.create_job(sample_document_id, "task-1")
    #     job2 = await job_service.create_job(sample_document_id, "task-2")
    #     job3 = await job_service.create_job(sample_document_id, "task-3")
    #
    #     # Verify all jobs exist and are independent
    #     jobs_for_file = await job_service.get_jobs_by_file_id(sample_document_id)
    #     assert len(jobs_for_file) == 3
    #
    #     job_ids = [job.id for job in jobs_for_file]
    #     assert job1.id in job_ids
    #     assert job2.id in job_ids
    #     assert job3.id in job_ids
    #
    #     # Verify status transitions work independently
    #     await job_service.mark_job_as_started(job1.id)
    #     await job_service.mark_job_as_completed(job1.id)
    #
    #     # Other jobs should still be pending
    #     updated_job2 = await job_service.get_job_by_id(job2.id)
    #     updated_job3 = await job_service.get_job_by_id(job3.id)
    #
    #     assert updated_job2.status == ProcessingStatus.PENDING
    #     assert updated_job3.status == ProcessingStatus.PENDING

    @pytest.mark.asyncio
    async def test_job_operations_with_empty_database(
        self,
        job_service
    ):
        """Test job operations when database is empty."""
        # Try to get nonexistent job
        result = await job_service.get_job_by_id(ObjectId())
        assert result is None

        # Try to get jobs by status when none exist
        pending_jobs = await job_service.get_jobs_by_status(ProcessingStatus.PENDING)
        assert pending_jobs == []

        # Try to delete nonexistent job
        delete_result = await job_service.delete_job(ObjectId())
        assert delete_result is False