diff --git a/.gitignore b/.gitignore
index 70054a0..6896de6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+volumes
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[codz]
diff --git a/Readme.md b/Readme.md
index acab586..3cb446d 100644
--- a/Readme.md
+++ b/Readme.md
@@ -95,8 +95,8 @@ MyDocManager/
 │   │   ├── requirements.txt
 │   │   ├── app/
 │   │   │   ├── main.py
-│   │   │   ├── file_watcher.py
-│   │   │   ├── celery_app.py
+│   │   │   ├── file_watcher.py    # FileWatcher class with observer thread
+│   │   │   ├── celery_app.py      # Celery configuration
 │   │   │   ├── config/
 │   │   │   │   ├── __init__.py
 │   │   │   │   └── settings.py    # JWT, MongoDB config
diff --git a/docker-compose.yml b/docker-compose.yml
index 7cb68cb..e258a2b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -34,9 +34,10 @@ services:
     environment:
       - REDIS_URL=redis://redis:6379/0
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
-      - PYTHONPATH=/app
+      - PYTHONPATH=/app:/tasks  # Added /tasks to Python path
     volumes:
       - ./src/file-processor:/app
+      - ./src/worker/tasks:/app/tasks  # <- Added: shared access to worker tasks
       - ./volumes/watched_files:/watched_files
       - ./volumes/objects:/objects
     depends_on:
@@ -57,14 +58,15 @@ services:
       - MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
       - PYTHONPATH=/app
     volumes:
-      - ./src/worker/tasks:/app
+      - ./src/worker:/app
       - ./volumes/watched_files:/watched_files
     depends_on:
       - redis
       - mongodb
     networks:
       - mydocmanager-network
-    command: celery -A main worker --loglevel=info
+    command: celery -A tasks.main worker --loglevel=info
+    #command: celery -A main worker --loglevel=info  # for production

 volumes:
   mongodb-data:
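With this compose change, both containers resolve the same task definitions: the worker's tasks package is mounted into the file-processor container and PYTHONPATH is extended so tasks.* imports work on both sides. A quick sanity check from inside the file-processor container, sketched below (the task module itself is added later in this patch):

# Sketch: verify the shared task package resolves from the file-processor side.
import importlib

mod = importlib.import_module("tasks.document_processing")
print(mod.process_document.name)  # -> tasks.document_processing.process_document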
diff --git a/requirements.txt b/requirements.txt
index 7df4d86..021b0f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -45,6 +45,7 @@ tzdata==2025.2
 uvicorn==0.35.0
 uvloop==0.21.0
 vine==5.1.0
+watchdog==6.0.0
 watchfiles==1.1.0
 wcwidth==0.2.13
 websockets==15.0.1
diff --git a/src/file-processor/Dockerfile b/src/file-processor/Dockerfile
index 62477fd..434b3cf 100644
--- a/src/file-processor/Dockerfile
+++ b/src/file-processor/Dockerfile
@@ -3,6 +3,12 @@ FROM python:3.12-slim
 # Set working directory
 WORKDIR /app

+# Install libmagic
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libmagic1 \
+    file \
+    && rm -rf /var/lib/apt/lists/*
+
 # Copy requirements and install dependencies
 COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt
diff --git a/src/file-processor/app/config/settings.py b/src/file-processor/app/config/settings.py
index 7f8dc7f..051afb3 100644
--- a/src/file-processor/app/config/settings.py
+++ b/src/file-processor/app/config/settings.py
@@ -6,7 +6,6 @@ using simple os.getenv() approach without external validation libraries.
 """

 import os
-from typing import Optional


 def get_mongodb_url() -> str:
@@ -51,15 +50,6 @@ def get_jwt_secret_key() -> str:
         raise ValueError("JWT_SECRET environment variable must be set in production")
     return secret

-def get_objects_folder() -> str:
-    """
-    Get Vault path from environment variables.
-
-    Returns:
-        str: Vault path
-    """
-    return os.getenv("OBJECTS_FOLDER", "/objects")
-

 def get_jwt_algorithm() -> str:
     """
@@ -91,4 +81,19 @@ def is_development_environment() -> bool:
     Returns:
         bool: True if development environment
     """
-    return os.getenv("ENVIRONMENT", "development").lower() == "development"
\ No newline at end of file
+    return os.getenv("ENVIRONMENT", "development").lower() == "development"
+
+
+def get_objects_folder() -> str:
+    """
+    Get Vault path from environment variables.
+
+    Returns:
+        str: Vault path
+    """
+    return os.getenv("OBJECTS_FOLDER", "/objects")
+
+
+def watch_directory() -> str:
+    """Get the directory to monitor for new files."""
+    return os.getenv("WATCH_DIRECTORY", "/watched_files")
diff --git a/src/file-processor/app/file_watcher.py b/src/file-processor/app/file_watcher.py
new file mode 100644
index 0000000..d98e373
--- /dev/null
+++ b/src/file-processor/app/file_watcher.py
@@ -0,0 +1,241 @@
+"""
+File watcher implementation with Watchdog observer and ProcessingJob management.
+
+This module provides real-time file monitoring for document processing.
+When a supported file is created in the watched directory, the handler
+dispatches a Celery task that is responsible for creating the document
+record, extracting content, and tracking the job lifecycle.
+"""
+
+import logging
+import threading
+from pathlib import Path
+from typing import Optional
+
+from watchdog.events import FileSystemEventHandler, FileCreatedEvent
+from watchdog.observers import Observer
+
+from app.services.document_service import DocumentService
+from app.services.job_service import JobService
+
+logger = logging.getLogger(__name__)
+
+
+class DocumentFileEventHandler(FileSystemEventHandler):
+    """
+    Event handler for document file creation events.
+
+    Filters newly created files by extension and dispatches a Celery
+    processing task for each supported file.
+    """
+
+    SUPPORTED_EXTENSIONS = {'.txt', '.pdf', '.docx'}
+
+    def __init__(self, document_service: DocumentService, job_service: JobService):
+        """
+        Initialize the event handler.
+
+        Args:
+            document_service: Service for document management
+            job_service: Service for processing job management
+        """
+        super().__init__()
+        self.document_service = document_service
+        self.job_service = job_service
+
+    def on_created(self, event: FileCreatedEvent) -> None:
+        """
+        Handle file creation events.
+
+        Args:
+            event: File system event containing file path information
+        """
+        if event.is_directory:
+            return
+
+        filepath = event.src_path
+        file_extension = Path(filepath).suffix.lower()
+
+        if file_extension not in self.SUPPORTED_EXTENSIONS:
+            logger.info(f"Ignoring unsupported file type: {filepath}")
+            return
+
+        logger.info(f"Processing new file: {filepath}")
+
+        try:
+            from tasks.document_processing import process_document
+            celery_result = process_document.delay(filepath)
+            celery_task_id = celery_result.id
+            logger.info(f"Dispatched Celery task with ID: {celery_task_id}")
+
+        except Exception as e:
+            logger.error(f"Failed to process file {filepath}: {str(e)}")
+            # Note: We don't re-raise the exception to keep the watcher running
+
+
+class FileWatcher:
+    """
+    File system watcher for automatic document processing.
+
+    Monitors a directory for new files and triggers the processing pipeline
+    using a dedicated observer thread.
+    """
+
+    def __init__(
+        self,
+        watch_directory: str,
+        document_service: DocumentService,
+        job_service: JobService,
+        recursive: bool = True
+    ):
+        """
+        Initialize the file watcher.
+
+        Args:
+            watch_directory: Directory path to monitor
+            document_service: Service for document management
+            job_service: Service for processing job management
+            recursive: Whether to watch subdirectories recursively
+        """
+        self.watch_directory = Path(watch_directory)
+        self.recursive = recursive
+        self.observer: Optional[Observer] = None
+        self._observer_thread: Optional[threading.Thread] = None
+        self._stop_event = threading.Event()
+
+        # Validate watch directory
+        if not self.watch_directory.exists():
+            raise ValueError(f"Watch directory does not exist: {watch_directory}")
+
+        if not self.watch_directory.is_dir():
+            raise ValueError(f"Watch path is not a directory: {watch_directory}")
+
+        # Create event handler
+        self.event_handler = DocumentFileEventHandler(
+            document_service=document_service,
+            job_service=job_service
+        )
+
+        logger.info(f"FileWatcher initialized for directory: {self.watch_directory}")
+
+    def start(self) -> None:
+        """
+        Start the file watcher in a separate thread.
+
+        Raises:
+            RuntimeError: If the watcher is already running
+        """
+        if self.is_running():
+            raise RuntimeError("FileWatcher is already running")
+
+        self.observer = Observer()
+        self.observer.schedule(
+            self.event_handler,
+            str(self.watch_directory),
+            recursive=self.recursive
+        )
+
+        # Start observer in separate thread
+        self._observer_thread = threading.Thread(
+            target=self._run_observer,
+            name="FileWatcher-Observer"
+        )
+        self._stop_event.clear()
+        self._observer_thread.start()
+
+        logger.info("FileWatcher started successfully")
+
+    def stop(self, timeout: float = 5.0) -> None:
+        """
+        Stop the file watcher gracefully.
+
+        Args:
+            timeout: Maximum time to wait for graceful shutdown
+        """
+        if not self.is_running():
+            logger.warning("FileWatcher is not running")
+            return
+
+        logger.info("Stopping FileWatcher...")
+
+        # Signal stop and wait for observer thread
+        self._stop_event.set()
+
+        if self.observer:
+            self.observer.stop()
+
+        if self._observer_thread and self._observer_thread.is_alive():
+            self._observer_thread.join(timeout=timeout)
+
+            if self._observer_thread.is_alive():
+                logger.warning("FileWatcher thread did not stop gracefully within timeout")
+            else:
+                logger.info("FileWatcher stopped gracefully")
+
+        # Clean up
+        self.observer = None
+        self._observer_thread = None
+
+    def is_running(self) -> bool:
+        """
+        Check if the file watcher is currently running.
+
+        Returns:
+            True if the watcher is running, False otherwise
+        """
+        return (
+            self.observer is not None
+            and self._observer_thread is not None
+            and self._observer_thread.is_alive()
+        )
+
+    def _run_observer(self) -> None:
+        """
+        Internal method to run the observer in a separate thread.
+
+        This method should not be called directly.
+        """
+        if not self.observer:
+            logger.error("Observer not initialized")
+            return
+
+        try:
+            self.observer.start()
+            logger.info("Observer thread started")
+
+            # Keep the observer running until stop is requested
+            while not self._stop_event.is_set():
+                self._stop_event.wait(timeout=1.0)
+
+            logger.info("Observer thread stopping...")
+
+        except Exception as e:
+            logger.error(f"Observer thread error: {str(e)}")
+        finally:
+            if self.observer:
+                self.observer.join()
+            logger.info("Observer thread stopped")
+
+
+def create_file_watcher(
+    watch_directory: str,
+    document_service: DocumentService,
+    job_service: JobService
+) -> FileWatcher:
+    """
+    Factory function to create a FileWatcher instance.
+
+    Args:
+        watch_directory: Directory path to monitor
+        document_service: Service for document management
+        job_service: Service for processing job management
+
+    Returns:
+        Configured FileWatcher instance
+    """
+    return FileWatcher(
+        watch_directory=watch_directory,
+        document_service=document_service,
+        job_service=job_service
+    )
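Reviewers who want to exercise the watcher outside Docker can use the minimal smoke test sketched below. It assumes src/file-processor is on PYTHONPATH with its requirements (and libmagic) installed locally, and it uses throwaway stand-ins for the two services; the handler only dispatches the Celery task, so the stand-ins are never called, and with no broker reachable the dispatch failure is logged while the watcher keeps running:

# Sketch: local smoke test for FileWatcher (stand-in services, temp directory).
import tempfile
import time
from pathlib import Path

from app.file_watcher import create_file_watcher


class StubService:
    """Stand-in for DocumentService / JobService; never called by the handler."""


with tempfile.TemporaryDirectory() as watch_dir:
    watcher = create_file_watcher(
        watch_directory=watch_dir,
        document_service=StubService(),
        job_service=StubService(),
    )
    watcher.start()
    try:
        # A supported extension triggers on_created; the dispatch (or its
        # failure, if no broker is reachable) shows up in the logs.
        Path(watch_dir, "example.txt").write_text("hello")
        time.sleep(2)  # give the observer thread time to fire
    finally:
        watcher.stop()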
diff --git a/src/file-processor/app/main.py b/src/file-processor/app/main.py
index 7c99fc7..dbf9499 100644
--- a/src/file-processor/app/main.py
+++ b/src/file-processor/app/main.py
@@ -1,205 +1,174 @@
 """
-FastAPI application for MyDocManager file processor service.
+FastAPI application with integrated FileWatcher for document processing.

-This service provides API endpoints for health checks and task dispatching.
+This module provides the main FastAPI application with:
+- JWT authentication
+- User management APIs
+- Real-time file monitoring via FileWatcher
+- Document processing via Celery tasks
 """

 import logging
-import os
 from contextlib import asynccontextmanager
+from typing import AsyncGenerator, Optional

-import redis
-from celery import Celery
-from fastapi import FastAPI, HTTPException, Depends
-from pydantic import BaseModel
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware

-from app.database.connection import test_database_connection, get_database
-from app.database.repositories.user_repository import UserRepository
-from app.models.user import UserCreate
+from app.config import settings
+from app.database.connection import get_database
+from app.file_watcher import create_file_watcher, FileWatcher
+from app.services.document_service import DocumentService
 from app.services.init_service import InitializationService
+from app.services.job_service import JobService
 from app.services.user_service import UserService

+# from api.routes.auth import router as auth_router
+# from api.routes.users import router as users_router
+# from api.routes.documents import router as documents_router
+# from api.routes.jobs import router as jobs_router
+
+
 # Configure logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

+# Global file watcher instance
+file_watcher: Optional[FileWatcher] = None
+

 @asynccontextmanager
-async def lifespan(app: FastAPI):
+async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     """
-    Application lifespan manager for startup and shutdown tasks.
-
-    Handles initialization tasks that need to run when the application starts,
-    including admin user creation and other setup procedures.
+    FastAPI lifespan context manager.
+
+    Handles application startup and shutdown events including:
+    - Database connection
+    - Default admin user creation
+    - FileWatcher startup/shutdown
     """
-    # Startup tasks
+    global file_watcher
+
+    # Startup
     logger.info("Starting MyDocManager application...")

     try:
         # Initialize database connection
         database = get_database()
+        logger.info("Database connection established")

-        # Initialize repositories and services
-        user_service = await UserService(database).initialize()
+        document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
+        job_service = JobService(database=database)
+        user_service = UserService(database=database)
+        logger.info("Services created")
+
+        # Create default admin user
         init_service = InitializationService(user_service)
+        await init_service.initialize_application()
+        logger.info("Default admin user initialization completed")

-        # Run initialization tasks
-        initialization_result = await init_service.initialize_application()
+        # Create and start file watcher
+        file_watcher = create_file_watcher(
+            watch_directory=settings.watch_directory(),
+            document_service=document_service,
+            job_service=job_service
+        )
+        file_watcher.start()
+        logger.info(f"FileWatcher started for directory: {settings.watch_directory()}")

-        if initialization_result["initialization_success"]:
-            logger.info("Application startup completed successfully")
-            if initialization_result["admin_user_created"]:
-                logger.info("Default admin user was created during startup")
-        else:
-            logger.error("Application startup completed with errors:")
-            for error in initialization_result["errors"]:
-                logger.error(f"  - {error}")
+        logger.info("Application startup completed successfully")
+
+        yield

     except Exception as e:
-        raise e
-        logger.error(f"Critical error during application startup: {str(e)}")
-        # You might want to decide if the app should continue or exit here
-        # For now, we log the error but continue
+        logger.error(f"Application startup failed: {str(e)}")
+        raise

-    yield  # Application is running
-
-    # Shutdown tasks (if needed)
-    logger.info("Shutting down MyDocManager application...")
+    finally:
+        # Shutdown
+        logger.info("Shutting down MyDocManager application...")
+
+        if file_watcher and file_watcher.is_running():
+            file_watcher.stop()
+            logger.info("FileWatcher stopped")
+
+        logger.info("Application shutdown completed")


-# Initialize FastAPI app
+# Create FastAPI application
 app = FastAPI(
-    title="MyDocManager File Processor",
-    description="File processing and task dispatch service",
-    version="1.0.0",
+    title="MyDocManager",
+    description="Real-time document processing application with authentication",
+    version="0.1.0",
     lifespan=lifespan
 )

-# Environment variables
-REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
-MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
-
-# Initialize Redis client
-try:
-    redis_client = redis.from_url(REDIS_URL)
-except Exception as e:
-    redis_client = None
-    print(f"Warning: Could not connect to Redis: {e}")
-
-# Initialize Celery
-celery_app = Celery(
-    "file_processor",
-    broker=REDIS_URL,
-    backend=REDIS_URL
+# Configure CORS
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["http://localhost:3000"],  # React frontend
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
 )

-# Pydantic models
-class TestTaskRequest(BaseModel):
-    """Request model for test task."""
-    message: str
-
-
-def get_user_service() -> UserService:
-    """
-    Dependency to get user service instance.
-
-    This should be properly implemented with database connection management
-    in your actual application.
-    """
-    database = get_database()
-    user_repository = UserRepository(database)
-    return UserService(user_repository)
-
-
-# Your API routes would use the service like this:
-@app.post("/api/users")
-async def create_user(
-    user_data: UserCreate,
-    user_service: UserService = Depends(get_user_service)
-):
-    return user_service.create_user(user_data)
+# Include routers
+# app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
+# app.include_router(users_router, prefix="/users", tags=["User Management"])
+# app.include_router(documents_router, prefix="/documents", tags=["Documents"])
+# app.include_router(jobs_router, prefix="/jobs", tags=["Processing Jobs"])


 @app.get("/health")
 async def health_check():
     """
     Health check endpoint.
-
+
     Returns:
-        dict: Service health status with dependencies
+        Dictionary containing application health status
     """
-    health_status = {
+    return {
         "status": "healthy",
-        "service": "file-processor",
-        "dependencies": {
-            "redis": "unknown",
-            "mongodb": "unknown"
-        },
+        "service": "MyDocManager",
+        "version": "0.1.0",
+        "file_watcher_running": file_watcher.is_running() if file_watcher else False
     }
-
-    # Check Redis connection
-    if redis_client:
-        try:
-            redis_client.ping()
-            health_status["dependencies"]["redis"] = "connected"
-        except Exception:
-            health_status["dependencies"]["redis"] = "disconnected"
-            health_status["status"] = "degraded"
-
-    # check MongoDB connection
-    if test_database_connection():
-        health_status["dependencies"]["mongodb"] = "connected"
-    else:
-        health_status["dependencies"]["mongodb"] = "disconnected"
-
-    return health_status
-
-
-@app.post("/test-task")
-async def dispatch_test_task(request: TestTaskRequest):
-    """
-    Dispatch a test task to Celery worker.
-
-    Args:
-        request: Test task request containing message
-
-    Returns:
-        dict: Task dispatch information
-
-    Raises:
-        HTTPException: If task dispatch fails
-    """
-    try:
-        # Send task to worker
-        task = celery_app.send_task(
-            "main.test_task",
-            args=[request.message]
-        )
-
-        return {
-            "status": "dispatched",
-            "task_id": task.id,
-            "message": f"Test task dispatched with message: {request.message}"
-        }
-
-    except Exception as e:
-        raise HTTPException(
-            status_code=500,
-            detail=f"Failed to dispatch task: {str(e)}"
-        )


 @app.get("/")
 async def root():
     """
-    Root endpoint.
-
+    Root endpoint with basic application information.
+
     Returns:
-        dict: Basic service information
+        Dictionary containing welcome message and available endpoints
     """
     return {
-        "service": "MyDocManager File Processor",
-        "version": "1.0.0",
-        "status": "running"
+        "message": "Welcome to MyDocManager",
+        "description": "Real-time document processing application",
+        "docs": "/docs",
+        "health": "/health"
+    }
+
+
+@app.get("/watcher/status")
+async def watcher_status():
+    """
+    Get file watcher status.
+
+    Returns:
+        Dictionary containing file watcher status information
+    """
+    if not file_watcher:
+        return {
+            "status": "not_initialized",
+            "running": False
+        }
+
+    return {
+        "status": "initialized",
+        "running": file_watcher.is_running(),
+        "watch_directory": str(file_watcher.watch_directory),
+        "recursive": file_watcher.recursive
+    }
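Once the stack is up, the new endpoints are easy to spot-check. A sketch, assuming the API is published on localhost:8000 (adjust to your compose port mapping) and that the requests package is available:

# Sketch: spot-check the health and watcher-status endpoints.
import requests

print(requests.get("http://localhost:8000/health").json())
print(requests.get("http://localhost:8000/watcher/status").json())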
diff --git a/src/file-processor/app/services/document_service.py b/src/file-processor/app/services/document_service.py
index ac00fb6..f19e73d 100644
--- a/src/file-processor/app/services/document_service.py
+++ b/src/file-processor/app/services/document_service.py
@@ -95,6 +95,28 @@ class DocumentService:
         """
         return magic.from_buffer(file_bytes, mime=True)

+    @staticmethod
+    def _read_file_bytes(file_path: str | Path) -> bytes:
+        """
+        Read file content as bytes.
+
+        Args:
+            file_path (str | Path): Path of the file to read
+
+        Returns:
+            bytes: Content of the file
+
+        Raises:
+            FileNotFoundError: If the file does not exist
+            OSError: If any I/O error occurs
+        """
+        path = Path(file_path)
+
+        if not path.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+
+        return path.read_bytes()
+
     def _get_document_path(self, file_hash):
         """
@@ -117,7 +139,7 @@ class DocumentService:
     async def create_document(
         self,
         file_path: str,
-        file_bytes: bytes,
+        file_bytes: bytes | None = None,
         encoding: str = "utf-8"
     ) -> FileDocument:
         """
@@ -140,6 +162,7 @@ class DocumentService:
             PyMongoError: If database operation fails
         """
         # Calculate automatic attributes
+        file_bytes = file_bytes if file_bytes is not None else self._read_file_bytes(file_path)
         file_hash = self._calculate_file_hash(file_bytes)
         file_type = self._detect_file_type(file_path)
         mime_type = self._detect_mime_type(file_bytes)
diff --git a/src/file-processor/app/services/init_service.py b/src/file-processor/app/services/init_service.py
index ed2cd7d..0d7aac9 100644
--- a/src/file-processor/app/services/init_service.py
+++ b/src/file-processor/app/services/init_service.py
@@ -8,8 +8,8 @@ creating default admin user if none exists.
 import logging
 from typing import Optional

-from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
 from app.models.auth import UserRole
+from app.models.user import UserInDB, UserCreateNoValidation
 from app.services.user_service import UserService

 logger = logging.getLogger(__name__)
@@ -31,7 +31,6 @@ class InitializationService:
             user_service (UserService): Service for user operations
         """
         self.user_service = user_service
-

     async def ensure_admin_user_exists(self) -> Optional[UserInDB]:
         """
@@ -131,4 +130,23 @@ class InitializationService:
             logger.error(error_msg)
             initialization_summary["errors"].append(error_msg)

-        return initialization_summary
\ No newline at end of file
+        self.log_initialization_result(initialization_summary)
+
+        return initialization_summary
+
+    @staticmethod
+    def log_initialization_result(summary: dict) -> None:
+        """
+        Log the result of the initialization process.
+
+        Args:
+            summary (dict): Summary of initialization tasks performed
+        """
+        if summary["initialization_success"]:
+            logger.info("Application startup completed successfully")
+            if summary["admin_user_created"]:
+                logger.info("Default admin user was created during startup")
+        else:
+            logger.error("Application startup completed with errors:")
+            for error in summary["errors"]:
+                logger.error(f"  - {error}")
diff --git a/src/file-processor/requirements.txt b/src/file-processor/requirements.txt
index 8b4b465..8214763 100644
--- a/src/file-processor/requirements.txt
+++ b/src/file-processor/requirements.txt
@@ -8,4 +8,5 @@ pymongo==4.15.0
 pydantic==2.11.9
 redis==6.4.0
 uvicorn==0.35.0
-python-magic==0.4.27
\ No newline at end of file
+python-magic==0.4.27
+watchdog==6.0.0
\ No newline at end of file
diff --git a/src/worker/Dockerfile b/src/worker/Dockerfile
index 8723a3e..7be56ed 100644
--- a/src/worker/Dockerfile
+++ b/src/worker/Dockerfile
@@ -8,7 +8,7 @@ COPY requirements.txt .
 RUN pip install --no-cache-dir -r requirements.txt

 # Copy application code
-COPY tasks/ .
+COPY . .

 # Command will be overridden by docker-compose
 CMD ["celery", "-A", "main", "worker", "--loglevel=info"]
\ No newline at end of file
diff --git a/src/worker/tasks/document_processing.py b/src/worker/tasks/document_processing.py
new file mode 100644
index 0000000..dbec010
--- /dev/null
+++ b/src/worker/tasks/document_processing.py
@@ -0,0 +1,179 @@
+"""
+Celery tasks for document processing with ProcessingJob status management.
+
+This module contains Celery tasks that handle document content extraction
+and update processing job statuses throughout the task lifecycle.
+"""
+
+import logging
+from typing import Any, Dict
+
+from tasks.main import app as celery_app
+
+logger = logging.getLogger(__name__)
+
+
+# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
+# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]:
+#     """
+#     Process a document file and extract its content.
+#
+#     This task:
+#     1. Updates the processing job status to PROCESSING
+#     2. Performs document content extraction
+#     3. Updates job status to COMPLETED or FAILED based on result
+#
+#     Args:
+#         self : Celery task instance
+#         job_service : Instance of JobService
+#         document_service : Instance of DocumentService
+#         filepath: Full path to the document file to process
+#
+#     Returns:
+#         Dictionary containing processing results
+#
+#     Raises:
+#         Exception: Any processing error (will trigger retry)
+#     """
+#     task_id = self.request.id
+#     logger.info(f"Starting document processing task {task_id} for file: {filepath}")
+#
+#     try:
+#         # Step 1: Mark job as started
+#         await job_service.mark_job_as_started(task_id=task_id)
+#         logger.info(f"Job {task_id} marked as PROCESSING")
+#
+#         # Step 2: Process the document (extract content, OCR, etc.)
+#         document = await self.document_service.create_document(filepath)
+#         logger.info(f"Created document record with ID: {document.id}")
+#
+#         result = document_service.extract_document_content(filepath)
+#         logger.info(f"Document content extracted successfully for task {task_id}")
+#
+#         # Step 3: Mark job as completed
+#         await job_service.mark_job_as_completed(task_id=task_id)
+#         logger.info(f"Job {task_id} marked as COMPLETED")
+#
+#         return {
+#             "task_id": task_id,
+#             "filepath": filepath,
+#             "status": "completed",
+#             "content_length": len(result.get("content", "")),
+#             "extraction_method": result.get("extraction_method"),
+#             "processing_time": result.get("processing_time")
+#         }
+#
+#     except Exception as e:
+#         error_message = f"Document processing failed: {str(e)}"
+#         logger.error(f"Task {task_id} failed: {error_message}")
+#
+#         try:
+#             # Mark job as failed
+#             job_service.mark_job_as_failed(task_id=task_id, error_message=error_message)
+#             logger.info(f"Job {task_id} marked as FAILED")
+#         except Exception as job_error:
+#             logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
+#
+#         # Re-raise the exception to trigger Celery retry mechanism
+#         raise
+
+
+@celery_app.task(name="tasks.document_processing.process_document",
+                 bind=True,
+                 autoretry_for=(Exception,),
+                 retry_kwargs={'max_retries': 3, 'countdown': 60})
+def process_document(self, filepath: str) -> Dict[str, Any]:
+    """
+    Process a document file and extract its content.
+
+    Placeholder implementation: it currently only acknowledges the file and
+    returns a stub result. Job status updates and content extraction are not
+    wired in yet (see the commented-out variant above for the intended flow).
+
+    Args:
+        self: Celery task instance
+        filepath: Full path to the document file to process
+
+    Returns:
+        Dictionary containing processing results
+
+    Raises:
+        Exception: Any processing error (will trigger retry)
+    """
+    task_id = self.request.id
+    logger.info(f"Starting document processing task {task_id} for file: {filepath}")
+
+    # TODO: create the document record and update ProcessingJob status once
+    # the services are importable from the worker container.
+    return {
+        "task_id": task_id,
+        "filepath": filepath,
+        "status": "received"
+    }
+
+
+@celery_app.task(bind=True)
+def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]:
+    """
+    Clean up old processing jobs from the database.
+
+    This maintenance task removes completed and failed jobs older than
+    the specified number of days.
+
+    Args:
+        days_old: Number of days after which to clean up jobs
+
+    Returns:
+        Dictionary containing cleanup statistics
+    """
+    task_id = self.request.id
+    logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days")
+
+    # NOTE: deferred import; assumes the file-processor's app.services package
+    # is reachable on the worker's PYTHONPATH (it is not copied into the
+    # worker image as of this change).
+    from app.services.job_service import JobService
+
+    job_service = JobService()
+
+    try:
+        # Perform cleanup
+        cleanup_result = job_service.cleanup_old_jobs(days_old=days_old)
+
+        logger.info(
+            f"Cleanup task {task_id} completed: "
+            f"deleted {cleanup_result['deleted_count']} jobs"
+        )
+
+        return {
+            "task_id": task_id,
+            "status": "completed",
+            "deleted_count": cleanup_result["deleted_count"],
+            "days_old": days_old
+        }
+
+    except Exception as e:
+        error_message = f"Cleanup task failed: {str(e)}"
+        logger.error(f"Cleanup task {task_id} failed: {error_message}")
+        raise
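+
+# Example (sketch, not wired up anywhere yet): this maintenance task could be
+# scheduled through Celery beat. The schedule entry name and cadence below are
+# illustrative assumptions, not part of this change.
+#
+# celery_app.conf.beat_schedule = {
+#     "cleanup-old-processing-jobs": {
+#         "task": "tasks.document_processing.cleanup_old_processing_jobs",
+#         "schedule": 24 * 60 * 60,  # once a day, in seconds
+#         "kwargs": {"days_old": 30},
+#     },
+# }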
+
+
+@celery_app.task(bind=True)
+def get_processing_statistics(self) -> Dict[str, Any]:
+    """
+    Generate processing statistics for monitoring.
+
+    Returns:
+        Dictionary containing current processing statistics
+    """
+    task_id = self.request.id
+    logger.info(f"Generating processing statistics for task {task_id}")
+
+    # NOTE: deferred import, same PYTHONPATH assumption as in
+    # cleanup_old_processing_jobs above.
+    from app.services.job_service import JobService
+
+    job_service = JobService()
+
+    try:
+        stats = job_service.get_processing_statistics()
+
+        logger.info(f"Statistics generated for task {task_id}")
+
+        return {
+            "task_id": task_id,
+            "status": "completed",
+            "statistics": stats,
+            "timestamp": stats.get("generated_at")
+        }
+
+    except Exception as e:
+        error_message = f"Statistics generation failed: {str(e)}"
+        logger.error(f"Statistics task {task_id} failed: {error_message}")
+        raise
diff --git a/src/worker/tasks/main.py b/src/worker/tasks/main.py
index 63b2c5d..ac91b78 100644
--- a/src/worker/tasks/main.py
+++ b/src/worker/tasks/main.py
@@ -6,6 +6,7 @@ This module contains all Celery tasks for processing documents.

 import os
 import time
+
 from celery import Celery

 # Environment variables
@@ -110,4 +111,4 @@ def process_document_task(self, file_path: str):


 if __name__ == "__main__":
-    app.start()
\ No newline at end of file
+    app.start()
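Finally, the renamed Celery entry point (tasks.main) can be verified without the watcher by dispatching the task by hand. A sketch, run from inside the file-processor container where /app is on PYTHONPATH and REDIS_URL points at the broker; result.get only works if the Celery app configures a result backend:

# Sketch: manual end-to-end dispatch to verify broker and worker wiring.
from tasks.document_processing import process_document

result = process_document.delay("/watched_files/example.txt")
print(result.id)               # Celery task id
print(result.get(timeout=30))  # the worker's stub result dict, if a backend is configured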