From 62c7e46a88dae006590d3cd30d3cb488da275fc7 Mon Sep 17 00:00:00 2001 From: Kodjo Sossouvi Date: Tue, 30 Sep 2025 22:58:51 +0200 Subject: [PATCH] Working on pdf creation --- docker-compose.yml | 1 + src/file-processor/Dockerfile | 5 ++ src/file-processor/app/config/settings.py | 2 +- src/file-processor/requirements.txt | 1 + src/worker/Dockerfile | 5 ++ src/worker/requirements.txt | 1 + src/worker/tasks/common/document_utils.py | 64 +++++++++++++++++++++++ src/worker/tasks/common/pdf_converter.py | 61 ++++++++++++++++++++- src/worker/tasks/document_processing.py | 5 +- src/worker/tasks/main.py | 18 +++++-- 10 files changed, 156 insertions(+), 7 deletions(-) create mode 100644 src/worker/tasks/common/document_utils.py diff --git a/docker-compose.yml b/docker-compose.yml index 109df33..a702201 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,6 +61,7 @@ services: - ./src/worker:/app - ./src/file-processor/app:/app/app # <- Added: shared access file-processor app - ./volumes/watched_files:/watched_files + - ./volumes/objects:/objects depends_on: - redis - mongodb diff --git a/src/file-processor/Dockerfile b/src/file-processor/Dockerfile index a0b7b63..8603615 100644 --- a/src/file-processor/Dockerfile +++ b/src/file-processor/Dockerfile @@ -12,10 +12,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ texlive-xetex \ && rm -rf /var/lib/apt/lists/* + # Copy requirements and install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +# Change the user +USER 1002:1002 + # Copy application code COPY . . 
@@ -24,5 +28,6 @@ ENV PYTHONPATH=/app # Expose port EXPOSE 8000 + # Command will be overridden by docker-compose CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/file-processor/app/config/settings.py b/src/file-processor/app/config/settings.py index 8a737f1..94cf48a 100644 --- a/src/file-processor/app/config/settings.py +++ b/src/file-processor/app/config/settings.py @@ -105,4 +105,4 @@ def get_watch_folder() -> str: def get_temp_folder() -> str: """Directory to store temporary files""" - return os.getenv("TEMP_DIRECTORY", "/temp") + return os.getenv("TEMP_DIRECTORY", "/tmp") diff --git a/src/file-processor/requirements.txt b/src/file-processor/requirements.txt index 6ad69ce..8a69627 100644 --- a/src/file-processor/requirements.txt +++ b/src/file-processor/requirements.txt @@ -5,6 +5,7 @@ email-validator==2.3.0 fastapi==0.116.1 httptools==0.6.4 motor==3.7.1 +pikepdf==9.11.0 pillow==11.3.0 pydantic==2.11.9 PyJWT==2.10.1 diff --git a/src/worker/Dockerfile b/src/worker/Dockerfile index 8e45379..9611810 100644 --- a/src/worker/Dockerfile +++ b/src/worker/Dockerfile @@ -12,12 +12,17 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ texlive-xetex \ && rm -rf /var/lib/apt/lists/* + # Copy requirements and install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +# Change the user +USER 1002:1002 + # Copy application code COPY . . 
+ # Command will be overridden by docker-compose CMD ["celery", "-A", "main", "worker", "--loglevel=info"] diff --git a/src/worker/requirements.txt b/src/worker/requirements.txt index 6ad69ce..8a69627 100644 --- a/src/worker/requirements.txt +++ b/src/worker/requirements.txt @@ -5,6 +5,7 @@ email-validator==2.3.0 fastapi==0.116.1 httptools==0.6.4 motor==3.7.1 +pikepdf==9.11.0 pillow==11.3.0 pydantic==2.11.9 PyJWT==2.10.1 diff --git a/src/worker/tasks/common/document_utils.py b/src/worker/tasks/common/document_utils.py new file mode 100644 index 0000000..0188501 --- /dev/null +++ b/src/worker/tasks/common/document_utils.py @@ -0,0 +1,64 @@ +import hashlib +import logging +import os +from pathlib import Path + +from app.config import settings + +logger = logging.getLogger(__name__) + + +def get_file_hash(file_bytes: bytes) -> str: + """ + Calculate SHA256 hash of file content. + + Args: + file_bytes: Raw file content as bytes + + Returns: + Hexadecimal SHA256 hash string + """ + return hashlib.sha256(file_bytes).hexdigest() + + +def get_object_path(file_hash): + """ + Build the storage path of an object from its hash. + :param file_hash: hash string; its first 24 characters name the sub-folder + :return: path of the form <objects folder>/<file_hash[:24]>/<file_hash> + """ + root = settings.get_objects_folder() + return os.path.join(root, file_hash[:24], file_hash) + + +def save_as_object(file_path, remove_on_success=True) -> str: + """ + Read the file, get the hash and save using the hash as the filename. 
+ :param file_path: + :param remove_on_success: + :return: hash of the file + """ + logger.info(f"Saving file {file_path} as object") + path = Path(file_path) + as_bytes = path.read_bytes() + + file_hash = get_file_hash(as_bytes) + logger.info(f"File hash: {file_hash}") + + object_path = get_object_path(file_hash) + if os.path.exists(object_path): + logger.info(f"Object already exists: {object_path}") + return file_hash + + if not os.path.exists(os.path.dirname(object_path)): + os.makedirs(os.path.dirname(object_path)) + + logger.info(f"Saving object to: {object_path}") + with open(object_path, "wb") as f: + f.write(as_bytes) + + if remove_on_success: + logger.info(f"Removing file: {file_path}") + path.unlink() + + return file_hash diff --git a/src/worker/tasks/common/pdf_converter.py b/src/worker/tasks/common/pdf_converter.py index 92de243..aa3b667 100644 --- a/src/worker/tasks/common/pdf_converter.py +++ b/src/worker/tasks/common/pdf_converter.py @@ -1,12 +1,15 @@ +import datetime +import os from abc import ABC, abstractmethod from pathlib import Path +import pikepdf import pypandoc from PIL import Image from reportlab.lib.pagesizes import A4 from reportlab.pdfgen import canvas -from tasks.common.converter_utils import generate_uuid_filename +from tasks.common.converter_utils import generate_uuid_filename, detect_file_type class BaseConverter(ABC): @@ -21,6 +24,26 @@ class BaseConverter(ABC): def convert(self) -> str: """Convert input file to PDF and return the output path.""" pass + + def get_file_creation_date(self): + # Read the file's ctime: creation time on Windows, last metadata change on Unix + ts = os.path.getctime(self.input_path) # getmtime(self.input_path) for last modification + dt = datetime.datetime.fromtimestamp(ts) + + # PDF expects format D:YYYYMMDDHHmmss + creation_date = dt.strftime("D:%Y%m%d%H%M%S") + return creation_date + + def clean_pdf(self): + with pikepdf.open(self.output_path) as pdf: + pdf.Root.Metadata = None + + pdf.docinfo.clear() + pdf.docinfo["/Producer"] = 
"MyConverter" + pdf.docinfo["/CreationDate"] = self.get_file_creation_date() + pdf.docinfo["/Title"] = os.path.basename(self.input_path) + + pdf.save(self.output_path, fix_metadata_version=True, static_id=True) class TextToPdfConverter(BaseConverter): @@ -28,6 +51,13 @@ class TextToPdfConverter(BaseConverter): def convert(self) -> str: c = canvas.Canvas(str(self.output_path), pagesize=A4) + + # Fix metadata with deterministic values + info = c._doc.info + info.producer = "MyConverter" + info.creationDate = self.get_file_creation_date() + info.title = os.path.basename(self.input_path) + width, height = A4 with open(self.input_path, "r", encoding="utf-8") as f: y = height - 50 @@ -37,6 +67,7 @@ class TextToPdfConverter(BaseConverter): if y < 50: c.showPage() y = height - 50 + c.save() return str(self.output_path) @@ -81,3 +112,31 @@ class MarkdownToPdfConverter(BaseConverter): def convert(self) -> str: raise NotImplementedError("Markdown to PDF conversion not implemented.") + + +def convert_to_pdf(filepath: str, output_dir: str = ".") -> str: + """ + Convert any supported file to PDF. + + Args: + filepath (str): Path to the input file. + output_dir (str): Directory to save the output PDF. + + Returns: + str: Path to the generated PDF. + + Raises: + UnsupportedFileTypeError: If the input file type is not supported. 
+ """ + file_type = detect_file_type(filepath) + + if file_type == "text": + converter = TextToPdfConverter(filepath, output_dir=output_dir) + elif file_type == "image": + converter = ImageToPdfConverter(filepath, output_dir=output_dir) + elif file_type == "word": + converter = WordToPdfConverter(filepath, output_dir=output_dir) + else: + raise ValueError(f"Unsupported file type: {file_type}") + + return converter.convert() diff --git a/src/worker/tasks/document_processing.py b/src/worker/tasks/document_processing.py index 227024e..5068a91 100644 --- a/src/worker/tasks/document_processing.py +++ b/src/worker/tasks/document_processing.py @@ -12,7 +12,8 @@ from app.config import settings from app.database.connection import get_database from app.services.document_service import DocumentService from app.services.job_service import JobService -from tasks.common.converter_utils import convert_to_pdf +from tasks.common.document_utils import save_as_object +from tasks.common.pdf_converter import convert_to_pdf from tasks.main import celery_app logger = logging.getLogger(__name__) @@ -66,6 +67,8 @@ def process_document(self, filepath: str) -> Dict[str, Any]: # Step 4: Create the pdf version of the document pdf_file_path = convert_to_pdf(filepath, settings.get_temp_folder()) + digest = save_as_object(pdf_file_path) + logger.info(f"Job {task_id} internal PDF file created: {digest}") # Step x: Mark job as completed job_service.mark_job_as_completed(job_id=job.id) diff --git a/src/worker/tasks/main.py b/src/worker/tasks/main.py index a32e82a..ada5228 100644 --- a/src/worker/tasks/main.py +++ b/src/worker/tasks/main.py @@ -7,10 +7,10 @@ import logging import os from celery import Celery +from celery.signals import worker_process_init from app.config import settings - # Environment variables REDIS_URL = settings.get_redis_url() MONGODB_URL = settings.get_mongodb_url() @@ -38,11 +38,21 @@ celery_app.conf.update( task_soft_time_limit=240, # 4 minutes ) -if __name__ == "__main__": - 
# initialize temp folder if needed + +def global_init(**kwargs): + """Log the worker startup banner and create the temporary folder if it is missing.""" + logger.info(f"{'*' * 20}") + logger.info(f"{'--' * 5}" + " Starting MyDocManager worker " + f"{'--' * 5}") + logger.info(f"{'*' * 20}") tmp_folder = settings.get_temp_folder() if not os.path.exists(tmp_folder): logger.info(f"Creating temporary folder: {tmp_folder}") os.makedirs(tmp_folder) - + else: + logger.info(f"Temporary folder already exists: {os.path.abspath(tmp_folder)}") + +global_init() + +if __name__ == "__main__": + global_init() celery_app.start()