diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..80fb52b
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,37 @@
+# Local helper targets for the docker-compose stack.
+.PHONY: init up down restart logs clean rebuild
+
+# Create the ./volumes directories and chown them to 1002:1002
+# (presumably the UID/GID the containers run as -- confirm).
+init:
+	@echo "Creating directories and setting permissions..."
+	@mkdir -p ./volumes/watched_files ./volumes/objects
+	@chown -R 1002:1002 ./volumes/watched_files ./volumes/objects
+	@echo "✓ Directories initialized"
+
+up: init
+	@echo "Starting services..."
+	@docker-compose up -d
+	@echo "✓ Services started"
+
+down:
+	@docker-compose down
+
+restart:
+	@docker-compose restart
+
+logs:
+	@docker-compose logs -f
+
+# Stop the stack and delete ./volumes (sudo presumably because init chowns them to 1002:1002).
+clean: down
+	@echo "Cleaning volumes..."
+	@sudo rm -rf ./volumes
+	@echo "✓ Volumes cleaned"
+
+# Wipe the volumes, rebuild the images without cache, and start the stack again.
+rebuild: clean init
+	@echo "Rebuilding images..."
+	@docker-compose build --no-cache
+	@docker-compose up -d
+	@echo "✓ Services rebuilt and started"
diff --git a/src/file-processor/app/models/job.py b/src/file-processor/app/models/job.py
index d71109e..3e1be5e 100644
--- a/src/file-processor/app/models/job.py
+++ b/src/file-processor/app/models/job.py
@@ -14,6 +14,8 @@ class ProcessingStatus(str, Enum):
     PENDING = "pending"
     PROCESSING = "processing"
     COMPLETED = "completed"
+    SAVING_OBJECT = "saving_object"
+    SAVING_PDF = "saving_pdf"
     FAILED = "failed"
 
 
diff --git a/src/worker/tasks/common/converter_utils.py b/src/worker/tasks/common/converter_utils.py
index 5696575..61f65b2 100644
--- a/src/worker/tasks/common/converter_utils.py
+++ b/src/worker/tasks/common/converter_utils.py
@@ -1,5 +1,4 @@
 import subprocess
-import uuid
 from pathlib import Path
 
 import magic  # python-magic
@@ -10,11 +9,6 @@ class UnsupportedFileTypeError(Exception):
     pass
 
 
