Removed async

2025-09-24 21:53:48 +02:00
parent e17c4c7e7b
commit 48f5b009ae
23 changed files with 609 additions and 770 deletions

View File

@@ -17,4 +17,4 @@ RUN pip install --no-cache-dir -r requirements.txt
 COPY . .

 # Command will be overridden by docker-compose
-CMD ["celery", "-A", "main", "worker", "--loglevel=info"]
+CMD ["celery", "-A", "main", "worker", "--loglevel=info"]

View File

@@ -1,3 +1,4 @@
+asgiref==3.9.1
 bcrypt==4.3.0
 celery==5.5.3
 email-validator==2.3.0
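
The one addition here is asgiref. A plausible reading, given the commit message: with the Celery tasks now synchronous, asgiref's async_to_sync is kept as a bridge for any service code that is still a coroutine. A minimal sketch of that pattern, assuming a hypothetical coroutine (fetch_metadata is illustrative, not from this repo):

from asgiref.sync import async_to_sync

async def fetch_metadata(filepath: str) -> dict:
    # Hypothetical stand-in for an async service call.
    return {"path": filepath}

def sync_caller(filepath: str) -> dict:
    # async_to_sync runs the coroutine to completion on its own
    # event loop, so the caller needs no asyncio.run() boilerplate.
    return async_to_sync(fetch_metadata)(filepath)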

View File

@@ -11,11 +11,12 @@ from typing import Any, Dict
 from app.config import settings
 from app.database.connection import get_database
 from app.services.document_service import DocumentService
+from tasks.main import celery_app

 logger = logging.getLogger(__name__)

-async def process_document_async(self, filepath: str) -> Dict[str, Any]:
+@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
+def process_document(self, filepath: str) -> Dict[str, Any]:
     """
     Process a document file and extract its content.
@@ -45,18 +46,18 @@ async def process_document_async(self, filepath: str) -> Dict[str, Any]:
     job = None
     try:
         # Step 1: Insert the document in DB
-        document = await document_service.create_document(filepath)
+        document = document_service.create_document(filepath)
         logger.info(f"Job {task_id} created for document {document.id} with file path: {filepath}")

         # Step 2: Create a new job record for the document
-        job = await job_service.create_job(task_id=task_id, document_id=document.id)
+        job = job_service.create_job(task_id=task_id, document_id=document.id)

         # Step 3: Mark job as started
-        await job_service.mark_job_as_started(job_id=job.id)
+        job_service.mark_job_as_started(job_id=job.id)
         logger.info(f"Job {task_id} marked as PROCESSING")

         # Step 4: Mark job as completed
-        await job_service.mark_job_as_completed(job_id=job.id)
+        job_service.mark_job_as_completed(job_id=job.id)
         logger.info(f"Job {task_id} marked as COMPLETED")

         return {
@@ -72,7 +73,7 @@ async def process_document_async(self, filepath: str) -> Dict[str, Any]:
         try:
             # Mark job as failed
             if job is not None:
-                await job_service.mark_job_as_failed(job_id=job.id, error_message=error_message)
+                job_service.mark_job_as_failed(job_id=job.id, error_message=error_message)
                 logger.info(f"Job {task_id} marked as FAILED")
             else:
                 logger.error(f"Failed to process {filepath}. error = {str(e)}")
@@ -81,3 +82,4 @@ async def process_document_async(self, filepath: str) -> Dict[str, Any]:
         # Re-raise the exception to trigger Celery retry mechanism
         raise
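
With the asyncio.run wrapper gone from tasks.main, the retry policy now sits directly on the task itself. A standalone sketch of the resulting shape (the broker URL and return payload are illustrative, not this repo's configuration):

from celery import Celery

celery_app = Celery("sketch", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def process_document(self, filepath: str) -> dict:
    # bind=True passes the task instance as self, so the Celery task id
    # is available as self.request.id without extra plumbing.
    # autoretry_for=(Exception,) re-enqueues on any exception, waiting
    # 60 seconds between attempts and giving up after 3 retries.
    return {"task_id": self.request.id, "filepath": filepath}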

View File

@@ -3,13 +3,10 @@ Celery worker for MyDocManager document processing tasks.
 This module contains all Celery tasks for processing documents.
 """
-import asyncio
 import os

 from celery import Celery

-from tasks.document_processing import process_document_async

 # Environment variables
 REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
 MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
@@ -21,6 +18,8 @@ celery_app = Celery(
     backend=REDIS_URL,
 )

+celery_app.autodiscover_tasks(["tasks.document_processing"])

 # Celery configuration
 celery_app.conf.update(
     task_serializer="json",
@@ -33,11 +32,5 @@ celery_app.conf.update(
     task_soft_time_limit=240,  # 4 minutes
 )

-@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
-def process_document(self, filepath: str):
-    return asyncio.run(process_document_async(self, filepath))

 if __name__ == "__main__":
     celery_app.start()
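
With the wrapper task deleted, the worker now relies on autodiscover_tasks rather than an explicit import to locate the task module, and callers enqueue the real task directly. A hedged usage sketch (the file path is illustrative):

from tasks.document_processing import process_document

# .delay() enqueues the task on the Redis broker; a worker started from
# this module picks it up and runs it synchronously.
result = process_document.delay("/data/incoming/example.pdf")
print(result.id)  # the Celery task id, which the task logs as task_id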