Working on tasks

This commit is contained in:
2025-09-21 22:51:34 +02:00
parent 98c43feadf
commit 34f7854b3c
14 changed files with 617 additions and 169 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
volumes
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]

View File

@@ -95,8 +95,8 @@ MyDocManager/
│ │ ├── requirements.txt
│ │ ├── app/
│ │ │ ├── main.py
│ │ │ ├── file_watcher.py
│ │ │ ├── celery_app.py
│ │ │ ├── file_watcher.py # FileWatcher class with observer thread
│ │ │ ├── celery_app.py # Celery Configuration
│ │ │ ├── config/
│ │ │ │ ├── __init__.py
│ │ │ │ └── settings.py # JWT, MongoDB config

View File

@@ -34,9 +34,10 @@ services:
environment:
- REDIS_URL=redis://redis:6379/0
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
- PYTHONPATH=/app:/tasks # Added /tasks to Python path
volumes:
- ./src/file-processor:/app
- ./src/worker/tasks:/app/tasks # <- Added: shared access to worker tasks
- ./volumes/watched_files:/watched_files
- ./volumes/objects:/objects
depends_on:
@@ -57,14 +58,15 @@ services:
- MONGODB_URL=mongodb://admin:password123@mongodb:27017/mydocmanager?authSource=admin
- PYTHONPATH=/app
volumes:
- ./src/worker/tasks:/app
- ./src/worker:/app
- ./volumes/watched_files:/watched_files
depends_on:
- redis
- mongodb
networks:
- mydocmanager-network
command: celery -A main worker --loglevel=info
command: celery -A tasks.main worker --loglevel=info
#command: celery -A main --loglevel=info # pour la production
volumes:
mongodb-data:

View File

@@ -45,6 +45,7 @@ tzdata==2025.2
uvicorn==0.35.0
uvloop==0.21.0
vine==5.1.0
watchdog==6.0.0
watchfiles==1.1.0
wcwidth==0.2.13
websockets==15.0.1

View File

@@ -3,6 +3,12 @@ FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install libmagic
RUN apt-get update && apt-get install -y --no-install-recommends \
libmagic1 \
file \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

View File

