Added JobRepository and JobServices
This commit is contained in:
229
src/file-processor/app/database/repositories/job_repository.py
Normal file
229
src/file-processor/app/database/repositories/job_repository.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""
|
||||
Repository for managing processing jobs in MongoDB.
|
||||
|
||||
This module provides data access layer for ProcessingJob operations
|
||||
with automatic timestamp management and error handling.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
|
||||
from pymongo.errors import PyMongoError
|
||||
|
||||
from app.exceptions.job_exceptions import JobRepositoryError
|
||||
from app.models.job import ProcessingJob, ProcessingStatus
|
||||
from app.models.types import PyObjectId
|
||||
|
||||
|
||||
class JobRepository:
    """
    Repository for processing job data access operations.

    Provides CRUD operations for ProcessingJob documents stored in the
    ``processing_jobs`` collection. Status updates stamp ``started_at`` /
    ``completed_at`` automatically, and every PyMongoError raised by a CRUD
    call is re-raised as JobRepositoryError tagged with the name of the
    repository method that failed.
    """

    def __init__(self, database: AsyncIOMotorDatabase):
        """Initialize repository with MongoDB collection reference."""
        self.db = database
        self.collection: AsyncIOMotorCollection = self.db.processing_jobs

    async def _ensure_indexes(self):
        """
        Ensure required database indexes exist.

        Creates a unique index on the ``document_id`` field. Index creation
        stays best-effort (a failure never propagates to the caller), but the
        failure is logged instead of being discarded, so a missing uniqueness
        guarantee does not go unnoticed.
        """
        # NOTE(review): a unique index permits a single job per document, yet
        # find_jobs_by_document_id returns a list — confirm which is intended.
        try:
            await self.collection.create_index("document_id", unique=True)
        except PyMongoError as e:
            # Function-scope import keeps this block self-contained.
            import logging

            logging.getLogger(__name__).warning(
                "Could not ensure unique index on 'document_id': %s", e
            )

    async def initialize(self):
        """
        Initialize repository by ensuring required indexes exist.

        Should be called after repository instantiation to setup database indexes.

        Returns:
            This repository instance, so the call can be chained
        """
        await self._ensure_indexes()
        return self

    async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
        """
        Create a new processing job in PENDING status.

        Args:
            document_id: Reference to the file document
            task_id: Optional Celery task UUID

        Returns:
            The created ProcessingJob, carrying the generated ``_id``

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            job_data = {
                "document_id": document_id,
                "status": ProcessingStatus.PENDING,
                "task_id": task_id,
                # NOTE(review): naive local time; presumably the driver stores
                # naive datetimes as if they were UTC — confirm the server
                # timezone or switch to timezone-aware values.
                "created_at": datetime.now(),
                "started_at": None,
                "completed_at": None,
                "error_message": None,
            }

            result = await self.collection.insert_one(job_data)
            job_data["_id"] = result.inserted_id

            return ProcessingJob(**job_data)

        except PyMongoError as e:
            raise JobRepositoryError("create_job", e) from e

    async def find_job_by_id(self, job_id: PyObjectId) -> Optional[ProcessingJob]:
        """
        Retrieve a job by its ID.

        Args:
            job_id: The job ObjectId

        Returns:
            The ProcessingJob document, or None if no job has this ID

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            job_data = await self.collection.find_one({"_id": job_id})
            if job_data:
                return ProcessingJob(**job_data)

            return None

        except PyMongoError as e:
            raise JobRepositoryError("find_job_by_id", e) from e

    async def update_job_status(
        self,
        job_id: PyObjectId,
        status: ProcessingStatus,
        error_message: Optional[str] = None
    ) -> Optional[ProcessingJob]:
        """
        Update job status with automatic timestamp management.

        ``started_at`` is stamped when the new status is PROCESSING and
        ``completed_at`` when it is COMPLETED or FAILED. No status transition
        validation happens here.

        Args:
            job_id: The job ObjectId
            status: New processing status
            error_message: Optional error message for failed jobs

        Returns:
            The updated ProcessingJob, or None if no job has this ID

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            # Prepare update data
            update_data = {"status": status}

            # Set appropriate timestamp based on status
            current_time = datetime.now()
            if status == ProcessingStatus.PROCESSING:
                update_data["started_at"] = current_time
            elif status in (ProcessingStatus.COMPLETED, ProcessingStatus.FAILED):
                update_data["completed_at"] = current_time

            # Only overwrite the stored error message when one is provided
            if error_message is not None:
                update_data["error_message"] = error_message

            result = await self.collection.find_one_and_update(
                {"_id": job_id},
                {"$set": update_data},
                # True corresponds to pymongo's ReturnDocument.AFTER: return
                # the document as it looks once the update has been applied.
                return_document=True
            )

            if result:
                return ProcessingJob(**result)

            return None

        except PyMongoError as e:
            raise JobRepositoryError("update_job_status", e) from e

    async def delete_job(self, job_id: PyObjectId) -> bool:
        """
        Delete a job from the database.

        Args:
            job_id: The job ObjectId

        Returns:
            True if job was deleted, False if not found

        Raises:
            JobRepositoryError: If database operation fails
        """
        try:
            result = await self.collection.delete_one({"_id": job_id})
            return result.deleted_count > 0

        except PyMongoError as e:
            raise JobRepositoryError("delete_job", e) from e

    async def _find_jobs(self, query: dict, operation: str) -> List[ProcessingJob]:
        """Run ``find(query)`` and build a ProcessingJob from every match.

        ``operation`` is the public method name reported in JobRepositoryError
        when the query fails.
        """
        try:
            return [ProcessingJob(**job_data) async for job_data in self.collection.find(query)]

        except PyMongoError as e:
            raise JobRepositoryError(operation, e) from e

    async def find_jobs_by_document_id(self, document_id: PyObjectId) -> List[ProcessingJob]:
        """
        Retrieve all jobs for a specific file.

        Args:
            document_id: The file ObjectId

        Returns:
            List of ProcessingJob documents (empty if none match)

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self._find_jobs({"document_id": document_id}, "find_jobs_by_document_id")

    async def get_jobs_by_status(self, status: ProcessingStatus) -> List[ProcessingJob]:
        """
        Retrieve all jobs with a specific status.

        Args:
            status: The processing status to filter by

        Returns:
            List of ProcessingJob documents (empty if none match)

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self._find_jobs({"status": status}, "get_jobs_by_status")
|
||||
0
src/file-processor/app/exceptions/__init__.py
Normal file
0
src/file-processor/app/exceptions/__init__.py
Normal file
38
src/file-processor/app/exceptions/job_exceptions.py
Normal file
38
src/file-processor/app/exceptions/job_exceptions.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""
|
||||
Custom exceptions for job management operations.
|
||||
|
||||
This module defines specific exceptions for job processing lifecycle
|
||||
and repository operations to provide clear error handling.
|
||||
"""
|
||||
|
||||
from app.models.job import ProcessingStatus
|
||||
|
||||
|
||||
class InvalidStatusTransitionError(Exception):
    """
    Signals a rejected job status change.

    Raised when a job is asked to move to a target status that is not
    allowed from the status it currently holds. Both statuses are kept on
    the instance as ``current_status`` and ``target_status``.
    """

    def __init__(self, current_status: ProcessingStatus, target_status: ProcessingStatus):
        # Build the human-readable message first, then record the two statuses.
        message = f"Invalid status transition from '{current_status}' to '{target_status}'"
        super().__init__(message)
        self.current_status = current_status
        self.target_status = target_status
