Working default workflow (file -> celery -> redis -> worker)

2025-09-22 22:45:37 +02:00
parent 34f7854b3c
commit 010ef56f63
8 changed files with 68 additions and 220 deletions

View File

@@ -59,6 +59,7 @@ services:
- PYTHONPATH=/app
volumes:
- ./src/worker:/app
- ./src/file-processor/app:/app/app # <- Added: shared access to the file-processor app code
- ./volumes/watched_files:/watched_files
depends_on:
- redis
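
Note: with the extra mount and PYTHONPATH=/app in place, the worker container sees the file-processor's app package at /app/app and can import it directly. A minimal sanity check (a sketch, assuming the module names used later in this commit; the service name is a placeholder):

# Run inside the worker container, e.g. `docker compose exec <worker-service> python`
from app.config import settings                    # file-processor code, visible through /app/app
from app.database.connection import get_database   # shared database helper used by the worker tasks
print(settings.get_objects_folder())               # helper referenced by the task code below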

View File

@@ -6,9 +6,10 @@ in MongoDB with proper error handling and type safety.
"""
from typing import Optional, List
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
from pymongo.errors import DuplicateKeyError, PyMongoError
from app.database.connection import get_extra_args
from app.models.document import FileDocument
@@ -40,7 +41,6 @@ class FileDocumentRepository:
"""Initialize file repository with database connection."""
self.db = database
self.collection: AsyncIOMotorCollection = self.db.documents
self._ensure_indexes()
async def initialize(self):
"""

View File

@@ -64,7 +64,7 @@ class DocumentFileEventHandler(FileSystemEventHandler):
logger.info(f"Processing new file: {filepath}")
try:
from tasks.document_processing import process_document
from tasks.main import process_document
celery_result = process_document.delay(filepath)
celery_task_id = celery_result.id
logger.info(f"Dispatched Celery task with ID: {celery_task_id}")

View File

@@ -35,12 +35,12 @@ class JobService:
await self.repository.initialize()
return self
async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob:
"""
Create a new processing job.
Args:
file_id: Reference to the file document
document_id: Reference to the file document
task_id: Optional Celery task UUID
Returns:
@@ -49,7 +49,7 @@ class JobService:
Raises:
JobRepositoryError: If database operation fails
"""
return await self.repository.create_job(file_id, task_id)
return await self.repository.create_job(document_id, task_id)
async def get_job_by_id(self, job_id: PyObjectId) -> ProcessingJob:
"""

View File

@@ -3,6 +3,12 @@ FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install libmagic
RUN apt-get update && apt-get install -y --no-install-recommends \
libmagic1 \
file \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
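
Note: libmagic1 and the file utility are installed because python-magic (added to requirements below) is only a binding around the native libmagic library. A minimal sketch of the kind of MIME detection this enables; the file path is hypothetical:

import magic

# python-magic wraps libmagic; mime=True yields a MIME string such as "application/pdf".
detector = magic.Magic(mime=True)
mime_type = detector.from_file("/watched_files/example.pdf")
print(mime_type)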

View File

@@ -1,4 +1,12 @@
bcrypt==4.3.0
celery==5.5.3
redis==6.4.0
email-validator==2.3.0
fastapi==0.116.1
httptools==0.6.4
motor==3.7.1
pymongo==4.15.0
pydantic==2.11.9
redis==6.4.0
uvicorn==0.35.0
python-magic==0.4.27
watchdog==6.0.0

View File

@@ -8,81 +8,14 @@ and update processing job statuses throughout the task lifecycle.
import logging
from typing import Any, Dict
from tasks.main import app as celery_app
from app.config import settings
from app.database.connection import get_database
from app.services.document_service import DocumentService
logger = logging.getLogger(__name__)
# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]:
# """
# Process a document file and extract its content.
#
# This task:
# 1. Updates the processing job status to PROCESSING
# 2. Performs document content extraction
# 3. Updates job status to COMPLETED or FAILED based on result
#
# Args:
# self : Celery task instance
# job_service : Instance of JobService
# document_service : Instance of DocumentService
# filepath: Full path to the document file to process
#
# Returns:
# Dictionary containing processing results
#
# Raises:
# Exception: Any processing error (will trigger retry)
# """
# task_id = self.request.id
# logger.info(f"Starting document processing task {task_id} for file: {filepath}")
#
# try:
# # Step 1: Mark job as started
# await job_service.mark_job_as_started(task_id=task_id)
# logger.info(f"Job {task_id} marked as PROCESSING")
#
# # Step 2: Process the document (extract content, OCR, etc.)
# document = await self.document_service.create_document(filepath)
# logger.info(f"Created document record with ID: {document.id}")
#
# result = document_service.extract_document_content(filepath)
# logger.info(f"Document content extracted successfully for task {task_id}")
#
# # Step 3: Mark job as completed
# await job_service.mark_job_as_completed(task_id=task_id)
# logger.info(f"Job {task_id} marked as COMPLETED")
#
# return {
# "task_id": task_id,
# "filepath": filepath,
# "status": "completed",
# "content_length": len(result.get("content", "")),
# "extraction_method": result.get("extraction_method"),
# "processing_time": result.get("processing_time")
# }
#
# except Exception as e:
# error_message = f"Document processing failed: {str(e)}"
# logger.error(f"Task {task_id} failed: {error_message}")
#
# try:
# # Mark job as failed
# job_service.mark_job_as_failed(task_id=task_id, error_message=error_message)
# logger.info(f"Job {task_id} marked as FAILED")
# except Exception as job_error:
# logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
#
# # Re-raise the exception to trigger Celery retry mechanism
# raise
@celery_app.task(name="tasks.document_processing.process_document",
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60})
def process_document(self, filepath: str) -> Dict[str, Any]:
async def process_document_async(self, filepath: str) -> Dict[str, Any]:
"""
Process a document file and extract its content.
@@ -93,8 +26,6 @@ def process_document(self, filepath: str) -> Dict[str, Any]:
Args:
self : Celery task instance
job_service : Instance of JobService
document_service : Instance of DocumentService
filepath: Full path to the document file to process
Returns:
@@ -106,74 +37,47 @@ def process_document(self, filepath: str) -> Dict[str, Any]:
task_id = self.request.id
logger.info(f"Starting document processing task {task_id} for file: {filepath}")
database = get_database()
document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
from app.services.job_service import JobService
job_service = JobService(database=database)
@celery_app.task(bind=True)
def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]:
"""
Clean up old processing jobs from the database.
This maintenance task removes completed and failed jobs older than
the specified number of days.
Args:
days_old: Number of days after which to clean up jobs
Returns:
Dictionary containing cleanup statistics
"""
task_id = self.request.id
logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days")
job_service = JobService()
job = None
try:
# Perform cleanup
cleanup_result = job_service.cleanup_old_jobs(days_old=days_old)
# Step 1: Insert the document in DB
document = await document_service.create_document(filepath)
logger.info(f"Job {task_id} created for document {document.id} with file path: {filepath}")
logger.info(
f"Cleanup task {task_id} completed: "
f"deleted {cleanup_result['deleted_count']} jobs"
)
# Step 2: Create a new job record for the document
job = await job_service.create_job(task_id=task_id, document_id=document.id)
# Step 3: Mark job as started
await job_service.mark_job_as_started(job_id=job.id)
logger.info(f"Job {task_id} marked as PROCESSING")
# Step 4: Mark job as completed
await job_service.mark_job_as_completed(job_id=job.id)
logger.info(f"Job {task_id} marked as COMPLETED")
return {
"task_id": task_id,
"filepath": filepath,
"status": "completed",
"deleted_count": cleanup_result["deleted_count"],
"days_old": days_old
}
except Exception as e:
error_message = f"Cleanup task failed: {str(e)}"
logger.error(f"Cleanup task {task_id} failed: {error_message}")
raise
@celery_app.task(bind=True)
def get_processing_statistics(self) -> Dict[str, Any]:
"""
Generate processing statistics for monitoring.
Returns:
Dictionary containing current processing statistics
"""
task_id = self.request.id
logger.info(f"Generating processing statistics for task {task_id}")
job_service = JobService()
try:
stats = job_service.get_processing_statistics()
logger.info(f"Statistics generated for task {task_id}")
return {
"task_id": task_id,
"status": "completed",
"statistics": stats,
"timestamp": stats.get("generated_at")
}
except Exception as e:
error_message = f"Statistics generation failed: {str(e)}"
logger.error(f"Statistics task {task_id} failed: {error_message}")
error_message = f"Document processing failed: {str(e)}"
logger.error(f"Task {task_id} failed: {error_message}")
try:
# Mark job as failed
if job is not None:
await job_service.mark_job_as_failed(job_id=job.id, error_message=error_message)
logger.info(f"Job {task_id} marked as FAILED")
else:
logger.error(f"Failed to process {filepath}. error = {str(e)}")
except Exception as job_error:
logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
# Re-raise the exception to trigger Celery retry mechanism
raise
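
Note: because the diff above interleaves the removed cleanup/statistics tasks with the new job lifecycle, here is a condensed sketch of how process_document_async reads once the change is applied, using only calls shown in this commit (content extraction itself is not wired in yet):

async def process_document_async(self, filepath: str) -> Dict[str, Any]:
    task_id = self.request.id
    database = get_database()
    document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
    from app.services.job_service import JobService
    job_service = JobService(database=database)
    job = None
    try:
        # Step 1: insert the document in DB
        document = await document_service.create_document(filepath)
        # Step 2: create a job record linked to the document
        job = await job_service.create_job(task_id=task_id, document_id=document.id)
        # Step 3: mark the job as PROCESSING
        await job_service.mark_job_as_started(job_id=job.id)
        # Step 4: mark the job as COMPLETED
        await job_service.mark_job_as_completed(job_id=job.id)
        return {"task_id": task_id, "filepath": filepath, "status": "completed"}
    except Exception as e:
        if job is not None:
            await job_service.mark_job_as_failed(job_id=job.id, error_message=str(e))
        # Re-raise so Celery's autoretry_for can retry the task
        raise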

View File

@@ -3,25 +3,26 @@ Celery worker for MyDocManager document processing tasks.
This module contains all Celery tasks for processing documents.
"""
import asyncio
import os
import time
from celery import Celery
from tasks.document_processing import process_document_async
# Environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
# Initialize Celery app
app = Celery(
celery_app = Celery(
"mydocmanager_worker",
broker=REDIS_URL,
backend=REDIS_URL
backend=REDIS_URL,
)
# Celery configuration
app.conf.update(
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
@@ -33,82 +34,10 @@ app.conf.update(
)
@app.task(bind=True)
def test_task(self, message: str):
"""
Test task for validating worker functionality.
Args:
message: Test message to process
Returns:
dict: Task result with processing information
"""
try:
print(f"[WORKER] Starting test task with message: {message}")
# Simulate some work
for i in range(5):
print(f"[WORKER] Processing step {i + 1}/5...")
time.sleep(1)
# Update task progress
self.update_state(
state="PROGRESS",
meta={
"current": i + 1,
"total": 5,
"message": f"Processing step {i + 1}"
}
)
result = {
"status": "completed",
"message": f"Successfully processed: {message}",
"processed_at": time.time(),
"worker_id": self.request.id
}
print(f"[WORKER] Test task completed successfully: {result}")
return result
except Exception as exc:
print(f"[WORKER] Test task failed: {str(exc)}")
raise self.retry(exc=exc, countdown=60, max_retries=3)
@app.task(bind=True)
def process_document_task(self, file_path: str):
"""
Placeholder task for document processing.
Args:
file_path: Path to the document to process
Returns:
dict: Processing result
"""
try:
print(f"[WORKER] Starting document processing for: {file_path}")
# Placeholder for document processing logic
time.sleep(2) # Simulate processing time
result = {
"status": "completed",
"file_path": file_path,
"processed_at": time.time(),
"content": f"Placeholder content for {file_path}",
"worker_id": self.request.id
}
print(f"[WORKER] Document processing completed: {file_path}")
return result
except Exception as exc:
print(f"[WORKER] Document processing failed for {file_path}: {str(exc)}")
raise self.retry(exc=exc, countdown=60, max_retries=3)
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def process_document(self, filepath: str):
return asyncio.run(process_document_async(self, filepath))
if __name__ == "__main__":
app.start()
celery_app.start()
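
Note: Celery task functions are plain synchronous callables, so the worker bridges into the async implementation with asyncio.run, which creates and tears down a fresh event loop per task invocation. The same pattern in isolation, with hypothetical names:

import asyncio
from celery import Celery

celery_app = Celery("sketch", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

async def do_work_async(task, payload: str) -> dict:
    # Stand-in for process_document_async: any awaitable work goes here.
    return {"task_id": task.request.id, "payload": payload}

@celery_app.task(bind=True)
def do_work(self, payload: str) -> dict:
    # asyncio.run executes the coroutine to completion inside the synchronous task body.
    return asyncio.run(do_work_async(self, payload))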