MyDocManager
Overview
MyDocManager is a real-time document processing application that automatically detects files in a monitored directory, processes them asynchronously, and stores the results in a database. The application uses a modern microservices architecture with Redis for task queuing and MongoDB for data persistence.
Architecture
Technology Stack
- Backend API: FastAPI (Python 3.12)
- Task Processing: Celery with Redis broker
- Document Processing: EasyOCR, PyMuPDF, python-docx, pdfplumber
- Database: MongoDB (pymongo)
- Frontend: React
- Containerization: Docker & Docker Compose
- File Monitoring: Python watchdog library
Services Architecture
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Frontend   │    │   file-     │    │    Redis    │    │   Worker    │    │   MongoDB   │
│  (React)    │◄──►│  processor  │───►│  (Broker)   │◄──►│  (Celery)   │───►│  (Results)  │
│             │    │  (FastAPI + │    │             │    │             │    │             │
│             │    │   watchdog) │    │             │    │             │    │             │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
Docker Services
- file-processor: FastAPI + real-time file monitoring + Celery task dispatch
- worker: Celery workers for document processing (OCR, text extraction)
- redis: Message broker for Celery tasks
- mongodb: Final database for processing results
- frontend: React interface for monitoring and file access
Data Flow
- File Detection: Watchdog monitors target directory in real-time
- Task Creation: FastAPI creates Celery task for each detected file
- Task Processing: Worker processes document (OCR, text extraction)
- Result Storage: Processed data stored in MongoDB
- Monitoring: React frontend displays processing status and results
Document Processing Capabilities
Supported File Types
- PDF: Direct text extraction + OCR for scanned documents
- Word Documents: .docx text extraction
- Images: OCR text recognition (JPG, PNG, etc.)
Processing Libraries
- EasyOCR: Modern OCR engine (80+ languages, deep learning-based)
- PyMuPDF: PDF text extraction and manipulation
- python-docx: Word document processing
- pdfplumber: Advanced PDF text extraction
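To illustrate how these libraries divide the work, the sketch below routes a file to an extractor by extension. This is a minimal illustration, not the project's actual implementation: the function name, the English-only OCR reader, and the omitted scanned-PDF fallback are all assumptions.

import docx  # python-docx
import easyocr
import fitz  # PyMuPDF

_ocr_reader = easyocr.Reader(["en"])  # loaded once; model initialization is expensive

def extract_text(filepath: str) -> str:
    """Illustrative dispatch: choose an extraction method by file extension."""
    path = filepath.lower()
    if path.endswith(".pdf"):
        with fitz.open(filepath) as pdf:
            text = "".join(page.get_text() for page in pdf)
        if text.strip():
            return text  # digital PDF: use the embedded text layer directly
        # Scanned PDF: render pages to images and OCR them (omitted for brevity)
        raise NotImplementedError("OCR fallback for scanned PDFs")
    if path.endswith(".docx"):
        return "\n".join(p.text for p in docx.Document(filepath).paragraphs)
    # Images (JPG, PNG, ...): OCR text recognition
    return " ".join(_ocr_reader.readtext(filepath, detail=0))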
Development Environment
Container-Based Development
The application is designed for container-based development with hot-reload capabilities:
- Source code mounted as volumes for real-time updates
- All services orchestrated via Docker Compose
- Development and production parity
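A docker-compose.yml fragment for this hot-reload setup might look like the following. The service name matches the one above, but the exact paths, ports, and command are assumptions:

file-processor:
  build: ./src/file-processor
  command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
  volumes:
    - ./src/file-processor/app:/app/app          # source mounted for hot reload
    - ./volumes/watched_files:/volumes/watched_files
  ports:
    - "8000:8000"
  depends_on:
    - redis
    - mongodb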
Key Features
- Real-time Processing: Immediate file detection and processing
- Horizontal Scaling: Multiple workers can be added easily
- Fault Tolerance: Celery provides automatic retry mechanisms
- Monitoring: Built-in task status tracking
- Hot Reload: Development changes reflected instantly in containers
Project Structure
MyDocManager/
├── docker-compose.yml
├── src/
│ ├── file-processor/
│ │ ├── Dockerfile
│ │ ├── requirements.txt
│ │ ├── app/
│ │ │ ├── main.py
│ │ │ ├── file_watcher.py # FileWatcher class with observer thread
│ │ │ ├── celery_app.py # Celery Configuration
│ │ │ ├── config/
│ │ │ │ ├── __init__.py
│ │ │ │ └── settings.py # JWT, MongoDB config
│ │ │ ├── models/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── user.py # User Pydantic models
│ │ │ │ ├── auth.py # Auth Pydantic models
│ │ │ │ ├── document.py # Document Pydantic models
│ │ │ │ ├── job.py # Job Processing Pydantic models
│ │ │ │ └── types.py # PyObjectId and other useful types
│ │ │ ├── database/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── connection.py # MongoDB connection (pymongo)
│ │ │ │ └── repositories/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── user_repository.py # User CRUD operations (synchronous)
│ │ │ │ ├── document_repository.py # Document CRUD operations (synchronous)
│ │ │ │ └── job_repository.py # Job CRUD operations (synchronous)
│ │ │ ├── services/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── auth_service.py # JWT & password logic (synchronous)
│ │ │ │ ├── user_service.py # User business logic (synchronous)
│ │ │ │ ├── document_service.py # Document business logic (synchronous)
│ │ │ │ ├── job_service.py # Job processing logic (synchronous)
│ │ │ │ └── init_service.py # Admin creation at startup
│ │ │ ├── api/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── dependencies.py # Auth dependencies
│ │ │ │ └── routes/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── auth.py # Authentication routes
│ │ │ │ └── users.py # User management routes
│ │ │ └── utils/
│ │ │ ├── __init__.py
│ │ │ ├── security.py # Password utilities
│ │ │ └── document_matching.py # Fuzzy matching Algorithms
│ ├── worker/
│ │ ├── Dockerfile
│ │ ├── requirements.txt
│ │ └── tasks/
│ └── frontend/
│ ├── Dockerfile
│ ├── package.json
│ ├── index.html
│ └── src/
│ ├── assets/
│ ├── App.css
│ ├── App.jsx
│ ├── main.css
│ └── main.jsx
├── tests/
│ ├── file-processor/
│ │ ├── test_auth/
│ │ ├── test_users/
│ │ └── test_services/
│ └── worker/
├── volumes/
│ └── watched_files/
└── README.md
Authentication & User Management
Security Features
- JWT Authentication: Stateless authentication with 24-hour token expiration
- Password Security: bcrypt hashing with automatic salting
- Role-Based Access: Admin and User roles with granular permissions
- Protected Routes: All user management APIs require valid authentication
- Auto Admin Creation: Default admin user created on first startup
User Roles
- Admin: Full access to user management (create, read, update, delete users)
- User: Limited access (view own profile, access document processing features)
Authentication Flow
- Login: User provides credentials → Server validates → Returns JWT token
- API Access: Client includes JWT in Authorization header
- Token Validation: Server verifies token signature and expiration
- Role Check: Server validates user permissions for requested resource
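A minimal sketch of this flow, assuming PyJWT for tokens and passlib's bcrypt backend for password checks (the actual auth_service may use different libraries; the secret key is loaded from settings.py in the real application):

from datetime import datetime, timedelta, timezone

import jwt  # PyJWT
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
SECRET_KEY = "change-me-in-production"  # assumption: read from settings in practice
ALGORITHM = "HS256"

def login(username: str, password: str, stored_hash: str) -> str:
    """Validate credentials and return a JWT valid for 24 hours."""
    if not pwd_context.verify(password, stored_hash):
        raise ValueError("Invalid credentials")
    payload = {
        "sub": username,
        "exp": datetime.now(timezone.utc) + timedelta(hours=24),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def validate_token(token: str) -> dict:
    """Verify signature and expiration; raises jwt.ExpiredSignatureError if stale."""
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])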
User Management APIs
POST /auth/login # Generate JWT token
GET /users # List all users (admin only)
POST /users # Create new user (admin only)
PUT /users/{user_id} # Update user (admin only)
DELETE /users/{user_id} # Delete user (admin only)
GET /users/me # Get current user profile (authenticated users)
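For example, obtaining a token and calling a protected route (the JSON body shape of the login request is an assumption):

# Obtain a JWT
curl -X POST http://localhost:8000/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "password": "admin"}'
# Use the returned token on a protected route
curl http://localhost:8000/users/me \
  -H "Authorization: Bearer <token>"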
Useful Service URLs
- FastAPI API: http://localhost:8000
- FastAPI Docs: http://localhost:8000/docs
- Health Check: http://localhost:8000/health
- Redis: localhost:6379
- MongoDB: localhost:27017
Testing Commands
# Test FastAPI health
curl http://localhost:8000/health
# Test Celery task dispatch
curl -X POST http://localhost:8000/test-task \
-H "Content-Type: application/json" \
-d '{"message": "Hello from test!"}'
# Monitor Celery tasks
docker-compose logs -f worker
Default Admin User
On first startup, the application automatically creates a default admin user:
- Username: admin
- Password: admin
- Role: admin
- Email: admin@mydocmanager.local

⚠️ Important: Change the default admin password immediately after first login in production environments.
File Processing Architecture
Document Processing Flow
- File Detection: Watchdog monitors the /volumes/watched_files/ directory in real-time
- Task Creation: File watcher creates a Celery task for each detected file
- Document Processing: Celery worker processes the document and extracts content
- Database Storage: Processed data stored in MongoDB collections
MongoDB Collections Design
Files Collection
Stores file metadata and extracted content using Pydantic models:
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, field_validator

from .types import PyObjectId  # project helper type for MongoDB ObjectIds
# FileType and ExtractionMethod are project-defined enums (import path assumed)


class FileDocument(BaseModel):
    """
    Model for file documents stored in the 'files' collection.

    Represents a file detected in the watched directory with its
    metadata and extracted content.
    """
    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    filename: str = Field(..., description="Original filename")
    filepath: str = Field(..., description="Full path to the file")
    file_type: FileType = Field(..., description="Type of the file")
    extraction_method: Optional[ExtractionMethod] = Field(default=None, description="Method used to extract content")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="File-specific metadata")
    detected_at: Optional[datetime] = Field(default=None, description="Timestamp when file was detected")
    file_hash: Optional[str] = Field(default=None, description="SHA256 hash of file content")
    encoding: str = Field(default="utf-8", description="Character encoding for text files")
    file_size: int = Field(..., ge=0, description="File size in bytes")
    mime_type: str = Field(..., description="MIME type detected")

    @field_validator('filepath')
    @classmethod
    def validate_filepath(cls, v: str) -> str:
        """Validate filepath format."""
        if not v.strip():
            raise ValueError("Filepath cannot be empty")
        return v.strip()

    @field_validator('filename')
    @classmethod
    def validate_filename(cls, v: str) -> str:
        """Validate filename format."""
        if not v.strip():
            raise ValueError("Filename cannot be empty")
        return v.strip()
Processing Jobs Collection
Tracks processing status and lifecycle:
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, field_validator

from .types import PyObjectId  # project helper type for MongoDB ObjectIds
# ProcessingStatus is a project-defined enum (import path assumed)


class ProcessingJob(BaseModel):
    """
    Model for processing jobs stored in the 'processing_jobs' collection.

    Tracks the lifecycle and status of document processing tasks.
    """
    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    file_id: PyObjectId = Field(..., description="Reference to file document")
    status: ProcessingStatus = Field(default=ProcessingStatus.PENDING, description="Current processing status")
    task_id: Optional[str] = Field(default=None, description="Celery task UUID")
    created_at: Optional[datetime] = Field(default=None, description="Timestamp when job was created")
    started_at: Optional[datetime] = Field(default=None, description="Timestamp when processing started")
    completed_at: Optional[datetime] = Field(default=None, description="Timestamp when processing completed")
    error_message: Optional[str] = Field(default=None, description="Error message if processing failed")

    @field_validator('error_message')
    @classmethod
    def validate_error_message(cls, v: Optional[str]) -> Optional[str]:
        """Clean up error message."""
        if v is not None:
            return v.strip() if v.strip() else None
        return v
Supported File Types (Initial Implementation)
- Text Files (.txt): Direct content reading
- PDF Documents (.pdf): Text extraction via PyMuPDF/pdfplumber
- Word Documents (.docx): Content extraction via python-docx
File Processing Architecture Decisions
Watchdog Implementation
- Choice: Dedicated observer thread
- Rationale: Standard approach, clean separation of concerns
- Implementation: Watchdog observer runs in separate thread from FastAPI
Task Dispatch Strategy
- Choice: Direct Celery task creation from file watcher
- Rationale: Minimal latency, straightforward flow
- Implementation: File detected → Immediate Celery task dispatch
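Put together, the observer-plus-dispatch pattern might look like the sketch below. Handler, function, and task names are illustrative; the real classes live in file_watcher.py and celery_app.py:

from celery import Celery
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

celery_app = Celery("tasks", broker="redis://redis:6379/0")

class WatchedFilesHandler(FileSystemEventHandler):
    """Dispatch a Celery task as soon as a new file appears."""
    def on_created(self, event):
        if not event.is_directory:
            # send_task lets the watcher enqueue work without importing worker code
            celery_app.send_task("tasks.process_document", args=[event.src_path])

def start_watcher(path: str = "/volumes/watched_files") -> Observer:
    observer = Observer()
    observer.schedule(WatchedFilesHandler(), path, recursive=True)
    observer.start()  # runs in its own thread, separate from FastAPI
    return observer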
Data Storage Strategy
- Choice: Separate collections for files and processing status
- Rationale: Clean separation of file data vs processing lifecycle
- Benefits:
- Better query performance
- Clear data model boundaries
- Easy processing status tracking
Content Storage Location
- Choice: Store files in the file system, using the SHA256 hash as filename
- Rationale: MongoDB is not designed for large binary files; keeping files on disk gives better performance and easy direct access
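A minimal sketch of this content-addressed storage, which also underpins SHA256 duplicate detection. The store directory and function name are assumptions:

import hashlib
import shutil
from pathlib import Path

STORE_DIR = Path("/volumes/file_store")  # assumption: not the actual path

def store_file(filepath: str) -> str:
    """Copy a file into the store under its SHA256 hash; return the hash."""
    sha256 = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    digest = sha256.hexdigest()
    target = STORE_DIR / digest
    if not target.exists():  # identical content is stored only once
        STORE_DIR.mkdir(parents=True, exist_ok=True)
        shutil.copy2(filepath, target)
    return digest  # recorded as file_hash in the files collection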
Repository and Services Implementation
- Choice: Synchronous implementation using pymongo
- Rationale: Full compatibility with Celery workers and simplified workflow
- Implementation: All repositories and services operate synchronously for seamless integration
Implementation Status
- ✅ Pydantic models for MongoDB collections
- ✅ Repository layer for data access (files + processing_jobs + users + documents) - synchronous
- ✅ Service layer for business logic (auth, user, document, job) - synchronous
- ✅ Celery tasks for document processing
- ✅ Watchdog file monitoring implementation
- ✅ FastAPI integration and startup coordination
Job Management Layer
Repository Pattern Implementation
The job management system follows the repository pattern for clean separation between data access and business logic.
JobRepository
Handles direct MongoDB operations for processing jobs using synchronous pymongo:
CRUD Operations:
- create_job() - Create new processing job with automatic created_at timestamp
- get_job_by_id() - Retrieve job by ObjectId
- update_job_status() - Update job status with automatic timestamp management
- delete_job() - Remove job from database
- get_jobs_by_file_id() - Get all jobs for a specific file
- get_jobs_by_status() - Get jobs filtered by processing status
Automatic Timestamp Management:
- created_at: Set automatically during job creation
- started_at: Set automatically when status changes to PROCESSING
- completed_at: Set automatically when status changes to COMPLETED or FAILED
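The timestamp logic inside update_job_status() might look roughly like the following pymongo sketch; the lowercase status strings stand in for the ProcessingStatus enum values:

from datetime import datetime, timezone

from bson import ObjectId
from pymongo.collection import Collection

def update_job_status(jobs: Collection, job_id: ObjectId, status: str) -> None:
    """Update status and stamp started_at / completed_at automatically."""
    update = {"status": status}
    now = datetime.now(timezone.utc)
    if status == "processing":
        update["started_at"] = now
    elif status in ("completed", "failed"):
        update["completed_at"] = now
    jobs.update_one({"_id": job_id}, {"$set": update})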
JobService
Provides synchronous business logic layer with strict status transition validation:
Status Transition Methods:
- mark_job_as_started() - PENDING → PROCESSING
- mark_job_as_completed() - PROCESSING → COMPLETED
- mark_job_as_failed() - PROCESSING → FAILED
Validation Rules:
- Strict status transitions (invalid transitions raise exceptions)
- Job existence verification before any operation
- Automatic timestamp management through repository layer
Custom Exceptions
- InvalidStatusTransitionError: Raised for invalid status transitions
- JobRepositoryError: Raised for MongoDB operation failures
Valid Status Transitions
PENDING → PROCESSING (via mark_job_as_started)
PROCESSING → COMPLETED (via mark_job_as_completed)
PROCESSING → FAILED (via mark_job_as_failed)
All other transitions are forbidden and will raise InvalidStatusTransitionError.
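A condensed sketch of this validation, using the exception name from this section (the real JobService also verifies the job exists before updating):

VALID_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"completed", "failed"},
}

class InvalidStatusTransitionError(Exception):
    """Raised when a job status change violates the allowed transitions."""

def validate_transition(current: str, new: str) -> None:
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise InvalidStatusTransitionError(f"{current} → {new} is not allowed")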
File Structure
src/file-processor/app/
├── database/repositories/
│ ├── job_repository.py # JobRepository class (synchronous)
│ ├── user_repository.py # UserRepository class (synchronous)
│ ├── document_repository.py # DocumentRepository class (synchronous)
│ └── file_repository.py # FileRepository class (synchronous)
├── services/
│ ├── job_service.py # JobService class (synchronous)
│ ├── auth_service.py # AuthService class (synchronous)
│ ├── user_service.py # UserService class (synchronous)
│ └── document_service.py # DocumentService class (synchronous)
└── exceptions/
└── job_exceptions.py # Custom exceptions
Processing Pipeline Features
- Duplicate Detection: SHA256 hashing prevents reprocessing same files
- Error Handling: Failed processing tracked with error messages
- Status Tracking: Real-time processing status via the processing_jobs collection
- Extensible Metadata: Flexible metadata storage per file type
- Multiple Extraction Methods: Support for direct text, OCR, and hybrid approaches
- Synchronous Operations: All database operations use pymongo for Celery compatibility
Key Implementation Notes
Python Standards
- Style: PEP 8 compliance
- Documentation: Google/NumPy docstring format
- Naming: snake_case for variables and functions
- Testing: pytest with test_i_can_xxx / test_i_cannot_xxx patterns
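An example of the naming pattern; auth_service and AuthenticationError are hypothetical stand-ins, not the project's actual API:

import pytest

def test_i_can_login_with_valid_credentials():
    token = auth_service.login("admin", "admin")  # hypothetical service
    assert token  # a JWT string is returned

def test_i_cannot_login_with_wrong_password():
    with pytest.raises(AuthenticationError):  # hypothetical exception
        auth_service.login("admin", "wrong-password")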
Security Best Practices
- Password Storage: Never store plain text passwords, always use bcrypt hashing
- JWT Secrets: Use strong, randomly generated secret keys in production
- Token Expiration: 24-hour expiration with secure signature validation
- Role Validation: Server-side role checking for all protected endpoints
Dependencies Management
- Package Manager: pip (standard)
- External Dependencies: Listed in each service's requirements.txt
- Standard Library First: Prefer standard library when possible
- Database Driver: pymongo for synchronous MongoDB operations
Testing Strategy
- All code must be testable
- Unit tests for each authentication and user management function
- Integration tests for complete authentication flow
- Tests validated before implementation
Critical Architecture Decisions Made
- JWT Authentication: Simple token-based auth with 24-hour expiration
- Role-Based Access: Admin/User roles for granular permissions
- bcrypt Password Hashing: Industry-standard password security
- MongoDB User Storage: Centralized user management in main database
- Auto Admin Creation: Automatic setup for first-time deployment
- Single FastAPI Service: Handles both API and file watching with authentication
- Celery with Redis: Chosen over other async patterns for scalability
- EasyOCR Preferred: Selected over Tesseract for modern OCR needs
- Container Development: Hot-reload setup required for development workflow
- Dedicated Watchdog Observer: Thread-based file monitoring for reliability
- Separate MongoDB Collections: Files and processing jobs stored separately
- Content in Files Collection: Extracted content stored with file metadata
- Direct Task Dispatch: File watcher directly creates Celery tasks
- SHA256 Duplicate Detection: Prevents reprocessing identical files
- Synchronous Implementation: All repositories and services use pymongo for Celery compatibility
Development Process Requirements
- Collaborative Validation: All options must be explained before coding
- Test-First Approach: Test cases defined and validated before implementation
- Incremental Development: Start simple, extend functionality progressively
- Error Handling: Clear problem explanation required before proposing fixes
Next Implementation Steps
- TODO: Complete file processing pipeline:
- ✅ Create Pydantic models for files and processing_jobs collections
- ✅ Implement repository layer for file and processing job data access (synchronous)
- ✅ Implement service layer for business logic (synchronous)
- ✅ Create Celery tasks for document processing (.txt, .pdf, .docx)
- ✅ Implement Watchdog file monitoring with dedicated observer
- ✅ Integrate file watcher with FastAPI startup
- Create protected API routes for user management
- Build React monitoring interface with authentication
Annexes
Docker Commands Reference
Initial Setup & Build
# Build and start all services (first time)
docker-compose up --build
# Build and start in background
docker-compose up --build -d
# Build specific service
docker-compose build file-processor
docker-compose build worker
Development Workflow
# Start all services
docker-compose up
# Start in background (detached mode)
docker-compose up -d
# Stop all services
docker-compose down
# Stop and remove volumes (⚠️ deletes MongoDB data)
docker-compose down -v
# Restart specific service
docker-compose restart file-processor
docker-compose restart worker
docker-compose restart redis
docker-compose restart mongodb
Monitoring & Debugging
# View logs of all services
docker-compose logs
# View logs of specific service
docker-compose logs file-processor
docker-compose logs worker
docker-compose logs redis
docker-compose logs mongodb
# Follow logs in real-time
docker-compose logs -f
docker-compose logs -f worker
# View running containers
docker-compose ps
# Execute command in running container
docker-compose exec file-processor bash
docker-compose exec worker bash
docker-compose exec mongodb mongosh
Service Management
# Start only specific services
docker-compose up redis mongodb file-processor
# Stop specific service
docker-compose stop worker
docker-compose stop file-processor
# Remove stopped containers
docker-compose rm
# Scale workers (multiple instances)
docker-compose up --scale worker=3
Hot-Reload Configuration
- file-processor: Hot-reload enabled via --reload flag
  - Code changes in src/file-processor/app/ automatically restart FastAPI
- worker: No hot-reload (manual restart required for stability)
  - Code changes in src/worker/tasks/ require: docker-compose restart worker