|
||||
|
||||
|
||||
class JobRepositoryError(Exception):
    """
    Wraps a failure that occurred inside a job repository operation.

    Carries the name of the repository operation that failed
    (``operation``) and the underlying exception (``original_error``).
    """

    def __init__(self, operation: str, original_error: Exception):
        # The message embeds the string form of the wrapped error.
        detail = f"Repository operation '{operation}' failed: {original_error!s}"
        super().__init__(detail)
        self.operation = operation
        self.original_error = original_error
|
||||
@@ -25,7 +25,7 @@ class ProcessingJob(BaseModel):
|
||||
"""
|
||||
|
||||
id: Optional[PyObjectId] = Field(default=None, alias="_id")
|
||||
file_id: PyObjectId = Field(..., description="Reference to file document")
|
||||
document_id: PyObjectId = Field(..., description="Reference to file document")
|
||||
status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status")
|
||||
task_id: Optional[str] = Field(default=None, description="Celery task UUID")
|
||||
created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created")
|
||||
|
||||
182
src/file-processor/app/services/job_service.py
Normal file
182
src/file-processor/app/services/job_service.py
Normal file
@@ -0,0 +1,182 @@
|
||||
"""
|
||||
Service layer for job processing business logic.
|
||||
|
||||
This module provides high-level operations for managing processing jobs
|
||||
with strict status transition validation and business rules enforcement.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from app.database.repositories.job_repository import JobRepository
|
||||
from app.exceptions.job_exceptions import InvalidStatusTransitionError
|
||||
from app.models.job import ProcessingJob, ProcessingStatus
|
||||
from app.models.types import PyObjectId
|
||||
|
||||
|
||||
class JobService:
    """
    Service for processing job business logic operations.

    Provides high-level job management with strict status transition
    validation (PENDING → PROCESSING → COMPLETED / FAILED) on top of
    JobRepository.
    """

    def __init__(self, database):
        """
        Initialize service with a job repository bound to ``database``.

        Args:
            database: Database handle handed to JobRepository
        """
        self.db = database
        self.repository = JobRepository(database)

    async def initialize(self):
        """Initialize the underlying repository and return this service."""
        await self.repository.initialize()
        return self

    async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
        """
        Create a new processing job.

        Args:
            file_id: Reference to the file document (forwarded to the
                repository as the job's document reference)
            task_id: Optional Celery task UUID

        Returns:
            The created ProcessingJob

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self.repository.create_job(file_id, task_id)

    async def get_job_by_id(self, job_id: PyObjectId) -> Optional[ProcessingJob]:
        """
        Retrieve a job by its ID.

        Args:
            job_id: The job ObjectId

        Returns:
            The ProcessingJob document, or None if no job has this ID

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self.repository.find_job_by_id(job_id)

    async def _transition(
        self,
        job_id: PyObjectId,
        expected_status: ProcessingStatus,
        target_status: ProcessingStatus,
        error_message: Optional[str] = None
    ) -> ProcessingJob:
        """Move a job from ``expected_status`` to ``target_status`` after validation.

        Raises LookupError when the job does not exist, either at validation
        time or at update time, instead of failing on a ``None`` job.
        """
        # Get current job to validate transition
        current_job = await self.repository.find_job_by_id(job_id)
        if current_job is None:
            raise LookupError(f"Processing job '{job_id}' not found")

        # Validate status transition
        if current_job.status != expected_status:
            raise InvalidStatusTransitionError(current_job.status, target_status)

        # The read above and this write are separate database calls, so the
        # job may have disappeared in between; the repository then returns None.
        updated_job = await self.repository.update_job_status(job_id, target_status, error_message)
        if updated_job is None:
            raise LookupError(f"Processing job '{job_id}' not found")

        return updated_job

    async def mark_job_as_started(self, job_id: PyObjectId) -> ProcessingJob:
        """
        Mark a job as started (PENDING → PROCESSING).

        Args:
            job_id: The job ObjectId

        Returns:
            The updated ProcessingJob

        Raises:
            LookupError: If job doesn't exist
            InvalidStatusTransitionError: If job is not in PENDING status
            JobRepositoryError: If database operation fails
        """
        return await self._transition(job_id, ProcessingStatus.PENDING, ProcessingStatus.PROCESSING)

    async def mark_job_as_completed(self, job_id: PyObjectId) -> ProcessingJob:
        """
        Mark a job as completed (PROCESSING → COMPLETED).

        Args:
            job_id: The job ObjectId

        Returns:
            The updated ProcessingJob

        Raises:
            LookupError: If job doesn't exist
            InvalidStatusTransitionError: If job is not in PROCESSING status
            JobRepositoryError: If database operation fails
        """
        return await self._transition(job_id, ProcessingStatus.PROCESSING, ProcessingStatus.COMPLETED)

    async def mark_job_as_failed(
        self,
        job_id: PyObjectId,
        error_message: Optional[str] = None
    ) -> ProcessingJob:
        """
        Mark a job as failed (PROCESSING → FAILED).

        Args:
            job_id: The job ObjectId
            error_message: Optional error description

        Returns:
            The updated ProcessingJob

        Raises:
            LookupError: If job doesn't exist
            InvalidStatusTransitionError: If job is not in PROCESSING status
            JobRepositoryError: If database operation fails
        """
        return await self._transition(
            job_id,
            ProcessingStatus.PROCESSING,
            ProcessingStatus.FAILED,
            error_message
        )

    async def delete_job(self, job_id: PyObjectId) -> bool:
        """
        Delete a job from the database.

        Args:
            job_id: The job ObjectId

        Returns:
            True if job was deleted, False if not found

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self.repository.delete_job(job_id)

    async def get_jobs_by_status(self, status: ProcessingStatus) -> list[ProcessingJob]:
        """
        Retrieve all jobs with a specific status.

        Args:
            status: The processing status to filter by

        Returns:
            List of ProcessingJob documents

        Raises:
            JobRepositoryError: If database operation fails
        """
        return await self.repository.get_jobs_by_status(status)
|
||||
Reference in New Issue
Block a user