-def generate_uuid_filename() -> str:
-    """Generate a unique filename using UUID4."""
-    return str(uuid.uuid4())
-
-
 def detect_file_type(file_path: str) -> str:
     """
     Detect the type of file using python-magic.
@@ -70,5 +64,3 @@ def compress_pdf(input_pdf: str, output_pdf: str, quality: str = "ebook") -> Non
     result = subprocess.run(cmd)
     if result.returncode != 0:
         raise RuntimeError(f"Ghostscript failed with return code {result.returncode}")
-
-
diff --git a/src/worker/tasks/common/pdf_converter.py b/src/worker/tasks/common/pdf_converter.py
index aa3b667..2012f56 100644
--- a/src/worker/tasks/common/pdf_converter.py
+++ b/src/worker/tasks/common/pdf_converter.py
@@ -1,7 +1,10 @@
 import datetime
+import hashlib
 import os
-from abc import ABC, abstractmethod
+import uuid
+from abc import ABC
 from pathlib import Path
+from typing import Self
 
 import pikepdf
 import pypandoc
@@ -9,7 +12,7 @@ from PIL import Image
 from reportlab.lib.pagesizes import A4
 from reportlab.pdfgen import canvas
 
-from tasks.common.converter_utils import generate_uuid_filename, detect_file_type
+from tasks.common.converter_utils import detect_file_type
 
 
 class BaseConverter(ABC):
@@ -18,13 +21,44 @@ class BaseConverter(ABC):
     def __init__(self, input_path: str, output_dir: str = ".") -> None:
         self.input_path = Path(input_path)
         self.output_dir = Path(output_dir)
-        self.output_path = self.output_dir / f"{generate_uuid_filename()}.pdf"
+        self.output_path = self.output_dir / f"{self.generate_uuid_filename()}.pdf"
 
-    @abstractmethod
-    def convert(self) -> str:
-        """Convert input file to PDF and return the output path."""
-        pass
+    def convert(self) -> Self:
+        """Convert the input file to PDF at ``self.output_path``; return self."""
+        raise NotImplementedError("Subclasses must implement convert().")
 
+    @staticmethod
+    def generate_uuid_filename() -> str:
+        """Generate a unique filename using UUID4."""
+        return str(uuid.uuid4())
+
+    def get_deterministic_date(self) -> str:
+        """
+        Generate a deterministic PDF date string from the file content.
+
+        Returns ``D:YYYYMMDDHHmmss`` derived from the SHA-256 of the input
+        file, so the same content always produces the same metadata.
+        """
+        # Stream the file through the hash instead of reading it all at once.
+        with open(self.input_path, "rb") as f:
+            content_hash = hashlib.file_digest(f, "sha256").hexdigest()
+
+        # 14 hex digits (56 bits) are ample to fill every date field.
+        remaining = int(content_hash[:14], 16)
+
+        # Peel each field off with divmod so the fields vary independently;
+        # reducing one integer modulo 60 twice would force minute == second.
+        remaining, year = divmod(remaining, 100)  # -> 2000-2099
+        remaining, month = divmod(remaining, 12)
+        remaining, day = divmod(remaining, 28)  # 28 exists in every month
+        remaining, hour = divmod(remaining, 24)
+        remaining, minute = divmod(remaining, 60)
+        second = remaining % 60
+
+        date_part = f"{2000 + year:04d}{month + 1:02d}{day + 1:02d}"
+        time_part = f"{hour:02d}{minute:02d}{second:02d}"
+        return f"D:{date_part}{time_part}"
+
     def get_file_creation_date(self):
         # Get file creation time (or modification time)
         ts = os.path.getctime(self.input_path)  # getmtime(self.input_path) for last modification
@@ -34,22 +68,43 @@ class BaseConverter(ABC):
         creation_date = dt.strftime("D:%Y%m%d%H%M%S")
         return creation_date
 
-    def clean_pdf(self):
-        with pikepdf.open(self.output_path) as pdf:
-            pdf.Root.Metadata = None
+    def clean_pdf(self) -> Self:
+        """Strip non-deterministic metadata from the PDF at ``self.output_path``."""
+        deterministic_date = self.get_deterministic_date()
+        with pikepdf.open(self.output_path, allow_overwriting_input=True) as pdf:
+            # Remove XMP metadata if it exists
+            if hasattr(pdf.Root, 'Metadata'):
+                del pdf.Root.Metadata
+
+            # Clear all document info by deleting each key
+            for key in list(pdf.docinfo.keys()):
+                del pdf.docinfo[key]
 
-            pdf.docinfo.clear()
+            # Set deterministic metadata
             pdf.docinfo["/Producer"] = "MyConverter"
-            pdf.docinfo["/CreationDate"] = self.get_file_creation_date()
-            pdf.docinfo["/Title"] = os.path.basename(self.input_path)
+            pdf.docinfo["/Creator"] = "MyConverter"
+            pdf.docinfo["/CreationDate"] = deterministic_date
+            pdf.docinfo["/ModDate"] = deterministic_date
+            pdf.docinfo["/Title"] = self.input_path.name
 
-            pdf.save(self.output_path, fix_metadata_version=True, static_id=True)
+            # Save with fixed stream settings and a content-derived /ID so that
+            # repeated conversions of the same input aim to be byte-identical.
+            pdf.save(
+                self.output_path,
+                fix_metadata_version=True,
+                compress_streams=True,
+                stream_decode_level=pikepdf.StreamDecodeLevel.generalized,
+                object_stream_mode=pikepdf.ObjectStreamMode.disable,
+                deterministic_id=True  # Use this if pikepdf >= 8.0.0, otherwise use static_id=True
+            )
+
+        return self
 
 
 class TextToPdfConverter(BaseConverter):
     """Converter for text files to PDF."""
 
-    def convert(self) -> str:
+    def convert(self) -> Self:
         c = canvas.Canvas(str(self.output_path), pagesize=A4)
 
         # Fix metadata with deterministic values
@@ -69,48 +124,48 @@ class TextToPdfConverter(BaseConverter):
                 y = height - 50
 
         c.save()
-        return str(self.output_path)
+        return self
 
 
 class ImageToPdfConverter(BaseConverter):
     """Converter for image files to PDF."""
 
-    def convert(self) -> str:
-        image = Image.open(self.input_path)
-        rgb_image = image.convert("RGB")
-        rgb_image.save(self.output_path)
-        return str(self.output_path)
+    def convert(self) -> Self:
+        with Image.open(self.input_path) as image:
+            rgb_image = image.convert("RGB")
+            rgb_image.save(self.output_path)
+        return self
 
 
 class WordToPdfConverter(BaseConverter):
     """Converter for Word files (.docx) to PDF using pypandoc."""
 
-    def convert(self) -> str:
+    def convert(self) -> Self:
         pypandoc.convert_file(
             str(self.input_path), "pdf", outputfile=str(self.output_path)
         )
-        return str(self.output_path)
+        return self
 
 
 # Placeholders for future extensions
 class HtmlToPdfConverter(BaseConverter):
     """Placeholder for HTML to PDF converter."""
 
-    def convert(self) -> str:
+    def convert(self) -> Self:
         raise NotImplementedError("HTML to PDF conversion not implemented.")
 
 
 class ExcelToPdfConverter(BaseConverter):
     """Placeholder for Excel to PDF converter."""
 
-    def convert(self) -> str:
+    def convert(self) -> Self:
         raise NotImplementedError("Excel to PDF conversion not implemented.")
 
 
 class MarkdownToPdfConverter(BaseConverter):
     """Placeholder for Markdown to PDF converter."""
 
-    def convert(self) -> str:
+    def convert(self) -> Self:
         raise
NotImplementedError("Markdown to PDF conversion not implemented.") @@ -139,4 +194,6 @@ def convert_to_pdf(filepath: str, output_dir: str = ".") -> str: else: raise ValueError(f"Unsupported file type: {file_type}") - return converter.convert() + converter.convert() + converter.clean_pdf() + return str(converter.output_path) diff --git a/src/worker/tasks/document_processing.py b/src/worker/tasks/document_processing.py index 5068a91..6524901 100644 --- a/src/worker/tasks/document_processing.py +++ b/src/worker/tasks/document_processing.py @@ -46,7 +46,7 @@ def process_document(self, filepath: str) -> Dict[str, Any]: Raises: Exception: Any processing error (will trigger retry) """ - task_id = self.request.id + task_id = self.request.id logger.info(f"Starting document processing task {task_id} for file: {filepath}") # get services @@ -54,16 +54,24 @@ def process_document(self, filepath: str) -> Dict[str, Any]: job = None try: - # Step 1: Insert the document in DB + # Step 1: Create the document and a new job record for the document document = document_service.create_document(filepath) - logger.info(f"Job {task_id} created for document {document.id} with file path: {filepath}") - - # Step 2: Create a new job record for the document job = job_service.create_job(task_id=task_id, document_id=document.id) + job_service.mark_job_as_started(job_id=job.id) + logger.info(f"Task {task_id} created for document {document.id} with file path: {filepath} and job id: {job.id}") + + logger.info(f"Job {task_id} marked as PROCESSING") + + raw_file_hash = save_as_object(filepath) + logger.info(f"Job {task_id} saved document as object: {raw_file_hash}") + + # Step 4: Create the pdf version of the document + pdf_file_hash = convert_to_pdf(filepath, raw_file_hash) + logger.info(f"Job {task_id} saved PDF with hash: {pdf_file_hash}") + + # Step 3: Mark job as started - job_service.mark_job_as_started(job_id=job.id) - logger.info(f"Job {task_id} marked as PROCESSING") # Step 4: Create the pdf 
version of the document pdf_file_path = convert_to_pdf(filepath, settings.get_temp_folder())