MyDocManager

Overview

MyDocManager is a real-time document processing application that automatically detects files in a monitored directory, processes them asynchronously, and stores the results in a database. The application uses a modern microservices architecture with Redis for task queuing and MongoDB for data persistence.

Architecture

Technology Stack

  • Backend API: FastAPI (Python 3.12)
  • Task Processing: Celery with Redis broker
  • Document Processing: EasyOCR, PyMuPDF, python-docx, pdfplumber
  • Database: MongoDB (pymongo)
  • Frontend: React
  • Containerization: Docker & Docker Compose
  • File Monitoring: Python watchdog library

Services Architecture

┌─────────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Frontend      │    │ file-       │    │    Redis    │    │   Worker    │    │  MongoDB    │
│   (React)       │◄──►│ processor   │───►│  (Broker)   │◄──►│  (Celery)   │───►│ (Results)   │
│                 │    │ (FastAPI +  │    │             │    │             │    │             │
│                 │    │ watchdog)   │    │             │    │             │    │             │
└─────────────────┘    └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘

Docker Services

  1. file-processor: FastAPI + real-time file monitoring + Celery task dispatch
  2. worker: Celery workers for document processing (OCR, text extraction)
  3. redis: Message broker for Celery tasks
  4. mongodb: Final database for processing results
  5. frontend: React interface for monitoring and file access

Data Flow

  1. File Detection: Watchdog monitors target directory in real-time
  2. Task Creation: FastAPI creates Celery task for each detected file
  3. Task Processing: Worker processes document (OCR, text extraction)
  4. Result Storage: Processed data stored in MongoDB
  5. Monitoring: React frontend displays processing status and results

Document Processing Capabilities

Supported File Types

  • PDF: Direct text extraction + OCR for scanned documents
  • Word Documents: .docx text extraction
  • Images: OCR text recognition (JPG, PNG, etc.)

Processing Libraries

  • EasyOCR: Modern OCR engine (80+ languages, deep learning-based)
  • PyMuPDF: PDF text extraction and manipulation
  • python-docx: Word document processing
  • pdfplumber: Advanced PDF text extraction
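
As an illustration, here is a minimal sketch of how these libraries could be combined for per-type extraction; the function name and language list are assumptions, not the project's actual pipeline code:

# Minimal per-type extraction sketch; extract_text and the OCR language
# list are illustrative assumptions, not the project's API.
from pathlib import Path

import fitz  # PyMuPDF
import pdfplumber
import docx
import easyocr


def extract_text(filepath: str) -> str:
    """Extract text from a file based on its extension."""
    suffix = Path(filepath).suffix.lower()
    if suffix == ".pdf":
        # Try direct extraction first; a scanned PDF would yield no text.
        with fitz.open(filepath) as pdf:
            text = "".join(page.get_text() for page in pdf)
        if text.strip():
            return text
        # Fallback: pdfplumber sometimes recovers text PyMuPDF misses.
        with pdfplumber.open(filepath) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)
    if suffix == ".docx":
        document = docx.Document(filepath)
        return "\n".join(p.text for p in document.paragraphs)
    if suffix in {".jpg", ".jpeg", ".png"}:
        reader = easyocr.Reader(["en"])  # language list is an assumption
        return "\n".join(reader.readtext(filepath, detail=0))
    raise ValueError(f"Unsupported file type: {suffix}")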

Development Environment

Container-Based Development

The application is designed for container-based development with hot-reload capabilities:

  • Source code mounted as volumes for real-time updates
  • All services orchestrated via Docker Compose
  • Development and production parity

Key Features

  • Real-time Processing: Immediate file detection and processing
  • Horizontal Scaling: Multiple workers can be added easily
  • Fault Tolerance: Celery provides automatic retry mechanisms
  • Monitoring: Built-in task status tracking
  • Hot Reload: Development changes reflected instantly in containers

Project Structure

MyDocManager/
├── docker-compose.yml
├── src/
│   ├── file-processor/
│   │   ├── Dockerfile
│   │   ├── requirements.txt
│   │   ├── app/
│   │   │   ├── main.py
│   │   │   ├── file_watcher.py             # FileWatcher class with observer thread
│   │   │   ├── celery_app.py               # Celery Configuration 
│   │   │   ├── config/
│   │   │   │   ├── __init__.py
│   │   │   │   └── settings.py              # JWT, MongoDB config
│   │   │   ├── models/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── user.py                  # User Pydantic models
│   │   │   │   ├── auth.py                  # Auth Pydantic models
│   │   │   │   ├── document.py              # Document Pydantic models
│   │   │   │   ├── job.py                   # Job Processing Pydantic models
│   │   │   │   └── types.py                 # PyObjectId and other useful types
│   │   │   ├── database/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── connection.py            # MongoDB connection (pymongo)
│   │   │   │   └── repositories/
│   │   │   │       ├── __init__.py
│   │   │   │       ├── user_repository.py      # User CRUD operations (synchronous)
│   │   │   │       ├── document_repository.py  # Document CRUD operations (synchronous)
│   │   │   │       └── job_repository.py       # Job CRUD operations (synchronous)
│   │   │   ├── services/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── auth_service.py          # JWT & password logic (synchronous)
│   │   │   │   ├── user_service.py          # User business logic (synchronous)
│   │   │   │   ├── document_service.py      # Document business logic (synchronous)
│   │   │   │   ├── job_service.py           # Job processing logic (synchronous)
│   │   │   │   └── init_service.py          # Admin creation at startup
│   │   │   ├── api/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── dependencies.py          # Auth dependencies
│   │   │   │   └── routes/
│   │   │   │       ├── __init__.py
│   │   │   │       ├── auth.py              # Authentication routes
│   │   │   │       └── users.py             # User management routes
│   │   │   └── utils/
│   │   │       ├── __init__.py
│   │   │       ├── security.py             # Password utilities
│   │   │       └── document_matching.py    # Fuzzy matching algorithms
│   ├── worker/
│   │   ├── Dockerfile
│   │   ├── requirements.txt
│   │   └── tasks/
│   └── frontend/
│       ├── Dockerfile
│       ├── package.json
│       ├── index.html
│       └── src/
│           ├── assets/
│           ├── App.css
│           ├── App.jsx
│           ├── main.css
│           └── main.jsx
├── tests/
│   ├── file-processor/
│   │   ├── test_auth/
│   │   ├── test_users/
│   │   └── test_services/
│   └── worker/
├── volumes/
│   └── watched_files/
└── README.md

Authentication & User Management

Security Features

  • JWT Authentication: Stateless authentication with 24-hour token expiration
  • Password Security: bcrypt hashing with automatic salting
  • Role-Based Access: Admin and User roles with granular permissions
  • Protected Routes: All user management APIs require valid authentication
  • Auto Admin Creation: Default admin user created on first startup

User Roles

  • Admin: Full access to user management (create, read, update, delete users)
  • User: Limited access (view own profile, access document processing features)

Authentication Flow

  1. Login: User provides credentials → Server validates → Returns JWT token
  2. API Access: Client includes JWT in Authorization header
  3. Token Validation: Server verifies token signature and expiration
  4. Role Check: Server validates user permissions for requested resource
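
A minimal sketch of steps 1 and 3, assuming PyJWT; the secret key and claim layout are placeholders, not the project's actual configuration:

# JWT issue/verify sketch, assuming PyJWT (pip install PyJWT).
# SECRET_KEY and the claim layout are illustrative placeholders.
from datetime import datetime, timedelta, timezone

import jwt

SECRET_KEY = "change-me-in-production"
ALGORITHM = "HS256"


def create_access_token(username: str) -> str:
    """Issue a token with the 24-hour expiration described above."""
    payload = {
        "sub": username,
        "exp": datetime.now(timezone.utc) + timedelta(hours=24),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> dict:
    """Verify signature and expiration; raises jwt.InvalidTokenError on failure."""
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])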

User Management APIs

POST /auth/login              # Generate JWT token
GET  /users                   # List all users (admin only)
POST /users                   # Create new user (admin only)
PUT  /users/{user_id}         # Update user (admin only)
DELETE /users/{user_id}       # Delete user (admin only)
GET  /users/me                # Get current user profile (authenticated users)
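
An illustrative client-side call sequence using Python requests, assuming the service listens on localhost:8000 as in the testing commands below; the login payload format and response field names are assumptions about the login contract:

import requests

BASE_URL = "http://localhost:8000"

# 1. Authenticate; the JSON body and "access_token" field are assumptions.
resp = requests.post(f"{BASE_URL}/auth/login",
                     json={"username": "admin", "password": "admin"})
resp.raise_for_status()
token = resp.json()["access_token"]

# 2. Call a protected endpoint with the JWT in the Authorization header.
me = requests.get(f"{BASE_URL}/users/me",
                  headers={"Authorization": f"Bearer {token}"})
print(me.json())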

Useful Service URLs

Testing Commands

# Test FastAPI health
curl http://localhost:8000/health

# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello from test!"}'

# Monitor Celery tasks
docker-compose logs -f worker

Default Admin User

On first startup, the application automatically creates a default admin user:

  • Username: admin
  • Password: admin
  • Role: admin
  • Email: admin@mydocmanager.local

⚠️ Important: Change the default admin password immediately after first login in production environments.

File Processing Architecture

Document Processing Flow

  1. File Detection: Watchdog monitors /volumes/watched_files/ directory in real-time
  2. Task Creation: File watcher creates Celery task for each detected file
  3. Document Processing: Celery worker processes the document and extracts content
  4. Database Storage: Processed data stored in MongoDB collections

MongoDB Collections Design

Files Collection

Stores file metadata and extracted content using Pydantic models:

from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, field_validator

# PyObjectId, FileType and ExtractionMethod are project-local types
# (see app/models/types.py and the other model modules).


class FileDocument(BaseModel):
    """
    Model for file documents stored in the 'files' collection.

    Represents a file detected in the watched directory with its
    metadata and extracted content.
    """

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    filename: str = Field(..., description="Original filename")
    filepath: str = Field(..., description="Full path to the file")
    file_type: FileType = Field(..., description="Type of the file")
    extraction_method: Optional[ExtractionMethod] = Field(default=None, description="Method used to extract content")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="File-specific metadata")
    detected_at: Optional[datetime] = Field(default=None, description="Timestamp when file was detected")
    file_hash: Optional[str] = Field(default=None, description="SHA256 hash of file content")
    encoding: str = Field(default="utf-8", description="Character encoding for text files")
    file_size: int = Field(..., ge=0, description="File size in bytes")
    mime_type: str = Field(..., description="MIME type detected")

    @field_validator('filepath')
    @classmethod
    def validate_filepath(cls, v: str) -> str:
        """Validate filepath format."""
        if not v.strip():
            raise ValueError("Filepath cannot be empty")
        return v.strip()

    @field_validator('filename')
    @classmethod
    def validate_filename(cls, v: str) -> str:
        """Validate filename format."""
        if not v.strip():
            raise ValueError("Filename cannot be empty")
        return v.strip()

Processing Jobs Collection

Tracks processing status and lifecycle:

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, field_validator

# PyObjectId and the ProcessingStatus enum are project-local types
# (see app/models/types.py and app/models/job.py).


class ProcessingJob(BaseModel):
    """
    Model for processing jobs stored in the 'processing_jobs' collection.

    Tracks the lifecycle and status of document processing tasks.
    """

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    file_id: PyObjectId = Field(..., description="Reference to file document")
    status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status")
    task_id: Optional[str] = Field(default=None, description="Celery task UUID")
    created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created")
    started_at: Optional[datetime] = Field(default=None, description="Timestamp when processing started")
    completed_at: Optional[datetime] = Field(default=None, description="Timestamp when processing completed")
    error_message: Optional[str] = Field(default=None, description="Error message if processing failed")

    @field_validator('error_message')
    @classmethod
    def validate_error_message(cls, v: Optional[str]) -> Optional[str]:
        """Clean up error message."""
        if v is not None:
            return v.strip() if v.strip() else None
        return v

Supported File Types (Initial Implementation)

  • Text Files (.txt): Direct content reading
  • PDF Documents (.pdf): Text extraction via PyMuPDF/pdfplumber
  • Word Documents (.docx): Content extraction via python-docx

File Processing Architecture Decisions

Watchdog Implementation

  • Choice: Dedicated observer thread
  • Rationale: Standard approach, clean separation of concerns
  • Implementation: Watchdog observer runs in separate thread from FastAPI
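
A minimal sketch of this approach using the watchdog API; the handler and dispatch hook are illustrative stand-ins for the project's FileWatcher class:

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


def dispatch_processing_task(path: str) -> None:
    """Hypothetical hook: the real code would enqueue a Celery task here."""
    print(f"detected: {path}")


class NewFileHandler(FileSystemEventHandler):
    """Reacts to newly created files in the watched directory."""

    def on_created(self, event):
        if not event.is_directory:
            dispatch_processing_task(event.src_path)


def start_watcher(directory: str) -> Observer:
    """Start the observer in its own thread, separate from the FastAPI loop."""
    observer = Observer()
    observer.schedule(NewFileHandler(), directory, recursive=False)
    observer.start()
    return observer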

Task Dispatch Strategy

  • Choice: Direct Celery task creation from file watcher
  • Rationale: Minimal latency, straightforward flow
  • Implementation: File detected → Immediate Celery task dispatch
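
A sketch of the direct-dispatch wiring; the broker URL matches the redis service name used by Docker Compose, while the task name and signature are illustrative:

from celery import Celery

# Broker URL uses the redis service name from Docker Compose.
celery_app = Celery("mydocmanager", broker="redis://redis:6379/0")


@celery_app.task(name="process_document")
def process_document(filepath: str) -> None:
    """Worker-side entry point: extract content and store the result."""
    ...


def on_file_detected(filepath: str) -> None:
    # File watcher side: detection triggers an immediate dispatch.
    process_document.delay(filepath)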

Data Storage Strategy

  • Choice: Separate collections for files and processing status
  • Rationale: Clean separation of file data vs processing lifecycle
  • Benefits:
    • Better query performance
    • Clear data model boundaries
    • Easy processing status tracking

Content Storage Location

  • Choice: Store files in the file system, using the SHA256 hash as filename
  • Rationale: MongoDB is not designed for storing large binary files; keeping content on the file system improves performance and leaves files directly accessible.
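
A sketch of this content-addressed layout; the storage directory is a placeholder:

import hashlib
import shutil
from pathlib import Path

STORAGE_DIR = Path("/volumes/storage")  # placeholder location


def sha256_of(path: str) -> str:
    """Stream the file so large documents are not loaded fully into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()


def store_by_hash(path: str) -> str:
    """Copy the file under its SHA256 name; identical content is stored once."""
    file_hash = sha256_of(path)
    target = STORAGE_DIR / file_hash
    if not target.exists():  # duplicate detection: skip identical content
        shutil.copy2(path, target)
    return file_hash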

Repository and Services Implementation

  • Choice: Synchronous implementation using pymongo
  • Rationale: Full compatibility with Celery workers and simplified workflow
  • Implementation: All repositories and services operate synchronously for seamless integration
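
A minimal sketch of a synchronous repository over pymongo; the connection URL mirrors the compose service name, while the database name and method names are illustrative:

from typing import Any, Dict, Optional

from bson import ObjectId
from pymongo import MongoClient

client = MongoClient("mongodb://mongodb:27017")
db = client["mydocmanager"]  # database name is a placeholder


class FileRepository:
    """Blocking CRUD over the 'files' collection, safe to call from Celery."""

    def __init__(self) -> None:
        self._collection = db["files"]

    def create(self, document: Dict[str, Any]) -> ObjectId:
        return self._collection.insert_one(document).inserted_id

    def get_by_id(self, file_id: ObjectId) -> Optional[Dict[str, Any]]:
        return self._collection.find_one({"_id": file_id})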

Implementation Status

  1. Pydantic models for MongoDB collections
  2. Repository layer for data access (files + processing_jobs + users + documents) - synchronous
  3. Service layer for business logic (auth, user, document, job) - synchronous
  4. Celery tasks for document processing
  5. Watchdog file monitoring implementation
  6. FastAPI integration and startup coordination

Job Management Layer

Repository Pattern Implementation

The job management system follows the repository pattern for clean separation between data access and business logic.

JobRepository

Handles direct MongoDB operations for processing jobs using synchronous pymongo:

CRUD Operations:

  • create_job() - Create new processing job with automatic created_at timestamp
  • get_job_by_id() - Retrieve job by ObjectId
  • update_job_status() - Update job status with automatic timestamp management
  • delete_job() - Remove job from database
  • get_jobs_by_file_id() - Get all jobs for specific file
  • get_jobs_by_status() - Get jobs filtered by processing status

Automatic Timestamp Management:

  • created_at: Set automatically during job creation
  • started_at: Set automatically when status changes to PROCESSING
  • completed_at: Set automatically when status changes to COMPLETED or FAILED
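
A sketch of how this timestamp rule could be applied in a single update, assuming a pymongo collection and lowercase status values:

from datetime import datetime, timezone
from typing import Optional


def update_job_status(collection, job_id, status: str,
                      error_message: Optional[str] = None) -> None:
    """Set the new status plus whichever timestamp the transition implies."""
    fields: dict = {"status": status}
    now = datetime.now(timezone.utc)
    if status == "processing":
        fields["started_at"] = now          # PROCESSING starts the clock
    elif status in ("completed", "failed"):
        fields["completed_at"] = now        # terminal states stop it
    if error_message is not None:
        fields["error_message"] = error_message
    collection.update_one({"_id": job_id}, {"$set": fields})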

JobService

Provides synchronous business logic layer with strict status transition validation:

Status Transition Methods:

  • mark_job_as_started() - PENDING → PROCESSING
  • mark_job_as_completed() - PROCESSING → COMPLETED
  • mark_job_as_failed() - PROCESSING → FAILED

Validation Rules:

  • Strict status transitions (invalid transitions raise exceptions)
  • Job existence verification before any operation
  • Automatic timestamp management through repository layer

Custom Exceptions

  • InvalidStatusTransitionError: Raised for invalid status transitions
  • JobRepositoryError: Raised for MongoDB operation failures

Valid Status Transitions

PENDING → PROCESSING    (via mark_job_as_started)
PROCESSING → COMPLETED  (via mark_job_as_completed)
PROCESSING → FAILED     (via mark_job_as_failed)

All other transitions are forbidden and will raise InvalidStatusTransitionError.
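
A sketch of how the transition table might be enforced; the status strings are illustrative stand-ins for the ProcessingStatus enum:

# Allowed edges of the status graph shown above.
VALID_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"completed", "failed"},
}


class InvalidStatusTransitionError(Exception):
    """Raised when a job is moved along a forbidden edge."""


def assert_transition(current: str, target: str) -> None:
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise InvalidStatusTransitionError(
            f"Cannot transition job from '{current}' to '{target}'"
        )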

File Structure

src/file-processor/app/
├── database/repositories/
│   ├── job_repository.py           # JobRepository class (synchronous)
│   ├── user_repository.py          # UserRepository class (synchronous)
│   ├── document_repository.py      # DocumentRepository class (synchronous)
│   └── file_repository.py          # FileRepository class (synchronous)
├── services/  
│   ├── job_service.py              # JobService class (synchronous)
│   ├── auth_service.py             # AuthService class (synchronous)
│   ├── user_service.py             # UserService class (synchronous)
│   └── document_service.py         # DocumentService class (synchronous)
└── exceptions/
    └── job_exceptions.py           # Custom exceptions

Processing Pipeline Features

  • Duplicate Detection: SHA256 hashing prevents reprocessing same files
  • Error Handling: Failed processing tracked with error messages
  • Status Tracking: Real-time processing status via processing_jobs collection
  • Extensible Metadata: Flexible metadata storage per file type
  • Multiple Extraction Methods: Support for direct text, OCR, and hybrid approaches
  • Synchronous Operations: All database operations use pymongo for Celery compatibility

Key Implementation Notes

Python Standards

  • Style: PEP 8 compliance
  • Documentation: Google/NumPy docstring format
  • Naming: snake_case for variables and functions
  • Testing: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
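
For illustration, the naming pattern in practice; auth_service here is a hypothetical fixture, not the project's actual test setup:

import pytest

# auth_service is a hypothetical fixture standing in for the real
# AuthService (app/services/auth_service.py).


def test_i_can_login_with_valid_credentials(auth_service):
    token = auth_service.login("admin", "admin")
    assert token


def test_i_cannot_login_with_a_wrong_password(auth_service):
    with pytest.raises(Exception):
        auth_service.login("admin", "wrong-password")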

Security Best Practices

  • Password Storage: Never store plain text passwords, always use bcrypt hashing
  • JWT Secrets: Use strong, randomly generated secret keys in production
  • Token Expiration: 24-hour expiration with secure signature validation
  • Role Validation: Server-side role checking for all protected endpoints
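
A sketch of server-side role checking as a FastAPI dependency; the get_current_user stub stands in for the project's real JWT dependency:

from fastapi import Depends, FastAPI, HTTPException, status

app = FastAPI()


def get_current_user() -> dict:
    """Stub: the real dependency decodes the JWT from the Authorization header."""
    return {"username": "admin", "role": "admin"}


def require_admin(user: dict = Depends(get_current_user)) -> dict:
    """Reject non-admin callers before the route body runs."""
    if user.get("role") != "admin":
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                            detail="Admin role required")
    return user


@app.get("/users")
def list_users(admin: dict = Depends(require_admin)):
    return []  # placeholder body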

Dependencies Management

  • Package Manager: pip (standard)
  • External Dependencies: Listed in each service's requirements.txt
  • Standard Library First: Prefer standard library when possible
  • Database Driver: pymongo for synchronous MongoDB operations

Testing Strategy

  • All code must be testable
  • Unit tests for each authentication and user management function
  • Integration tests for complete authentication flow
  • Tests validated before implementation

Critical Architecture Decisions Made

  1. JWT Authentication: Simple token-based auth with 24-hour expiration
  2. Role-Based Access: Admin/User roles for granular permissions
  3. bcrypt Password Hashing: Industry-standard password security
  4. MongoDB User Storage: Centralized user management in main database
  5. Auto Admin Creation: Automatic setup for first-time deployment
  6. Single FastAPI Service: Handles both API and file watching with authentication
  7. Celery with Redis: Chosen over other async patterns for scalability
  8. EasyOCR Preferred: Selected over Tesseract for modern OCR needs
  9. Container Development: Hot-reload setup required for development workflow
  10. Dedicated Watchdog Observer: Thread-based file monitoring for reliability
  11. Separate MongoDB Collections: Files and processing jobs stored separately
  12. Content in Files Collection: Extracted content stored with file metadata
  13. Direct Task Dispatch: File watcher directly creates Celery tasks
  14. SHA256 Duplicate Detection: Prevents reprocessing identical files
  15. Synchronous Implementation: All repositories and services use pymongo for Celery compatibility

Development Process Requirements

  1. Collaborative Validation: All options must be explained before coding
  2. Test-First Approach: Test cases defined and validated before implementation
  3. Incremental Development: Start simple, extend functionality progressively
  4. Error Handling: Clear problem explanation required before proposing fixes

Next Implementation Steps

  1. TODO: Complete file processing pipeline:
    1. Create Pydantic models for files and processing_jobs collections
    2. Implement repository layer for file and processing job data access (synchronous)
    3. Implement service layer for business logic (synchronous)
    4. Create Celery tasks for document processing (.txt, .pdf, .docx)
    5. Implement Watchdog file monitoring with dedicated observer
    6. Integrate file watcher with FastAPI startup
  2. Create protected API routes for user management
  3. Build React monitoring interface with authentication

Annexes

Docker Commands Reference

Initial Setup & Build

# Build and start all services (first time)
docker-compose up --build

# Build and start in background
docker-compose up --build -d

# Build specific service
docker-compose build file-processor
docker-compose build worker

Development Workflow

# Start all services
docker-compose up

# Start in background (detached mode)
docker-compose up -d

# Stop all services
docker-compose down

# Stop and remove volumes (⚠️ deletes MongoDB data)
docker-compose down -v

# Restart specific service
docker-compose restart file-processor
docker-compose restart worker
docker-compose restart redis
docker-compose restart mongodb

Monitoring & Debugging

# View logs of all services
docker-compose logs

# View logs of specific service
docker-compose logs file-processor
docker-compose logs worker
docker-compose logs redis
docker-compose logs mongodb

# Follow logs in real-time
docker-compose logs -f
docker-compose logs -f worker

# View running containers
docker-compose ps

# Execute command in running container
docker-compose exec file-processor bash
docker-compose exec worker bash
docker-compose exec mongodb mongosh

Service Management

# Start only specific services
docker-compose up redis mongodb file-processor

# Stop specific service
docker-compose stop worker
docker-compose stop file-processor

# Remove stopped containers
docker-compose rm

# Scale workers (multiple instances)
docker-compose up --scale worker=3

Hot-Reload Configuration

  • file-processor: Hot-reload enabled via --reload flag
    • Code changes in src/file-processor/app/ automatically restart FastAPI
  • worker: No hot-reload (manual restart required for stability)
    • Code changes in src/worker/tasks/ require: docker-compose restart worker