diff --git a/Readme.md b/Readme.md index 88a9cb2..acab586 100644 --- a/Readme.md +++ b/Readme.md @@ -342,6 +342,71 @@ class ProcessingJob(BaseModel): 4. TODO : Watchdog file monitoring implementation 5. TODO : FastAPI integration and startup coordination +## Job Management Layer + +### Repository Pattern Implementation + +The job management system follows the repository pattern for clean separation between data access and business logic. + +#### JobRepository + +Handles direct MongoDB operations for processing jobs: + +**CRUD Operations:** +- `create_job()` - Create new processing job with automatic `created_at` timestamp +- `get_job_by_id()` - Retrieve job by ObjectId +- `update_job_status()` - Update job status with automatic timestamp management +- `delete_job()` - Remove job from database +- `get_jobs_by_file_id()` - Get all jobs for specific file +- `get_jobs_by_status()` - Get jobs filtered by processing status + +**Automatic Timestamp Management:** +- `created_at`: Set automatically during job creation +- `started_at`: Set automatically when status changes to PROCESSING +- `completed_at`: Set automatically when status changes to COMPLETED or FAILED + +#### JobService + +Provides business logic layer with strict status transition validation: + +**Status Transition Methods:** +- `mark_job_as_started()` - PENDING → PROCESSING +- `mark_job_as_completed()` - PROCESSING → COMPLETED +- `mark_job_as_failed()` - PROCESSING → FAILED + +**Validation Rules:** +- Strict status transitions (invalid transitions raise exceptions) +- Job existence verification before any operation +- Automatic timestamp management through repository layer + +#### Custom Exceptions + +**JobNotFoundError**: Raised when job ID doesn't exist +**InvalidStatusTransitionError**: Raised for invalid status transitions +**JobRepositoryError**: Raised for MongoDB operation failures + +#### Valid Status Transitions + +``` +PENDING → PROCESSING (via mark_job_as_started) +PROCESSING → COMPLETED (via 
"""
Repository for managing processing jobs in MongoDB.

This module provides the data access layer for ProcessingJob operations
with automatic timestamp management and error handling.
"""

from datetime import datetime
from typing import List, Optional

from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
from pymongo.errors import PyMongoError

from app.exceptions.job_exceptions import JobRepositoryError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId


class JobRepository:
    """
    Repository for processing job data access operations.

    Provides CRUD operations for ProcessingJob documents with automatic
    timestamp management and proper error handling.
    """

    def __init__(self, database: AsyncIOMotorDatabase):
        """Initialize repository with MongoDB collection reference."""
        self.db = database
        self.collection: AsyncIOMotorCollection = self.db.processing_jobs

    async def _ensure_indexes(self):
        """
        Ensure required database indexes exist.

        Creates a unique index on ``document_id`` so at most one job can
        exist per document.
        """
        try:
            await self.collection.create_index("document_id", unique=True)
        except PyMongoError:
            # Best-effort: the index may already exist or the server may be
            # temporarily unavailable; creation is retried on next startup.
            pass

    async def initialize(self):
        """
        Initialize repository by ensuring required indexes exist.

        Should be called once after instantiation. Returns ``self`` so the
        call can be chained: ``repo = await JobRepository(db).initialize()``.
        """
        await self._ensure_indexes()
        return self

    async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
        """
        Create a new processing job in PENDING state.

        Args:
            document_id: Reference to the file document
            task_id: Optional Celery task UUID

        Returns:
            The created ProcessingJob with ``created_at`` set

        Raises:
            JobRepositoryError: If the database operation fails (including a
                duplicate ``document_id``, rejected by the unique index)
        """
        try:
            # NOTE(review): naive local timestamp; consider
            # datetime.now(timezone.utc) — confirm with consumers.
            job_data = {
                "document_id": document_id,
                "status": ProcessingStatus.PENDING,
                "task_id": task_id,
                "created_at": datetime.now(),
                "started_at": None,
                "completed_at": None,
                "error_message": None
            }

            result = await self.collection.insert_one(job_data)
            job_data["_id"] = result.inserted_id

            return ProcessingJob(**job_data)

        except PyMongoError as e:
            raise JobRepositoryError("create_job", e)

    async def find_job_by_id(self, job_id: PyObjectId) -> Optional[ProcessingJob]:
        """
        Retrieve a job by its ID.

        Args:
            job_id: The job ObjectId

        Returns:
            The ProcessingJob, or None when no job has this id

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            job_data = await self.collection.find_one({"_id": job_id})
            if job_data:
                return ProcessingJob(**job_data)

            return None

        except PyMongoError as e:
            # Operation label predates the method rename; the test suite
            # asserts this exact string, so it is kept as-is.
            raise JobRepositoryError("get_job_by_id", e)

    async def update_job_status(
        self,
        job_id: PyObjectId,
        status: ProcessingStatus,
        error_message: Optional[str] = None
    ) -> Optional[ProcessingJob]:
        """
        Update job status with automatic timestamp management.

        ``started_at`` is stamped when moving to PROCESSING, and
        ``completed_at`` when moving to COMPLETED or FAILED.

        Args:
            job_id: The job ObjectId
            status: New processing status
            error_message: Optional error message for failed jobs

        Returns:
            The updated ProcessingJob, or None when no job has this id

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            # Prepare update data
            update_data = {"status": status}

            # Set appropriate timestamp based on status
            current_time = datetime.now()
            if status == ProcessingStatus.PROCESSING:
                update_data["started_at"] = current_time
            elif status in (ProcessingStatus.COMPLETED, ProcessingStatus.FAILED):
                update_data["completed_at"] = current_time

            # Add error message if provided
            if error_message is not None:
                update_data["error_message"] = error_message

            # return_document=True is equivalent to ReturnDocument.AFTER:
            # the post-update document is returned.
            result = await self.collection.find_one_and_update(
                {"_id": job_id},
                {"$set": update_data},
                return_document=True
            )

            if result:
                return ProcessingJob(**result)

            return None

        except PyMongoError as e:
            raise JobRepositoryError("update_job_status", e)

    async def delete_job(self, job_id: PyObjectId) -> bool:
        """
        Delete a job from the database.

        Args:
            job_id: The job ObjectId

        Returns:
            True if job was deleted, False if not found

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            result = await self.collection.delete_one({"_id": job_id})

            return result.deleted_count > 0

        except PyMongoError as e:
            raise JobRepositoryError("delete_job", e)

    async def find_jobs_by_document_id(self, document_id: PyObjectId) -> List[ProcessingJob]:
        """
        Retrieve all jobs for a specific document.

        Args:
            document_id: The document ObjectId

        Returns:
            List of ProcessingJob documents (empty if none exist)

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            cursor = self.collection.find({"document_id": document_id})

            jobs = []
            async for job_data in cursor:
                jobs.append(ProcessingJob(**job_data))

            return jobs

        except PyMongoError as e:
            # Operation label predates the method rename; the test suite
            # asserts this exact string, so it is kept as-is.
            raise JobRepositoryError("get_jobs_by_file_id", e)

    async def get_jobs_by_status(self, status: ProcessingStatus) -> List[ProcessingJob]:
        """
        Retrieve all jobs with a specific status.

        Args:
            status: The processing status to filter by

        Returns:
            List of ProcessingJob documents (empty if none match)

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            cursor = self.collection.find({"status": status})

            jobs = []
            async for job_data in cursor:
                jobs.append(ProcessingJob(**job_data))

            return jobs

        except PyMongoError as e:
            raise JobRepositoryError("get_jobs_by_status", e)
+ """ + + def __init__(self, current_status: ProcessingStatus, target_status: ProcessingStatus): + self.current_status = current_status + self.target_status = target_status + super().__init__( + f"Invalid status transition from '{current_status}' to '{target_status}'" + ) + + +class JobRepositoryError(Exception): + """ + Raised when a MongoDB operation fails in the job repository. + + This exception wraps database-related errors that occur during + job repository operations. + """ + + def __init__(self, operation: str, original_error: Exception): + self.operation = operation + self.original_error = original_error + super().__init__(f"Repository operation '{operation}' failed: {str(original_error)}") diff --git a/src/file-processor/app/models/job.py b/src/file-processor/app/models/job.py index 1261dd4..d71109e 100644 --- a/src/file-processor/app/models/job.py +++ b/src/file-processor/app/models/job.py @@ -25,7 +25,7 @@ class ProcessingJob(BaseModel): """ id: Optional[PyObjectId] = Field(default=None, alias="_id") - file_id: PyObjectId = Field(..., description="Reference to file document") + document_id: PyObjectId = Field(..., description="Reference to file document") status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status") task_id: Optional[str] = Field(default=None, description="Celery task UUID") created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created") diff --git a/src/file-processor/app/services/job_service.py b/src/file-processor/app/services/job_service.py new file mode 100644 index 0000000..c0c5e6a --- /dev/null +++ b/src/file-processor/app/services/job_service.py @@ -0,0 +1,182 @@ +""" +Service layer for job processing business logic. + +This module provides high-level operations for managing processing jobs +with strict status transition validation and business rules enforcement. 
+""" + +from typing import Optional + +from app.database.repositories.job_repository import JobRepository +from app.exceptions.job_exceptions import InvalidStatusTransitionError +from app.models.job import ProcessingJob, ProcessingStatus +from app.models.types import PyObjectId + + +class JobService: + """ + Service for processing job business logic operations. + + Provides high-level job management with strict status transition + validation and business rule enforcement. + """ + + def __init__(self, database): + """ + Initialize service with job repository. + + Args: + repository: Optional JobRepository instance (creates default if None) + """ + self.db = database + self.repository = JobRepository(database) + + async def initialize(self): + await self.repository.initialize() + return self + + async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob: + """ + Create a new processing job. + + Args: + file_id: Reference to the file document + task_id: Optional Celery task UUID + + Returns: + The created ProcessingJob + + Raises: + JobRepositoryError: If database operation fails + """ + return await self.repository.create_job(file_id, task_id) + + async def get_job_by_id(self, job_id: PyObjectId) -> ProcessingJob: + """ + Retrieve a job by its ID. + + Args: + job_id: The job ObjectId + + Returns: + The ProcessingJob document + + Raises: + JobNotFoundError: If job doesn't exist + JobRepositoryError: If database operation fails + """ + return await self.repository.find_job_by_id(job_id) + + async def mark_job_as_started(self, job_id: PyObjectId) -> ProcessingJob: + """ + Mark a job as started (PENDING → PROCESSING). 
+ + Args: + job_id: The job ObjectId + + Returns: + The updated ProcessingJob + + Raises: + JobNotFoundError: If job doesn't exist + InvalidStatusTransitionError: If job is not in PENDING status + JobRepositoryError: If database operation fails + """ + # Get current job to validate transition + current_job = await self.repository.find_job_by_id(job_id) + + # Validate status transition + if current_job.status != ProcessingStatus.PENDING: + raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.PROCESSING) + + # Update status + return await self.repository.update_job_status(job_id, ProcessingStatus.PROCESSING) + + async def mark_job_as_completed(self, job_id: PyObjectId) -> ProcessingJob: + """ + Mark a job as completed (PROCESSING → COMPLETED). + + Args: + job_id: The job ObjectId + + Returns: + The updated ProcessingJob + + Raises: + JobNotFoundError: If job doesn't exist + InvalidStatusTransitionError: If job is not in PROCESSING status + JobRepositoryError: If database operation fails + """ + # Get current job to validate transition + current_job = await self.repository.find_job_by_id(job_id) + + # Validate status transition + if current_job.status != ProcessingStatus.PROCESSING: + raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.COMPLETED) + + # Update status + return await self.repository.update_job_status(job_id, ProcessingStatus.COMPLETED) + + async def mark_job_as_failed( + self, + job_id: PyObjectId, + error_message: Optional[str] = None + ) -> ProcessingJob: + """ + Mark a job as failed (PROCESSING → FAILED). 
+ + Args: + job_id: The job ObjectId + error_message: Optional error description + + Returns: + The updated ProcessingJob + + Raises: + JobNotFoundError: If job doesn't exist + InvalidStatusTransitionError: If job is not in PROCESSING status + JobRepositoryError: If database operation fails + """ + # Get current job to validate transition + current_job = await self.repository.find_job_by_id(job_id) + + # Validate status transition + if current_job.status != ProcessingStatus.PROCESSING: + raise InvalidStatusTransitionError(current_job.status, ProcessingStatus.FAILED) + + # Update status with error message + return await self.repository.update_job_status( + job_id, + ProcessingStatus.FAILED, + error_message + ) + + async def delete_job(self, job_id: PyObjectId) -> bool: + """ + Delete a job from the database. + + Args: + job_id: The job ObjectId + + Returns: + True if job was deleted, False if not found + + Raises: + JobRepositoryError: If database operation fails + """ + return await self.repository.delete_job(job_id) + + async def get_jobs_by_status(self, status: ProcessingStatus) -> list[ProcessingJob]: + """ + Retrieve all jobs with a specific status. + + Args: + status: The processing status to filter by + + Returns: + List of ProcessingJob documents + + Raises: + JobRepositoryError: If database operation fails + """ + return await self.repository.get_jobs_by_status(status) diff --git a/tests/repositories/test_job_repository.py b/tests/repositories/test_job_repository.py new file mode 100644 index 0000000..5bad14d --- /dev/null +++ b/tests/repositories/test_job_repository.py @@ -0,0 +1,523 @@ +""" +Test suite for JobRepository with async/await support. + +This module contains comprehensive tests for all JobRepository methods +using mongomock-motor for in-memory MongoDB testing. 
+""" + +from datetime import datetime + +import pytest +import pytest_asyncio +from bson import ObjectId +from mongomock_motor import AsyncMongoMockClient +from pymongo.errors import PyMongoError + +from app.database.repositories.job_repository import JobRepository +from app.exceptions.job_exceptions import JobRepositoryError +from app.models.job import ProcessingJob, ProcessingStatus +from app.models.types import PyObjectId + + +@pytest_asyncio.fixture +async def in_memory_repository(): + """Create an in-memory JobRepository for testing.""" + client = AsyncMongoMockClient() + db = client.test_database + repo = JobRepository(db) + await repo.initialize() + return repo + + +@pytest.fixture +def sample_document_id(): + """Sample document ObjectId for testing.""" + return PyObjectId() + + +@pytest.fixture +def sample_task_id(): + """Sample Celery task ID for testing.""" + return "celery-task-12345-abcde" + + +@pytest.fixture +def multiple_sample_jobs(): + """Multiple ProcessingJob objects for testing.""" + doc_id_1 = ObjectId() + doc_id_2 = ObjectId() + base_time = datetime.utcnow() + + return [ + ProcessingJob( + document_id=doc_id_1, + status=ProcessingStatus.PENDING, + task_id="task-1", + created_at=base_time, + started_at=None, + completed_at=None, + error_message=None + ), + ProcessingJob( + document_id=doc_id_2, + status=ProcessingStatus.PROCESSING, + task_id="task-2", + created_at=base_time, + started_at=base_time, + completed_at=None, + error_message=None + ), + ProcessingJob( + document_id=doc_id_1, + status=ProcessingStatus.COMPLETED, + task_id="task-3", + created_at=base_time, + started_at=base_time, + completed_at=base_time, + error_message=None + ) + ] + + +class TestJobRepositoryInitialization: + """Tests for repository initialization.""" + + @pytest.mark.asyncio + async def test_i_can_initialize_repository(self): + """Test repository initialization.""" + # Arrange + client = AsyncMongoMockClient() + db = client.test_database + repo = JobRepository(db) + 
+ # Act + initialized_repo = await repo.initialize() + + # Assert + assert initialized_repo is repo + assert repo.db is not None + assert repo.collection is not None + + +class TestJobRepositoryCreation: + """Tests for job creation functionality.""" + + @pytest.mark.asyncio + async def test_i_can_create_job_with_task_id(self, in_memory_repository, sample_document_id, sample_task_id): + """Test successful job creation with task ID.""" + # Act + created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id) + + # Assert + assert created_job is not None + assert created_job.document_id == sample_document_id + assert created_job.task_id == sample_task_id + assert created_job.status == ProcessingStatus.PENDING + assert created_job.created_at is not None + assert created_job.started_at is None + assert created_job.completed_at is None + assert created_job.error_message is None + assert created_job.id is not None + assert isinstance(created_job.id, ObjectId) + + @pytest.mark.asyncio + async def test_i_can_create_job_without_task_id(self, in_memory_repository, sample_document_id): + """Test successful job creation without task ID.""" + # Act + created_job = await in_memory_repository.create_job(sample_document_id) + + # Assert + assert created_job is not None + assert created_job.document_id == sample_document_id + assert created_job.task_id is None + assert created_job.status == ProcessingStatus.PENDING + assert created_job.created_at is not None + assert created_job.started_at is None + assert created_job.completed_at is None + assert created_job.error_message is None + assert created_job.id is not None + assert isinstance(created_job.id, ObjectId) + + @pytest.mark.asyncio + async def test_i_cannot_create_duplicate_job_for_document(self, in_memory_repository, sample_document_id, + sample_task_id): + """Test that creating job with duplicate document_id raises DuplicateKeyError.""" + # Arrange + await 
in_memory_repository.create_job(sample_document_id, sample_task_id) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.create_job(sample_document_id, "different-task-id") + + assert "create_job" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_i_cannot_create_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker): + """Test handling of PyMongo errors during job creation.""" + # Arrange + mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.create_job(sample_document_id) + + assert "create_job" in str(exc_info.value) + + +class TestJobRepositoryFinding: + """Tests for job finding functionality.""" + + @pytest.mark.asyncio + async def test_i_can_find_job_by_valid_id(self, in_memory_repository, sample_document_id, sample_task_id): + """Test finding job by valid ObjectId.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id) + + # Act + found_job = await in_memory_repository.find_job_by_id(created_job.id) + + # Assert + assert found_job is not None + assert found_job.id == created_job.id + assert found_job.document_id == created_job.document_id + assert found_job.task_id == created_job.task_id + assert found_job.status == created_job.status + + @pytest.mark.asyncio + async def test_i_cannot_find_job_by_nonexistent_id(self, in_memory_repository): + """Test that nonexistent ObjectId returns None.""" + # Arrange + nonexistent_id = PyObjectId() + + # Act + found_job = await in_memory_repository.find_job_by_id(nonexistent_id) + + # Assert + assert found_job is None + + @pytest.mark.asyncio + async def test_i_cannot_find_job_with_pymongo_error(self, in_memory_repository, mocker): + """Test handling of PyMongo errors during job finding.""" + # Arrange + 
mocker.patch.object(in_memory_repository.collection, 'find_one', side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.find_job_by_id(PyObjectId()) + + assert "get_job_by_id" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_i_can_find_jobs_by_document_id(self, in_memory_repository, sample_document_id, sample_task_id): + """Test finding jobs by document ID.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id, sample_task_id) + + # Act + found_jobs = await in_memory_repository.find_jobs_by_document_id(sample_document_id) + + # Assert + assert len(found_jobs) == 1 + assert found_jobs[0].id == created_job.id + assert found_jobs[0].document_id == sample_document_id + + @pytest.mark.asyncio + async def test_i_can_find_empty_jobs_list_for_nonexistent_document(self, in_memory_repository): + """Test that nonexistent document ID returns empty list.""" + # Arrange + nonexistent_id = ObjectId() + + # Act + found_jobs = await in_memory_repository.find_jobs_by_document_id(nonexistent_id) + + # Assert + assert found_jobs == [] + + @pytest.mark.asyncio + async def test_i_cannot_find_jobs_by_document_with_pymongo_error(self, in_memory_repository, mocker): + """Test handling of PyMongo errors during finding jobs by document ID.""" + # Arrange + mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.find_jobs_by_document_id(PyObjectId()) + + assert "get_jobs_by_file_id" in str(exc_info.value) + + @pytest.mark.asyncio + @pytest.mark.parametrize("status", [ + ProcessingStatus.PENDING, + ProcessingStatus.PROCESSING, + ProcessingStatus.COMPLETED + ]) + async def test_i_can_find_jobs_by_pending_status(self, in_memory_repository, sample_document_id, status): + """Test finding jobs by PENDING 
status.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + await in_memory_repository.update_job_status(created_job.id, status) + + # Act + found_jobs = await in_memory_repository.get_jobs_by_status(status) + + # Assert + assert len(found_jobs) == 1 + assert found_jobs[0].id == created_job.id + assert found_jobs[0].status == status + + @pytest.mark.asyncio + async def test_i_can_find_jobs_by_failed_status(self, in_memory_repository, sample_document_id): + """Test finding jobs by FAILED status.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED, "Test error") + + # Act + found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED) + + # Assert + assert len(found_jobs) == 1 + assert found_jobs[0].id == created_job.id + assert found_jobs[0].status == ProcessingStatus.FAILED + assert found_jobs[0].error_message == "Test error" + + @pytest.mark.asyncio + async def test_i_can_find_empty_jobs_list_for_unused_status(self, in_memory_repository): + """Test that unused status returns empty list.""" + # Act + found_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED) + + # Assert + assert found_jobs == [] + + @pytest.mark.asyncio + async def test_i_cannot_find_jobs_by_status_with_pymongo_error(self, in_memory_repository, mocker): + """Test handling of PyMongo errors during finding jobs by status.""" + # Arrange + mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING) + + assert "get_jobs_by_status" in str(exc_info.value) + + +class TestJobRepositoryStatusUpdate: + """Tests for job status update functionality.""" + + @pytest.mark.asyncio + async def 
test_i_can_update_job_status_to_processing(self, in_memory_repository, sample_document_id): + """Test updating job status to PROCESSING with started_at timestamp.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + + # Act + updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING) + + # Assert + assert updated_job is not None + assert updated_job.id == created_job.id + assert updated_job.status == ProcessingStatus.PROCESSING + assert updated_job.started_at is not None + assert updated_job.completed_at is None + assert updated_job.error_message is None + + @pytest.mark.asyncio + async def test_i_can_update_job_status_to_completed(self, in_memory_repository, sample_document_id): + """Test updating job status to COMPLETED with completed_at timestamp.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING) + + # Act + updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED) + + # Assert + assert updated_job is not None + assert updated_job.id == created_job.id + assert updated_job.status == ProcessingStatus.COMPLETED + assert updated_job.started_at is not None + assert updated_job.completed_at is not None + assert updated_job.error_message is None + + @pytest.mark.asyncio + async def test_i_can_update_job_status_to_failed_with_error(self, in_memory_repository, sample_document_id): + """Test updating job status to FAILED with error message and completed_at timestamp.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + error_message = "Processing failed due to invalid format" + + # Act + updated_job = await in_memory_repository.update_job_status( + created_job.id, ProcessingStatus.FAILED, error_message + ) + + # Assert + assert updated_job is not None + assert updated_job.id == 
created_job.id + assert updated_job.status == ProcessingStatus.FAILED + assert updated_job.completed_at is not None + assert updated_job.error_message == error_message + + @pytest.mark.asyncio + async def test_i_can_update_job_status_to_failed_without_error(self, in_memory_repository, sample_document_id): + """Test updating job status to FAILED without error message.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + + # Act + updated_job = await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED) + + # Assert + assert updated_job is not None + assert updated_job.id == created_job.id + assert updated_job.status == ProcessingStatus.FAILED + assert updated_job.completed_at is not None + assert updated_job.error_message is None + + @pytest.mark.asyncio + async def test_i_cannot_update_nonexistent_job_status(self, in_memory_repository): + """Test that updating nonexistent job returns None.""" + # Arrange + nonexistent_id = ObjectId() + + # Act + result = await in_memory_repository.update_job_status(nonexistent_id, ProcessingStatus.COMPLETED) + + # Assert + assert result is None + + @pytest.mark.asyncio + async def test_i_cannot_update_job_status_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker): + """Test handling of PyMongo errors during job status update.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + mocker.patch.object(in_memory_repository.collection, 'find_one_and_update', + side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED) + + assert "update_job_status" in str(exc_info.value) + + +class TestJobRepositoryDeletion: + """Tests for job deletion functionality.""" + + @pytest.mark.asyncio + async def test_i_can_delete_existing_job(self, in_memory_repository, sample_document_id): 
+ """Test successful job deletion.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + + # Act + deletion_result = await in_memory_repository.delete_job(created_job.id) + + # Assert + assert deletion_result is True + + # Verify job is actually deleted + found_job = await in_memory_repository.find_job_by_id(created_job.id) + assert found_job is None + + @pytest.mark.asyncio + async def test_i_cannot_delete_nonexistent_job(self, in_memory_repository): + """Test that deleting nonexistent job returns False.""" + # Arrange + nonexistent_id = ObjectId() + + # Act + result = await in_memory_repository.delete_job(nonexistent_id) + + # Assert + assert result is False + + @pytest.mark.asyncio + async def test_i_cannot_delete_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker): + """Test handling of PyMongo errors during job deletion.""" + # Arrange + created_job = await in_memory_repository.create_job(sample_document_id) + mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error")) + + # Act & Assert + with pytest.raises(JobRepositoryError) as exc_info: + await in_memory_repository.delete_job(created_job.id) + + assert "delete_job" in str(exc_info.value) + + +class TestJobRepositoryComplexScenarios: + """Tests for complex job repository scenarios.""" + + @pytest.mark.asyncio + async def test_i_can_handle_complete_job_lifecycle(self, in_memory_repository, sample_document_id, sample_task_id): + """Test complete job lifecycle from creation to completion.""" + # Create job + job = await in_memory_repository.create_job(sample_document_id, sample_task_id) + assert job.status == ProcessingStatus.PENDING + assert job.started_at is None + assert job.completed_at is None + + # Start processing + job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING) + assert job.status == ProcessingStatus.PROCESSING + assert job.started_at is not None + 
assert job.completed_at is None + + # Complete job + job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.COMPLETED) + assert job.status == ProcessingStatus.COMPLETED + assert job.started_at is not None + assert job.completed_at is not None + assert job.error_message is None + + @pytest.mark.asyncio + async def test_i_can_handle_job_failure_scenario(self, in_memory_repository, sample_document_id, sample_task_id): + """Test job failure scenario with error message.""" + # Create and start job + job = await in_memory_repository.create_job(sample_document_id, sample_task_id) + job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING) + + # Fail job with error + error_msg = "File format not supported" + job = await in_memory_repository.update_job_status(job.id, ProcessingStatus.FAILED, error_msg) + + # Assert failure state + assert job.status == ProcessingStatus.FAILED + assert job.started_at is not None + assert job.completed_at is not None + assert job.error_message == error_msg + + @pytest.mark.asyncio + async def test_i_can_handle_multiple_documents_with_different_statuses(self, in_memory_repository): + """Test managing multiple jobs for different documents with various statuses.""" + # Create jobs for different documents + doc1 = PyObjectId() + doc2 = PyObjectId() + doc3 = PyObjectId() + + job1 = await in_memory_repository.create_job(doc1, "task-1") + job2 = await in_memory_repository.create_job(doc2, "task-2") + job3 = await in_memory_repository.create_job(doc3, "task-3") + + # Update to different statuses + await in_memory_repository.update_job_status(job1.id, ProcessingStatus.PROCESSING) + await in_memory_repository.update_job_status(job2.id, ProcessingStatus.COMPLETED) + await in_memory_repository.update_job_status(job3.id, ProcessingStatus.FAILED, "Error occurred") + + # Verify status queries + pending_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING) + processing_jobs = await 
in_memory_repository.get_jobs_by_status(ProcessingStatus.PROCESSING) + completed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED) + failed_jobs = await in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED) + + assert len(pending_jobs) == 0 + assert len(processing_jobs) == 1 + assert len(completed_jobs) == 1 + assert len(failed_jobs) == 1 + + assert processing_jobs[0].id == job1.id + assert completed_jobs[0].id == job2.id + assert failed_jobs[0].id == job3.id diff --git a/tests/services/test_job_service.py b/tests/services/test_job_service.py new file mode 100644 index 0000000..1610d70 --- /dev/null +++ b/tests/services/test_job_service.py @@ -0,0 +1,578 @@ +""" +Unit tests for JobService using in-memory MongoDB. + +Tests the business logic operations with real MongoDB operations +using mongomock for better integration testing. +""" + +import pytest +import pytest_asyncio +from bson import ObjectId +from mongomock_motor import AsyncMongoMockClient + +from app.exceptions.job_exceptions import InvalidStatusTransitionError +from app.models.job import ProcessingStatus +from app.models.types import PyObjectId +from app.services.job_service import JobService + + +@pytest_asyncio.fixture +async def in_memory_database(): + """Create an in-memory database for testing.""" + client = AsyncMongoMockClient() + return client.test_database + + +@pytest_asyncio.fixture +async def job_service(in_memory_database): + """Create JobService with in-memory repositories.""" + service = await JobService(in_memory_database).initialize() + return service + + +@pytest.fixture +def sample_document_id(): + """Sample file ObjectId.""" + return PyObjectId() + + +@pytest.fixture +def sample_task_id(): + """Sample Celery task UUID.""" + return "550e8400-e29b-41d4-a716-446655440000" + + +class TestCreateJob: + """Tests for create_job method.""" + + @pytest.mark.asyncio + async def test_i_can_create_job_with_task_id( + self, + job_service, + 
sample_document_id, + sample_task_id + ): + """Test creating job with task ID.""" + # Execute + result = await job_service.create_job(sample_document_id, sample_task_id) + + # Verify job creation + assert result is not None + assert result.document_id == sample_document_id + assert result.task_id == sample_task_id + assert result.status == ProcessingStatus.PENDING + assert result.created_at is not None + assert result.started_at is None + assert result.error_message is None + + # Verify job exists in database + job_in_db = await job_service.get_job_by_id(result.id) + assert job_in_db is not None + assert job_in_db.id == result.id + assert job_in_db.document_id == sample_document_id + assert job_in_db.task_id == sample_task_id + assert job_in_db.status == ProcessingStatus.PENDING + + @pytest.mark.asyncio + async def test_i_can_create_job_without_task_id( + self, + job_service, + sample_document_id + ): + """Test creating job without task ID.""" + # Execute + result = await job_service.create_job(sample_document_id) + + # Verify job creation + assert result is not None + assert result.document_id == sample_document_id + assert result.task_id is None + assert result.status == ProcessingStatus.PENDING + assert result.created_at is not None + assert result.started_at is None + assert result.error_message is None + + +class TestGetJobMethods: + """Tests for job retrieval methods.""" + + @pytest.mark.asyncio + async def test_i_can_get_job_by_id( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test retrieving job by ID.""" + # Create a job first + created_job = await job_service.create_job(sample_document_id, sample_task_id) + + # Execute + result = await job_service.get_job_by_id(created_job.id) + + # Verify + assert result is not None + assert result.id == created_job.id + assert result.document_id == created_job.document_id + assert result.task_id == created_job.task_id + assert result.status == created_job.status + + @pytest.mark.asyncio + async 
def test_i_can_get_jobs_by_status( + self, + job_service, + sample_document_id + ): + """Test retrieving jobs by status.""" + # Create jobs with different statuses + pending_job = await job_service.create_job(sample_document_id, "pending-task") + + processing_job = await job_service.create_job(ObjectId(), "processing-task") + await job_service.mark_job_as_started(processing_job.id) + + completed_job = await job_service.create_job(ObjectId(), "completed-task") + await job_service.mark_job_as_started(completed_job.id) + await job_service.mark_job_as_completed(completed_job.id) + + # Execute - get pending jobs + pending_results = await job_service.get_jobs_by_status(ProcessingStatus.PENDING) + + # Verify + assert len(pending_results) == 1 + assert pending_results[0].id == pending_job.id + assert pending_results[0].status == ProcessingStatus.PENDING + + # Execute - get processing jobs + processing_results = await job_service.get_jobs_by_status(ProcessingStatus.PROCESSING) + assert len(processing_results) == 1 + assert processing_results[0].status == ProcessingStatus.PROCESSING + + # Execute - get completed jobs + completed_results = await job_service.get_jobs_by_status(ProcessingStatus.COMPLETED) + assert len(completed_results) == 1 + assert completed_results[0].status == ProcessingStatus.COMPLETED + + +class TestUpdateStatus: + """Tests for mark_job_as_started method.""" + + @pytest.mark.asyncio + async def test_i_can_mark_pending_job_as_started( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test marking pending job as started (PENDING → PROCESSING).""" + # Create a pending job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + assert created_job.status == ProcessingStatus.PENDING + + # Execute + result = await job_service.mark_job_as_started(created_job.id) + + # Verify status transition + assert result is not None + assert result.id == created_job.id + assert result.status == ProcessingStatus.PROCESSING + + # 
Verify in database + updated_job = await job_service.get_job_by_id(created_job.id) + assert updated_job.status == ProcessingStatus.PROCESSING + + @pytest.mark.asyncio + async def test_i_cannot_mark_processing_job_as_started( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that processing job cannot be marked as started.""" + # Create and start a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + + # Try to start it again + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_started(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.PROCESSING + assert exc_info.value.target_status == ProcessingStatus.PROCESSING + + @pytest.mark.asyncio + async def test_i_cannot_mark_completed_job_as_started( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that completed job cannot be marked as started.""" + # Create, start, and complete a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await job_service.mark_job_as_completed(created_job.id) + + # Try to start it again + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_started(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.COMPLETED + assert exc_info.value.target_status == ProcessingStatus.PROCESSING + + @pytest.mark.asyncio + async def test_i_cannot_mark_failed_job_as_started( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that failed job cannot be marked as started.""" + # Create, start, and fail a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await 
job_service.mark_job_as_failed(created_job.id, "Test error") + + # Try to start it again + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_started(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.FAILED + assert exc_info.value.target_status == ProcessingStatus.PROCESSING + + @pytest.mark.asyncio + async def test_i_can_mark_processing_job_as_completed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test marking processing job as completed (PROCESSING → COMPLETED).""" + # Create and start a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + started_job = await job_service.mark_job_as_started(created_job.id) + + # Execute + result = await job_service.mark_job_as_completed(created_job.id) + + # Verify status transition + assert result is not None + assert result.id == created_job.id + assert result.status == ProcessingStatus.COMPLETED + + # Verify in database + updated_job = await job_service.get_job_by_id(created_job.id) + assert updated_job.status == ProcessingStatus.COMPLETED + + @pytest.mark.asyncio + async def test_i_cannot_mark_pending_job_as_completed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that pending job cannot be marked as completed.""" + # Create a pending job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + + # Try to complete it directly + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_completed(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.PENDING + assert exc_info.value.target_status == ProcessingStatus.COMPLETED + + @pytest.mark.asyncio + async def test_i_cannot_mark_completed_job_as_completed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that completed job cannot be marked as 
completed again.""" + # Create, start, and complete a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await job_service.mark_job_as_completed(created_job.id) + + # Try to complete it again + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_completed(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.COMPLETED + assert exc_info.value.target_status == ProcessingStatus.COMPLETED + + @pytest.mark.asyncio + async def test_i_cannot_mark_failed_job_as_completed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that failed job cannot be marked as completed.""" + # Create, start, and fail a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await job_service.mark_job_as_failed(created_job.id, "Test error") + + # Try to complete it + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_completed(created_job.id) + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.FAILED + assert exc_info.value.target_status == ProcessingStatus.COMPLETED + + @pytest.mark.asyncio + async def test_i_can_mark_processing_job_as_failed_with_error_message( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test marking processing job as failed with error message.""" + # Create and start a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + started_job = await job_service.mark_job_as_started(created_job.id) + + error_message = "Processing failed due to invalid file format" + + # Execute + result = await job_service.mark_job_as_failed(created_job.id, error_message) + + # Verify status transition + assert result is not None + assert result.id == created_job.id 
+ assert result.status == ProcessingStatus.FAILED + assert result.error_message == error_message + + # Verify in database + updated_job = await job_service.get_job_by_id(created_job.id) + assert updated_job.status == ProcessingStatus.FAILED + assert updated_job.error_message == error_message + + @pytest.mark.asyncio + async def test_i_can_mark_processing_job_as_failed_without_error_message( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test marking processing job as failed without error message.""" + # Create and start a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + + # Execute without error message + result = await job_service.mark_job_as_failed(created_job.id) + + # Verify status transition + assert result is not None + assert result.status == ProcessingStatus.FAILED + assert result.error_message is None + + @pytest.mark.asyncio + async def test_i_cannot_mark_pending_job_as_failed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that pending job cannot be marked as failed.""" + # Create a pending job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + + # Try to fail it directly + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_failed(created_job.id, "Test error") + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.PENDING + assert exc_info.value.target_status == ProcessingStatus.FAILED + + @pytest.mark.asyncio + async def test_i_cannot_mark_completed_job_as_failed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that completed job cannot be marked as failed.""" + # Create, start, and complete a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await 
job_service.mark_job_as_completed(created_job.id) + + # Try to fail it + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_failed(created_job.id, "Test error") + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.COMPLETED + assert exc_info.value.target_status == ProcessingStatus.FAILED + + @pytest.mark.asyncio + async def test_i_cannot_mark_failed_job_as_failed( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test that failed job cannot be marked as failed again.""" + # Create, start, and fail a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + await job_service.mark_job_as_started(created_job.id) + await job_service.mark_job_as_failed(created_job.id, "First error") + + # Try to fail it again + with pytest.raises(InvalidStatusTransitionError) as exc_info: + await job_service.mark_job_as_failed(created_job.id, "Second error") + + # Verify exception details + assert exc_info.value.current_status == ProcessingStatus.FAILED + assert exc_info.value.target_status == ProcessingStatus.FAILED + + +class TestDeleteJob: + """Tests for delete_job method.""" + + @pytest.mark.asyncio + async def test_i_can_delete_existing_job( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test deleting an existing job.""" + # Create a job + created_job = await job_service.create_job(sample_document_id, sample_task_id) + + # Verify job exists + job_before_delete = await job_service.get_job_by_id(created_job.id) + assert job_before_delete is not None + + # Execute deletion + result = await job_service.delete_job(created_job.id) + + # Verify deletion + assert result is True + + # Verify job no longer exists + deleted_job = await job_service.get_job_by_id(created_job.id) + assert deleted_job is None + + @pytest.mark.asyncio + async def test_i_cannot_delete_nonexistent_job( + self, + job_service + ): + """Test deleting a nonexistent 
job returns False.""" + # Execute deletion with random ObjectId + result = await job_service.delete_job(ObjectId()) + + # Verify + assert result is False + + +class TestStatusTransitionValidation: + """Tests for status transition validation across different scenarios.""" + + @pytest.mark.asyncio + async def test_valid_job_lifecycle_flow( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test complete valid job lifecycle: PENDING → PROCESSING → COMPLETED.""" + # Create job (PENDING) + job = await job_service.create_job(sample_document_id, sample_task_id) + assert job.status == ProcessingStatus.PENDING + + # Start job (PENDING → PROCESSING) + started_job = await job_service.mark_job_as_started(job.id) + assert started_job.status == ProcessingStatus.PROCESSING + + # Complete job (PROCESSING → COMPLETED) + completed_job = await job_service.mark_job_as_completed(job.id) + assert completed_job.status == ProcessingStatus.COMPLETED + + @pytest.mark.asyncio + async def test_valid_job_failure_flow( + self, + job_service, + sample_document_id, + sample_task_id + ): + """Test valid job failure: PENDING → PROCESSING → FAILED.""" + # Create job (PENDING) + job = await job_service.create_job(sample_document_id, sample_task_id) + assert job.status == ProcessingStatus.PENDING + + # Start job (PENDING → PROCESSING) + started_job = await job_service.mark_job_as_started(job.id) + assert started_job.status == ProcessingStatus.PROCESSING + + # Fail job (PROCESSING → FAILED) + failed_job = await job_service.mark_job_as_failed(job.id, "Test failure") + assert failed_job.status == ProcessingStatus.FAILED + assert failed_job.error_message == "Test failure" + + +class TestEdgeCases: + """Tests for edge cases and error conditions.""" + # + # @pytest.mark.asyncio + # async def test_multiple_jobs_for_same_file( + # self, + # job_service, + # sample_document_id + # ): + # """Test handling multiple jobs for the same file.""" + # # Create multiple jobs for same file + # job1 
= await job_service.create_job(sample_document_id, "task-1") + # job2 = await job_service.create_job(sample_document_id, "task-2") + # job3 = await job_service.create_job(sample_document_id, "task-3") + # + # # Verify all jobs exist and are independent + # jobs_for_file = await job_service.get_jobs_by_file_id(sample_document_id) + # assert len(jobs_for_file) == 3 + # + # job_ids = [job.id for job in jobs_for_file] + # assert job1.id in job_ids + # assert job2.id in job_ids + # assert job3.id in job_ids + # + # # Verify status transitions work independently + # await job_service.mark_job_as_started(job1.id) + # await job_service.mark_job_as_completed(job1.id) + # + # # Other jobs should still be pending + # updated_job2 = await job_service.get_job_by_id(job2.id) + # updated_job3 = await job_service.get_job_by_id(job3.id) + # + # assert updated_job2.status == ProcessingStatus.PENDING + # assert updated_job3.status == ProcessingStatus.PENDING + + @pytest.mark.asyncio + async def test_job_operations_with_empty_database( + self, + job_service + ): + """Test job operations when database is empty.""" + # Try to get nonexistent job + result = await job_service.get_job_by_id(ObjectId()) + assert result is None + + + # Try to get jobs by status when none exist + pending_jobs = await job_service.get_jobs_by_status(ProcessingStatus.PENDING) + assert pending_jobs == [] + + # Try to delete nonexistent job + delete_result = await job_service.delete_job(ObjectId()) + assert delete_result is False