From 010ef56f63455ee68dbc7a8b221d8e5f0837a703 Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Mon, 22 Sep 2025 22:45:37 +0200 Subject: [PATCH] Working default workflow (file -> celery -> redis -> worker) --- docker-compose.yml | 1 + .../repositories/document_repository.py | 4 +- src/file-processor/app/file_watcher.py | 2 +- .../app/services/job_service.py | 6 +- src/worker/Dockerfile | 6 + src/worker/requirements.txt | 12 +- src/worker/tasks/document_processing.py | 166 ++++-------------- src/worker/tasks/main.py | 91 ++-------- 8 files changed, 68 insertions(+), 220 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e258a2b..1b7db26 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,6 +59,7 @@ services: - PYTHONPATH=/app volumes: - ./src/worker:/app + - ./src/file-processor/app:/app/app # <- Added: shared access file-processor app - ./volumes/watched_files:/watched_files depends_on: - redis diff --git a/src/file-processor/app/database/repositories/document_repository.py b/src/file-processor/app/database/repositories/document_repository.py index 450b8d6..9828b26 100644 --- a/src/file-processor/app/database/repositories/document_repository.py +++ b/src/file-processor/app/database/repositories/document_repository.py @@ -6,9 +6,10 @@ in MongoDB with proper error handling and type safety. """ from typing import Optional, List + from bson import ObjectId -from pymongo.errors import DuplicateKeyError, PyMongoError from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase +from pymongo.errors import DuplicateKeyError, PyMongoError from app.database.connection import get_extra_args from app.models.document import FileDocument @@ -40,7 +41,6 @@ class FileDocumentRepository: """Initialize file repository with database connection.""" self.db = database self.collection: AsyncIOMotorCollection = self.db.documents - self._ensure_indexes() async def initialize(self): """ diff --git a/src/file-processor/app/file_watcher.py b/src/file-processor/app/file_watcher.py index d98e373..c6baa30 100644 --- a/src/file-processor/app/file_watcher.py +++ b/src/file-processor/app/file_watcher.py @@ -64,7 +64,7 @@ class DocumentFileEventHandler(FileSystemEventHandler): logger.info(f"Processing new file: {filepath}") try: - from tasks.document_processing import process_document + from tasks.main import process_document celery_result = process_document.delay(filepath) celery_task_id = celery_result.id logger.info(f"Dispatched Celery task with ID: {celery_task_id}") diff --git a/src/file-processor/app/services/job_service.py b/src/file-processor/app/services/job_service.py index c0c5e6a..4f87d66 100644 --- a/src/file-processor/app/services/job_service.py +++ b/src/file-processor/app/services/job_service.py @@ -35,12 +35,12 @@ class JobService: await self.repository.initialize() return self - async def create_job(self, file_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob: + async def create_job(self, document_id: PyObjectId, task_id: Optional[str] = None) -> ProcessingJob: """ Create a new processing job. 
Args: - file_id: Reference to the file document + document_id: Reference to the file document task_id: Optional Celery task UUID Returns: @@ -49,7 +49,7 @@ class JobService: Raises: JobRepositoryError: If database operation fails """ - return await self.repository.create_job(file_id, task_id) + return await self.repository.create_job(document_id, task_id) async def get_job_by_id(self, job_id: PyObjectId) -> ProcessingJob: """ diff --git a/src/worker/Dockerfile b/src/worker/Dockerfile index 7be56ed..778383b 100644 --- a/src/worker/Dockerfile +++ b/src/worker/Dockerfile @@ -3,6 +3,12 @@ FROM python:3.12-slim # Set working directory WORKDIR /app +# Install libmagic +RUN apt-get update && apt-get install -y --no-install-recommends \ + libmagic1 \ + file \ + && rm -rf /var/lib/apt/lists/* + # Copy requirements and install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt diff --git a/src/worker/requirements.txt b/src/worker/requirements.txt index af2f3cd..8214763 100644 --- a/src/worker/requirements.txt +++ b/src/worker/requirements.txt @@ -1,4 +1,12 @@ - +bcrypt==4.3.0 celery==5.5.3 +email-validator==2.3.0 +fastapi==0.116.1 +httptools==0.6.4 +motor==3.7.1 +pymongo==4.15.0 +pydantic==2.11.9 redis==6.4.0 -pymongo==4.15.0 \ No newline at end of file +uvicorn==0.35.0 +python-magic==0.4.27 +watchdog==6.0.0 \ No newline at end of file diff --git a/src/worker/tasks/document_processing.py b/src/worker/tasks/document_processing.py index dbec010..4fcf6a7 100644 --- a/src/worker/tasks/document_processing.py +++ b/src/worker/tasks/document_processing.py @@ -8,81 +8,14 @@ and update processing job statuses throughout the task lifecycle. import logging from typing import Any, Dict -from tasks.main import app as celery_app +from app.config import settings +from app.database.connection import get_database +from app.services.document_service import DocumentService logger = logging.getLogger(__name__) -# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60}) -# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]: -# """ -# Process a document file and extract its content. -# -# This task: -# 1. Updates the processing job status to PROCESSING -# 2. Performs document content extraction -# 3. Updates job status to COMPLETED or FAILED based on result -# -# Args: -# self : Celery task instance -# job_service : Instance of JobService -# document_service : Instance of DocumentService -# filepath: Full path to the document file to process -# -# Returns: -# Dictionary containing processing results -# -# Raises: -# Exception: Any processing error (will trigger retry) -# """ -# task_id = self.request.id -# logger.info(f"Starting document processing task {task_id} for file: {filepath}") -# -# try: -# # Step 1: Mark job as started -# await job_service.mark_job_as_started(task_id=task_id) -# logger.info(f"Job {task_id} marked as PROCESSING") -# -# # Step 2: Process the document (extract content, OCR, etc.) 
-# document = await self.document_service.create_document(filepath) -# logger.info(f"Created document record with ID: {document.id}") -# -# result = document_service.extract_document_content(filepath) -# logger.info(f"Document content extracted successfully for task {task_id}") -# -# # Step 3: Mark job as completed -# await job_service.mark_job_as_completed(task_id=task_id) -# logger.info(f"Job {task_id} marked as COMPLETED") -# -# return { -# "task_id": task_id, -# "filepath": filepath, -# "status": "completed", -# "content_length": len(result.get("content", "")), -# "extraction_method": result.get("extraction_method"), -# "processing_time": result.get("processing_time") -# } -# -# except Exception as e: -# error_message = f"Document processing failed: {str(e)}" -# logger.error(f"Task {task_id} failed: {error_message}") -# -# try: -# # Mark job as failed -# job_service.mark_job_as_failed(task_id=task_id, error_message=error_message) -# logger.info(f"Job {task_id} marked as FAILED") -# except Exception as job_error: -# logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}") -# -# # Re-raise the exception to trigger Celery retry mechanism -# raise - - -@celery_app.task(name="tasks.document_processing.process_document", - bind=True, - autoretry_for=(Exception,), - retry_kwargs={'max_retries': 3, 'countdown': 60}) -def process_document(self, filepath: str) -> Dict[str, Any]: +async def process_document_async(self, filepath: str) -> Dict[str, Any]: """ Process a document file and extract its content. @@ -93,8 +26,6 @@ def process_document(self, filepath: str) -> Dict[str, Any]: Args: self : Celery task instance - job_service : Instance of JobService - document_service : Instance of DocumentService filepath: Full path to the document file to process Returns: @@ -105,75 +36,48 @@ def process_document(self, filepath: str) -> Dict[str, Any]: """ task_id = self.request.id logger.info(f"Starting document processing task {task_id} for file: {filepath}") - - -@celery_app.task(bind=True) -def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]: - """ - Clean up old processing jobs from the database. - - This maintenance task removes completed and failed jobs older than - the specified number of days. 
- - Args: - days_old: Number of days after which to clean up jobs - - Returns: - Dictionary containing cleanup statistics - """ - task_id = self.request.id - logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days") - job_service = JobService() + database = get_database() + document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder()) + from app.services.job_service import JobService + job_service = JobService(database=database) + job = None try: - # Perform cleanup - cleanup_result = job_service.cleanup_old_jobs(days_old=days_old) + # Step 1: Insert the document in DB + document = await document_service.create_document(filepath) + logger.info(f"Job {task_id} created for document {document.id} with file path: {filepath}") - logger.info( - f"Cleanup task {task_id} completed: " - f"deleted {cleanup_result['deleted_count']} jobs" - ) + # Step 2: Create a new job record for the document + job = await job_service.create_job(task_id=task_id, document_id=document.id) + + # Step 3: Mark job as started + await job_service.mark_job_as_started(job_id=job.id) + logger.info(f"Job {task_id} marked as PROCESSING") + + # Step 4: Mark job as completed + await job_service.mark_job_as_completed(job_id=job.id) + logger.info(f"Job {task_id} marked as COMPLETED") return { "task_id": task_id, + "filepath": filepath, "status": "completed", - "deleted_count": cleanup_result["deleted_count"], - "days_old": days_old } except Exception as e: - error_message = f"Cleanup task failed: {str(e)}" - logger.error(f"Cleanup task {task_id} failed: {error_message}") - raise - - -@celery_app.task(bind=True) -def get_processing_statistics(self) -> Dict[str, Any]: - """ - Generate processing statistics for monitoring. - - Returns: - Dictionary containing current processing statistics - """ - task_id = self.request.id - logger.info(f"Generating processing statistics for task {task_id}") - - job_service = JobService() - - try: - stats = job_service.get_processing_statistics() + error_message = f"Document processing failed: {str(e)}" + logger.error(f"Task {task_id} failed: {error_message}") - logger.info(f"Statistics generated for task {task_id}") + try: + # Mark job as failed + if job is not None: + await job_service.mark_job_as_failed(job_id=job.id, error_message=error_message) + logger.info(f"Job {task_id} marked as FAILED") + else: + logger.error(f"Failed to process {filepath}. error = {str(e)}") + except Exception as job_error: + logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}") - return { - "task_id": task_id, - "status": "completed", - "statistics": stats, - "timestamp": stats.get("generated_at") - } - - except Exception as e: - error_message = f"Statistics generation failed: {str(e)}" - logger.error(f"Statistics task {task_id} failed: {error_message}") + # Re-raise the exception to trigger Celery retry mechanism raise diff --git a/src/worker/tasks/main.py b/src/worker/tasks/main.py index ac91b78..dee1852 100644 --- a/src/worker/tasks/main.py +++ b/src/worker/tasks/main.py @@ -3,25 +3,26 @@ Celery worker for MyDocManager document processing tasks. This module contains all Celery tasks for processing documents. 
""" - +import asyncio import os -import time from celery import Celery +from tasks.document_processing import process_document_async + # Environment variables REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017") # Initialize Celery app -app = Celery( +celery_app = Celery( "mydocmanager_worker", broker=REDIS_URL, - backend=REDIS_URL + backend=REDIS_URL, ) # Celery configuration -app.conf.update( +celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", @@ -33,82 +34,10 @@ app.conf.update( ) -@app.task(bind=True) -def test_task(self, message: str): - """ - Test task for validating worker functionality. - - Args: - message: Test message to process - - Returns: - dict: Task result with processing information - """ - try: - print(f"[WORKER] Starting test task with message: {message}") - - # Simulate some work - for i in range(5): - print(f"[WORKER] Processing step {i + 1}/5...") - time.sleep(1) - - # Update task progress - self.update_state( - state="PROGRESS", - meta={ - "current": i + 1, - "total": 5, - "message": f"Processing step {i + 1}" - } - ) - - result = { - "status": "completed", - "message": f"Successfully processed: {message}", - "processed_at": time.time(), - "worker_id": self.request.id - } - - print(f"[WORKER] Test task completed successfully: {result}") - return result - - except Exception as exc: - print(f"[WORKER] Test task failed: {str(exc)}") - raise self.retry(exc=exc, countdown=60, max_retries=3) - - -@app.task(bind=True) -def process_document_task(self, file_path: str): - """ - Placeholder task for document processing. - - Args: - file_path: Path to the document to process - - Returns: - dict: Processing result - """ - try: - print(f"[WORKER] Starting document processing for: {file_path}") - - # Placeholder for document processing logic - time.sleep(2) # Simulate processing time - - result = { - "status": "completed", - "file_path": file_path, - "processed_at": time.time(), - "content": f"Placeholder content for {file_path}", - "worker_id": self.request.id - } - - print(f"[WORKER] Document processing completed: {file_path}") - return result - - except Exception as exc: - print(f"[WORKER] Document processing failed for {file_path}: {str(exc)}") - raise self.retry(exc=exc, countdown=60, max_retries=3) +@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60}) +def process_document(self, filepath: str): + return asyncio.run(process_document_async(self, filepath)) if __name__ == "__main__": - app.start() + celery_app.start()