Fisrt commit. Docker compose is working

This commit is contained in:
2025-09-15 23:21:09 +02:00
commit 10650420ef
17 changed files with 858 additions and 0 deletions

0
src/__init__.py Normal file
View File

View File

@@ -0,0 +1,17 @@
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app/ .
# Expose port
EXPOSE 8000
# Command will be overridden by docker-compose
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

View File

View File

@@ -0,0 +1,120 @@
"""
FastAPI application for MyDocManager file processor service.
This service provides API endpoints for health checks and task dispatching.
"""
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis
from celery import Celery
# Initialize FastAPI app
app = FastAPI(
title="MyDocManager File Processor",
description="File processing and task dispatch service",
version="1.0.0"
)
# Environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
# Initialize Redis client
try:
redis_client = redis.from_url(REDIS_URL)
except Exception as e:
redis_client = None
print(f"Warning: Could not connect to Redis: {e}")
# Initialize Celery
celery_app = Celery(
"file_processor",
broker=REDIS_URL,
backend=REDIS_URL
)
# Pydantic models
class TestTaskRequest(BaseModel):
"""Request model for test task."""
message: str
@app.get("/health")
async def health_check():
"""
Health check endpoint.
Returns:
dict: Service health status with dependencies
"""
health_status = {
"status": "healthy",
"service": "file-processor",
"dependencies": {
"redis": "unknown",
"mongodb": "unknown"
},
}
# Check Redis connection
if redis_client:
try:
redis_client.ping()
health_status["dependencies"]["redis"] = "connected"
except Exception:
health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded"
return health_status
@app.post("/test-task")
async def dispatch_test_task(request: TestTaskRequest):
"""
Dispatch a test task to Celery worker.
Args:
request: Test task request containing message
Returns:
dict: Task dispatch information
Raises:
HTTPException: If task dispatch fails
"""
try:
# Send task to worker
task = celery_app.send_task(
"main.test_task",
args=[request.message]
)
return {
"status": "dispatched",
"task_id": task.id,
"message": f"Test task dispatched with message: {request.message}"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to dispatch task: {str(e)}"
)
@app.get("/")
async def root():
"""
Root endpoint.
Returns:
dict: Basic service information
"""
return {
"service": "MyDocManager File Processor",
"version": "1.0.0",
"status": "running"
}

View File

@@ -0,0 +1,6 @@
fastapi==0.116.1
uvicorn==0.35.0
celery==5.5.3
redis==6.4.0
pymongo==4.15.0
pydantic==2.11.9

14
src/worker/Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY tasks/ .
# Command will be overridden by docker-compose
CMD ["celery", "-A", "main", "worker", "--loglevel=info"]

0
src/worker/__init__.py Normal file
View File

View File

@@ -0,0 +1,4 @@
celery==5.5.3
redis==6.4.0
pymongo==4.15.0

View File

113
src/worker/tasks/main.py Normal file
View File

@@ -0,0 +1,113 @@
"""
Celery worker for MyDocManager document processing tasks.
This module contains all Celery tasks for processing documents.
"""
import os
import time
from celery import Celery
# Environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
# Initialize Celery app
app = Celery(
"mydocmanager_worker",
broker=REDIS_URL,
backend=REDIS_URL
)
# Celery configuration
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5 minutes
task_soft_time_limit=240, # 4 minutes
)
@app.task(bind=True)
def test_task(self, message: str):
"""
Test task for validating worker functionality.
Args:
message: Test message to process
Returns:
dict: Task result with processing information
"""
try:
print(f"[WORKER] Starting test task with message: {message}")
# Simulate some work
for i in range(5):
print(f"[WORKER] Processing step {i + 1}/5...")
time.sleep(1)
# Update task progress
self.update_state(
state="PROGRESS",
meta={
"current": i + 1,
"total": 5,
"message": f"Processing step {i + 1}"
}
)
result = {
"status": "completed",
"message": f"Successfully processed: {message}",
"processed_at": time.time(),
"worker_id": self.request.id
}
print(f"[WORKER] Test task completed successfully: {result}")
return result
except Exception as exc:
print(f"[WORKER] Test task failed: {str(exc)}")
raise self.retry(exc=exc, countdown=60, max_retries=3)
@app.task(bind=True)
def process_document_task(self, file_path: str):
"""
Placeholder task for document processing.
Args:
file_path: Path to the document to process
Returns:
dict: Processing result
"""
try:
print(f"[WORKER] Starting document processing for: {file_path}")
# Placeholder for document processing logic
time.sleep(2) # Simulate processing time
result = {
"status": "completed",
"file_path": file_path,
"processed_at": time.time(),
"content": f"Placeholder content for {file_path}",
"worker_id": self.request.id
}
print(f"[WORKER] Document processing completed: {file_path}")
return result
except Exception as exc:
print(f"[WORKER] Document processing failed for {file_path}: {str(exc)}")
raise self.retry(exc=exc, countdown=60, max_retries=3)
if __name__ == "__main__":
app.start()