@@ -6,7 +6,6 @@ using simple os.getenv() approach without external validation libraries.
"""
import os
from typing import Optional
def get_mongodb_url() -> str:
@@ -51,15 +50,6 @@ def get_jwt_secret_key() -> str:
raise ValueError("JWT_SECRET environment variable must be set in production")
return secret
def get_objects_folder() -> str:
"""
Get Vault path from environment variables.
Returns:
str: Vault path
"""
return os.getenv("OBJECTS_FOLDER", "/objects")
def get_jwt_algorithm() -> str:
"""
@@ -91,4 +81,19 @@ def is_development_environment() -> bool:
Returns:
bool: True if development environment
"""
return os.getenv("ENVIRONMENT", "development").lower() == "development"
return os.getenv("ENVIRONMENT", "development").lower() == "development"
def get_objects_folder() -> str:
"""
Get Vault path from environment variables.
Returns:
str: Vault path
"""
return os.getenv("OBJECTS_FOLDER", "/objects")
def watch_directory() -> str:
"""Directory to monitor for new files"""
return os.getenv("WATCH_DIRECTORY", "/watched_files")

View File

@@ -0,0 +1,241 @@
"""
File watcher implementation with Watchdog observer and ProcessingJob management.
This module provides real-time file monitoring for document processing.
When a file is created in the watched directory, it:
1. Creates a document record via DocumentService
2. Dispatches a Celery task for processing
3. Creates a ProcessingJob to track the task lifecycle
"""
import logging
import threading
from pathlib import Path
from typing import Optional
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
from watchdog.observers import Observer
from app.services.document_service import DocumentService
from app.services.job_service import JobService
logger = logging.getLogger(__name__)
class DocumentFileEventHandler(FileSystemEventHandler):
"""
Event handler for document file creation events.
Processes newly created files by creating document records,
dispatching Celery tasks, and managing processing jobs.
"""
SUPPORTED_EXTENSIONS = {'.txt', '.pdf', '.docx'}
def __init__(self, document_service: DocumentService, job_service: JobService):
"""
Initialize the event handler.
Args:
document_service: Service for document management
job_service: Service for processing job management
"""
super().__init__()
self.document_service = document_service
self.job_service = job_service
def on_created(self, event: FileCreatedEvent) -> None:
"""
Handle file creation events.
Args:
event: File system event containing file path information
"""
if event.is_directory:
return
filepath = event.src_path
file_extension = Path(filepath).suffix.lower()
if file_extension not in self.SUPPORTED_EXTENSIONS:
logger.info(f"Ignoring unsupported file type: {filepath}")
return
logger.info(f"Processing new file: {filepath}")
try:
from tasks.document_processing import process_document
celery_result = process_document.delay(filepath)
celery_task_id = celery_result.id
logger.info(f"Dispatched Celery task with ID: {celery_task_id}")
except Exception as e:
logger.error(f"Failed to process file {filepath}: {str(e)}")
# Note: We don't re-raise the exception to keep the watcher running
class FileWatcher:
"""
File system watcher for automatic document processing.
Monitors a directory for new files and triggers processing pipeline
using a dedicated observer thread.
"""
def __init__(
self,
watch_directory: str,
document_service: DocumentService,
job_service: JobService,
recursive: bool = True
):
"""
Initialize the file watcher.
Args:
watch_directory: Directory path to monitor
document_service: Service for document management
job_service: Service for processing job management
recursive: Whether to watch subdirectories recursively
"""
self.watch_directory = Path(watch_directory)
self.recursive = recursive
self.observer: Optional[Observer] = None
self._observer_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# Validate watch directory
if not self.watch_directory.exists():
raise ValueError(f"Watch directory does not exist: {watch_directory}")
if not self.watch_directory.is_dir():
raise ValueError(f"Watch path is not a directory: {watch_directory}")
# Create event handler
self.event_handler = DocumentFileEventHandler(
document_service=document_service,
job_service=job_service
)
logger.info(f"FileWatcher initialized for directory: {self.watch_directory}")
def start(self) -> None:
"""
Start the file watcher in a separate thread.
Raises:
RuntimeError: If the watcher is already running
"""
if self.is_running():
raise RuntimeError("FileWatcher is already running")
self.observer = Observer()
self.observer.schedule(
self.event_handler,
str(self.watch_directory),
recursive=self.recursive
)
# Start observer in separate thread
self._observer_thread = threading.Thread(
target=self._run_observer,
name="FileWatcher-Observer"
)
self._stop_event.clear()
self._observer_thread.start()
logger.info("FileWatcher started successfully")
def stop(self, timeout: float = 5.0) -> None:
"""
Stop the file watcher gracefully.
Args:
timeout: Maximum time to wait for graceful shutdown
"""
if not self.is_running():
logger.warning("FileWatcher is not running")
return
logger.info("Stopping FileWatcher...")
# Signal stop and wait for observer thread
self._stop_event.set()
if self.observer:
self.observer.stop()
if self._observer_thread and self._observer_thread.is_alive():
self._observer_thread.join(timeout=timeout)
if self._observer_thread.is_alive():
logger.warning("FileWatcher thread did not stop gracefully within timeout")
else:
logger.info("FileWatcher stopped gracefully")
# Clean up
self.observer = None
self._observer_thread = None
def is_running(self) -> bool:
"""
Check if the file watcher is currently running.
Returns:
True if the watcher is running, False otherwise
"""
return (
self.observer is not None
and self._observer_thread is not None
and self._observer_thread.is_alive()
)
def _run_observer(self) -> None:
"""
Internal method to run the observer in a separate thread.
This method should not be called directly.
"""
if not self.observer:
logger.error("Observer not initialized")
return
try:
self.observer.start()
logger.info("Observer thread started")
# Keep the observer running until stop is requested
while not self._stop_event.is_set():
self._stop_event.wait(timeout=1.0)
logger.info("Observer thread stopping...")
except Exception as e:
logger.error(f"Observer thread error: {str(e)}")
finally:
if self.observer:
self.observer.join()
logger.info("Observer thread stopped")
def create_file_watcher(
watch_directory: str,
document_service: DocumentService,
job_service: JobService
) -> FileWatcher:
"""
Factory function to create a FileWatcher instance.
Args:
watch_directory: Directory path to monitor
document_service: Service for document management
job_service: Service for processing job management
Returns:
Configured FileWatcher instance
"""
return FileWatcher(
watch_directory=watch_directory,
document_service=document_service,
job_service=job_service
)

View File

@@ -1,205 +1,174 @@
"""
FastAPI application for MyDocManager file processor service.
FastAPI application with integrated FileWatcher for document processing.
This service provides API endpoints for health checks and task dispatching.
This module provides the main FastAPI application with:
- JWT authentication
- User management APIs
- Real-time file monitoring via FileWatcher
- Document processing via Celery tasks
"""
import logging
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import redis
from celery import Celery
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.database.connection import test_database_connection, get_database
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate
from app.config import settings
from app.database.connection import get_database
from app.file_watcher import create_file_watcher, FileWatcher
from app.services.document_service import DocumentService
from app.services.init_service import InitializationService
from app.services.job_service import JobService
from app.services.user_service import UserService
# from api.routes.auth import router as auth_router
# from api.routes.users import router as users_router
# from api.routes.documents import router as documents_router
# from api.routes.jobs import router as jobs_router
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global file watcher instance
file_watcher: FileWatcher = None
@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Application lifespan manager for startup and shutdown tasks.
Handles initialization tasks that need to run when the application starts,
including admin user creation and other setup procedures.
FastAPI lifespan context manager.
Handles application startup and shutdown events including:
- Database connection
- Default admin user creation
- FileWatcher startup/shutdown
"""
# Startup tasks
global file_watcher
# Startup
logger.info("Starting MyDocManager application...")
try:
# Initialize database connection
database = get_database()
logger.info("Database connection established")
# Initialize repositories and services
user_service = await UserService(database).initialize()
document_service = DocumentService(database=database, objects_folder=settings.get_objects_folder())
job_service = JobService(database=database)
user_service = UserService(database=database)
logger.info("Service created")
# Create default admin user
init_service = InitializationService(user_service)
await init_service.initialize_application()
logger.info("Default admin user initialization completed")
# Run initialization tasks
initialization_result = await init_service.initialize_application()
# Create and start file watcher
file_watcher = create_file_watcher(
watch_directory=settings.watch_directory(),
document_service=document_service,
job_service=job_service
)
file_watcher.start()
logger.info(f"FileWatcher started for directory: {settings.watch_directory()}")
if initialization_result["initialization_success"]:
logger.info("Application startup completed successfully")
if initialization_result["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in initialization_result["errors"]:
logger.error(f" - {error}")
logger.info("Application startup completed successfully")
yield
except Exception as e:
raise e
logger.error(f"Critical error during application startup: {str(e)}")
# You might want to decide if the app should continue or exit here
# For now, we log the error but continue
logger.error(f"Application startup failed: {str(e)}")
raise
yield # Application is running
# Shutdown tasks (if needed)
logger.info("Shutting down MyDocManager application...")
finally:
# Shutdown
logger.info("Shutting down MyDocManager application...")
if file_watcher and file_watcher.is_running():
file_watcher.stop()
logger.info("FileWatcher stopped")
logger.info("Application shutdown completed")
# Initialize FastAPI app
# Create FastAPI application
app = FastAPI(
title="MyDocManager File Processor",
description="File processing and task dispatch service",
version="1.0.0",
title="MyDocManager",
description="Real-time document processing application with authentication",
version="0.1.0",
lifespan=lifespan
)
# Environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
# Initialize Redis client
try:
redis_client = redis.from_url(REDIS_URL)
except Exception as e:
redis_client = None
print(f"Warning: Could not connect to Redis: {e}")
# Initialize Celery
celery_app = Celery(
"file_processor",
broker=REDIS_URL,
backend=REDIS_URL
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # React frontend
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pydantic models
class TestTaskRequest(BaseModel):
"""Request model for test task."""
message: str
def get_user_service() -> UserService:
"""
Dependency to get user service instance.
This should be properly implemented with database connection management
in your actual application.
"""
database = get_database()
user_repository = UserRepository(database)
return UserService(user_repository)
# Your API routes would use the service like this:
@app.post("/api/users")
async def create_user(
user_data: UserCreate,
user_service: UserService = Depends(get_user_service)
):
return user_service.create_user(user_data)
# Include routers
# app.include_router(auth_router, prefix="/auth", tags=["Authentication"])
# app.include_router(users_router, prefix="/users", tags=["User Management"])
# app.include_router(documents_router, prefix="/documents", tags=["Documents"])
# app.include_router(jobs_router, prefix="/jobs", tags=["Processing Jobs"])
@app.get("/health")
async def health_check():
"""
Health check endpoint.
Returns:
dict: Service health status with dependencies
Dictionary containing application health status
"""
health_status = {
return {
"status": "healthy",
"service": "file-processor",
"dependencies": {
"redis": "unknown",
"mongodb": "unknown"
},
"service": "MyDocManager",
"version": "1.0.0",
"file_watcher_running": file_watcher.is_running() if file_watcher else False
}
# Check Redis connection
if redis_client:
try:
redis_client.ping()
health_status["dependencies"]["redis"] = "connected"
except Exception:
health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded"
# check MongoDB connection
if test_database_connection():
health_status["dependencies"]["mongodb"] = "connected"
else:
health_status["dependencies"]["mongodb"] = "disconnected"
return health_status
@app.post("/test-task")
async def dispatch_test_task(request: TestTaskRequest):
"""
Dispatch a test task to Celery worker.
Args:
request: Test task request containing message
Returns:
dict: Task dispatch information
Raises:
HTTPException: If task dispatch fails
"""
try:
# Send task to worker
task = celery_app.send_task(
"main.test_task",
args=[request.message]
)
return {
"status": "dispatched",
"task_id": task.id,
"message": f"Test task dispatched with message: {request.message}"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to dispatch task: {str(e)}"
)
@app.get("/")
async def root():
"""
Root endpoint.
Root endpoint with basic application information.
Returns:
dict: Basic service information
Dictionary containing welcome message and available endpoints
"""
return {
"service": "MyDocManager File Processor",
"version": "1.0.0",
"status": "running"
"message": "Welcome to MyDocManager",
"description": "Real-time document processing application",
"docs": "/docs",
"health": "/health"
}
@app.get("/watcher/status")
async def watcher_status():
"""
Get file watcher status.
Returns:
Dictionary containing file watcher status information
"""
if not file_watcher:
return {
"status": "not_initialized",
"running": False
}
return {
"status": "initialized",
"running": file_watcher.is_running(),
"watch_directory": str(file_watcher.watch_directory),
"recursive": file_watcher.recursive
}

View File

@@ -95,6 +95,28 @@ class DocumentService:
"""
return magic.from_buffer(file_bytes, mime=True)
@staticmethod
def _read_file_bytes(file_path: str | Path) -> bytes:
"""
Read file content as bytes asynchronously.
Args:
file_path (str | Path): Path of the file to read
Returns:
bytes: Content of the file
Raises:
FileNotFoundError: If the file does not exist
OSError: If any I/O error occurs
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
return path.read_bytes()
def _get_document_path(self, file_hash):
"""
@@ -117,7 +139,7 @@ class DocumentService:
async def create_document(
self,
file_path: str,
file_bytes: bytes,
file_bytes: bytes | None = None,
encoding: str = "utf-8"
) -> FileDocument:
"""
@@ -140,6 +162,7 @@ class DocumentService:
PyMongoError: If database operation fails
"""
# Calculate automatic attributes
file_bytes = file_bytes or self._read_file_bytes(file_path)
file_hash = self._calculate_file_hash(file_bytes)
file_type = self._detect_file_type(file_path)
mime_type = self._detect_mime_type(file_bytes)

View File

@@ -8,8 +8,8 @@ creating default admin user if none exists.
import logging
from typing import Optional
from app.models.user import UserCreate, UserInDB, UserCreateNoValidation
from app.models.auth import UserRole
from app.models.user import UserInDB, UserCreateNoValidation
from app.services.user_service import UserService
logger = logging.getLogger(__name__)
@@ -31,7 +31,6 @@ class InitializationService:
user_service (UserService): Service for user operations
"""
self.user_service = user_service
async def ensure_admin_user_exists(self) -> Optional[UserInDB]:
"""
@@ -131,4 +130,23 @@ class InitializationService:
logger.error(error_msg)
initialization_summary["errors"].append(error_msg)
return initialization_summary
self.log_initialization_result(initialization_summary)
return initialization_summary
@staticmethod
def log_initialization_result(summary: dict) -> None:
"""
Log the result of the initialization process.
Args:
summary (dict): Summary of initialization tasks performed
"""
if summary["initialization_success"]:
logger.info("Application startup completed successfully")
if summary["admin_user_created"]:
logger.info("Default admin user was created during startup")
else:
logger.error("Application startup completed with errors:")
for error in summary["errors"]:
logger.error(f" - {error}")

View File

@@ -8,4 +8,5 @@ pymongo==4.15.0
pydantic==2.11.9
redis==6.4.0
uvicorn==0.35.0
python-magic==0.4.27
python-magic==0.4.27
watchdog==6.0.0

View File

@@ -8,7 +8,7 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY tasks/ .
COPY . .
# Command will be overridden by docker-compose
CMD ["celery", "-A", "main", "worker", "--loglevel=info"]

View File

@@ -0,0 +1,179 @@
"""
Celery tasks for document processing with ProcessingJob status management.
This module contains Celery tasks that handle document content extraction
and update processing job statuses throughout the task lifecycle.
"""
import logging
from typing import Any, Dict
from tasks.main import app as celery_app
logger = logging.getLogger(__name__)
# @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
# def process_document(self, document_service, job_service, filepath: str) -> Dict[str, Any]:
# """
# Process a document file and extract its content.
#
# This task:
# 1. Updates the processing job status to PROCESSING
# 2. Performs document content extraction
# 3. Updates job status to COMPLETED or FAILED based on result
#
# Args:
# self : Celery task instance
# job_service : Instance of JobService
# document_service : Instance of DocumentService
# filepath: Full path to the document file to process
#
# Returns:
# Dictionary containing processing results
#
# Raises:
# Exception: Any processing error (will trigger retry)
# """
# task_id = self.request.id
# logger.info(f"Starting document processing task {task_id} for file: {filepath}")
#
# try:
# # Step 1: Mark job as started
# await job_service.mark_job_as_started(task_id=task_id)
# logger.info(f"Job {task_id} marked as PROCESSING")
#
# # Step 2: Process the document (extract content, OCR, etc.)
# document = await self.document_service.create_document(filepath)
# logger.info(f"Created document record with ID: {document.id}")
#
# result = document_service.extract_document_content(filepath)
# logger.info(f"Document content extracted successfully for task {task_id}")
#
# # Step 3: Mark job as completed
# await job_service.mark_job_as_completed(task_id=task_id)
# logger.info(f"Job {task_id} marked as COMPLETED")
#
# return {
# "task_id": task_id,
# "filepath": filepath,
# "status": "completed",
# "content_length": len(result.get("content", "")),
# "extraction_method": result.get("extraction_method"),
# "processing_time": result.get("processing_time")
# }
#
# except Exception as e:
# error_message = f"Document processing failed: {str(e)}"
# logger.error(f"Task {task_id} failed: {error_message}")
#
# try:
# # Mark job as failed
# job_service.mark_job_as_failed(task_id=task_id, error_message=error_message)
# logger.info(f"Job {task_id} marked as FAILED")
# except Exception as job_error:
# logger.error(f"Failed to update job status for task {task_id}: {str(job_error)}")
#
# # Re-raise the exception to trigger Celery retry mechanism
# raise
@celery_app.task(name="tasks.document_processing.process_document",
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60})
def process_document(self, filepath: str) -> Dict[str, Any]:
"""
Process a document file and extract its content.
This task:
1. Updates the processing job status to PROCESSING
2. Performs document content extraction
3. Updates job status to COMPLETED or FAILED based on result
Args:
self : Celery task instance
job_service : Instance of JobService
document_service : Instance of DocumentService
filepath: Full path to the document file to process
Returns:
Dictionary containing processing results
Raises:
Exception: Any processing error (will trigger retry)
"""
task_id = self.request.id
logger.info(f"Starting document processing task {task_id} for file: {filepath}")
@celery_app.task(bind=True)
def cleanup_old_processing_jobs(self, days_old: int = 30) -> Dict[str, Any]:
"""
Clean up old processing jobs from the database.
This maintenance task removes completed and failed jobs older than
the specified number of days.
Args:
days_old: Number of days after which to clean up jobs
Returns:
Dictionary containing cleanup statistics
"""
task_id = self.request.id
logger.info(f"Starting cleanup task {task_id} for jobs older than {days_old} days")
job_service = JobService()
try:
# Perform cleanup
cleanup_result = job_service.cleanup_old_jobs(days_old=days_old)
logger.info(
f"Cleanup task {task_id} completed: "
f"deleted {cleanup_result['deleted_count']} jobs"
)
return {
"task_id": task_id,
"status": "completed",
"deleted_count": cleanup_result["deleted_count"],
"days_old": days_old
}
except Exception as e:
error_message = f"Cleanup task failed: {str(e)}"
logger.error(f"Cleanup task {task_id} failed: {error_message}")
raise
@celery_app.task(bind=True)
def get_processing_statistics(self) -> Dict[str, Any]:
"""
Generate processing statistics for monitoring.
Returns:
Dictionary containing current processing statistics
"""
task_id = self.request.id
logger.info(f"Generating processing statistics for task {task_id}")
job_service = JobService()
try:
stats = job_service.get_processing_statistics()
logger.info(f"Statistics generated for task {task_id}")
return {
"task_id": task_id,
"status": "completed",
"statistics": stats,
"timestamp": stats.get("generated_at")
}
except Exception as e:
error_message = f"Statistics generation failed: {str(e)}"
logger.error(f"Statistics task {task_id} failed: {error_message}")
raise

View File

@@ -6,6 +6,7 @@ This module contains all Celery tasks for processing documents.
import os
import time
from celery import Celery
# Environment variables
@@ -110,4 +111,4 @@ def process_document_task(self, file_path: str):
if __name__ == "__main__":
app.start()
app.start()