Working on pdf creation
This commit is contained in:
@@ -61,6 +61,7 @@ services:
|
||||
- ./src/worker:/app
|
||||
- ./src/file-processor/app:/app/app # <- Added: shared access file-processor app
|
||||
- ./volumes/watched_files:/watched_files
|
||||
- ./volumes/objects:/objects
|
||||
depends_on:
|
||||
- redis
|
||||
- mongodb
|
||||
|
||||
@@ -12,10 +12,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
texlive-xetex \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
# Copy requirements and install dependencies
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Change the user
|
||||
USER 1002:1002
|
||||
|
||||
# Copy application code
|
||||
COPY . .
|
||||
|
||||
@@ -24,5 +28,6 @@ ENV PYTHONPATH=/app
|
||||
# Expose port
|
||||
EXPOSE 8000
|
||||
|
||||
|
||||
# Command will be overridden by docker-compose
|
||||
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||
@@ -105,4 +105,4 @@ def get_watch_folder() -> str:
|
||||
|
||||
def get_temp_folder() -> str:
    """Return the directory used to store temporary files.

    The value is read from the ``TEMP_DIRECTORY`` environment variable and
    falls back to ``/tmp`` when the variable is not set.
    """
    # Single return: a stale `"/temp"` return used to precede this one and
    # made the "/tmp" default unreachable.
    return os.getenv("TEMP_DIRECTORY", "/tmp")
|
||||
|
||||
@@ -5,6 +5,7 @@ email-validator==2.3.0
|
||||
fastapi==0.116.1
|
||||
httptools==0.6.4
|
||||
motor==3.7.1
|
||||
pikepdf==9.11.0
|
||||
pillow==11.3.0
|
||||
pydantic==2.11.9
|
||||
PyJWT==2.10.1
|
||||
|
||||
@@ -12,12 +12,17 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
texlive-xetex \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
# Copy requirements and install dependencies
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Change the user
|
||||
USER 1002:1002
|
||||
|
||||
# Copy application code
|
||||
COPY . .
|
||||
|
||||
|
||||
# Command will be overridden by docker-compose
|
||||
CMD ["celery", "-A", "main", "worker", "--loglevel=info"]
|
||||
|
||||
@@ -5,6 +5,7 @@ email-validator==2.3.0
|
||||
fastapi==0.116.1
|
||||
httptools==0.6.4
|
||||
motor==3.7.1
|
||||
pikepdf==9.11.0
|
||||
pillow==11.3.0
|
||||
pydantic==2.11.9
|
||||
PyJWT==2.10.1
|
||||
|
||||
64
src/worker/tasks/common/document_utils.py
Normal file
64
src/worker/tasks/common/document_utils.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_file_hash(file_bytes: bytes) -> str:
    """
    Compute the SHA256 digest of a file's content.

    Args:
        file_bytes: Raw file content as bytes

    Returns:
        The digest as a hexadecimal string
    """
    hasher = hashlib.sha256()
    hasher.update(file_bytes)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def get_object_path(file_hash):
    """
    Build the storage path of the object identified by ``file_hash``.

    The result is ``<objects folder>/<first 24 characters of the hash>/<hash>``,
    where the objects folder is read from ``settings.get_objects_folder()``.

    :param file_hash: hash string identifying the object
    :return: path of the object below the objects folder
    """
    objects_root = settings.get_objects_folder()
    # The leading 24 characters of the hash name the intermediate folder
    bucket = file_hash[:24]
    return os.path.join(objects_root, bucket, file_hash)
|
||||
|
||||
|
||||
def save_as_object(file_path, remove_on_success=True) -> str:
    """
    Store a file in the object store under the SHA256 hash of its content.

    The file is read fully into memory, hashed with ``get_file_hash`` and
    written to the location returned by ``get_object_path``. When an object
    with the same hash is already present, nothing is written.

    :param file_path: path of the file to store
    :param remove_on_success: when true, delete the source file once the
        object is in the store (newly written or already present)
    :return: hash of the file
    :raises OSError: if the source cannot be read or the object cannot be
        written
    """
    logger.info(f"Saving file {file_path} as object")
    path = Path(file_path)
    as_bytes = path.read_bytes()

    file_hash = get_file_hash(as_bytes)
    logger.info(f"File hash: {file_hash}")

    object_path = get_object_path(file_hash)
    if os.path.exists(object_path):
        logger.info(f"Object already exists: {object_path}")
    else:
        # exist_ok avoids a failure when another process creates the folder
        # between a check and the creation.
        os.makedirs(os.path.dirname(object_path), exist_ok=True)

        logger.info(f"Saving object to: {object_path}")
        # Write next to the final location and rename into place, so an
        # interrupted write never leaves a truncated file under the hash name
        # (which a later call would treat as "already exists").
        partial_path = f"{object_path}.{os.getpid()}.part"
        try:
            with open(partial_path, "wb") as f:
                f.write(as_bytes)
            os.replace(partial_path, object_path)
        finally:
            # After a successful replace the partial file no longer exists.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    # The source is removed in both the "written" and "already stored" cases,
    # except when the source *is* the stored object itself.
    if remove_on_success and not os.path.samefile(path, object_path):
        logger.info(f"Removing file: {file_path}")
        path.unlink()

    return file_hash
|
||||
@@ -1,12 +1,15 @@
|
||||
import datetime
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
|
||||
import pikepdf
|
||||
import pypandoc
|
||||
from PIL import Image
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.pdfgen import canvas
|
||||
|
||||
from tasks.common.converter_utils import generate_uuid_filename
|
||||
from tasks.common.converter_utils import generate_uuid_filename, detect_file_type
|
||||
|
||||
|
||||
class BaseConverter(ABC):
|
||||
@@ -21,6 +24,26 @@ class BaseConverter(ABC):
|
||||
def convert(self) -> str:
|
||||
"""Convert input file to PDF and return the output path."""
|
||||
pass
|
||||
|
||||
def get_file_creation_date(self):
    """
    Return the timestamp of the input file formatted as a PDF date string.

    The timestamp comes from ``os.path.getctime(self.input_path)``. Per the
    Python documentation that value is the creation time on Windows but the
    last metadata change time on Unix, so it is not a true creation date on
    every platform.

    :return: the date formatted as ``D:YYYYMMDDHHmmss``
    """
    ctime_seconds = os.path.getctime(self.input_path)
    # fromtimestamp() without a tz argument yields a naive local-time value
    stamp = datetime.datetime.fromtimestamp(ctime_seconds)
    return stamp.strftime("D:%Y%m%d%H%M%S")
|
||||
|
||||
def clean_pdf(self):
    """
    Rewrite the generated PDF in place with a minimal, fixed set of metadata.

    The ``/Metadata`` entry of the document root is dropped and the document
    info dictionary is emptied, then refilled with a fixed producer, the date
    returned by ``get_file_creation_date`` and the input file name as title.
    The file at ``self.output_path`` is then overwritten, saved with
    ``static_id=True``.
    """
    # pikepdf refuses to save over the file it opened unless this flag is set.
    with pikepdf.open(self.output_path, allow_overwriting_input=True) as pdf:
        # pikepdf does not accept None as a dictionary value; an entry is
        # removed by deleting its key.
        if "/Metadata" in pdf.Root:
            del pdf.Root["/Metadata"]

        # Delete entries one by one instead of relying on a dict-style clear().
        for key in list(pdf.docinfo.keys()):
            del pdf.docinfo[key]
        pdf.docinfo["/Producer"] = "MyConverter"
        pdf.docinfo["/CreationDate"] = self.get_file_creation_date()
        pdf.docinfo["/Title"] = os.path.basename(self.input_path)

        pdf.save(self.output_path, fix_metadata_version=True, static_id=True)
|
||||
|
||||
|
||||
class TextToPdfConverter(BaseConverter):
|
||||
@@ -28,6 +51,13 @@ class TextToPdfConverter(BaseConverter):
|
||||
|
||||
def convert(self) -> str:
|
||||
c = canvas.Canvas(str(self.output_path), pagesize=A4)
|
||||
|
||||
# Fix metadata with deterministic values
|
||||
info = c._doc.info
|
||||
info.producer = "MyConverter"
|
||||
info.creationDate = self.get_file_creation_date()
|
||||
info.title = os.path.basename(self.input_path)
|
||||
|
||||
width, height = A4
|
||||
with open(self.input_path, "r", encoding="utf-8") as f:
|
||||
y = height - 50
|
||||
@@ -37,6 +67,7 @@ class TextToPdfConverter(BaseConverter):
|
||||
if y < 50:
|
||||
c.showPage()
|
||||
y = height - 50
|
||||
|
||||
c.save()
|
||||
return str(self.output_path)
|
||||
|
||||
@@ -81,3 +112,31 @@ class MarkdownToPdfConverter(BaseConverter):
|
||||
|
||||
def convert(self) -> str:
    """Placeholder: Markdown conversion is not available, so this always raises."""
    message = "Markdown to PDF conversion not implemented."
    raise NotImplementedError(message)
|
||||
|
||||
|
||||
def convert_to_pdf(filepath: str, output_dir: str = ".") -> str:
    """
    Convert any supported file to PDF.

    The file type is obtained from ``detect_file_type`` and the matching
    converter class is instantiated and run.

    Args:
        filepath (str): Path to the input file.
        output_dir (str): Directory to save the output PDF.

    Returns:
        str: Path to the generated PDF.

    Raises:
        ValueError: If the detected file type has no converter; "text",
            "image" and "word" are the handled types.
    """
    # NOTE(review): detect_file_type may raise its own error for unknown
    # files before the lookup below is reached - confirm against its code.
    file_type = detect_file_type(filepath)

    # One converter class per detected type.
    converters = {
        "text": TextToPdfConverter,
        "image": ImageToPdfConverter,
        "word": WordToPdfConverter,
    }
    converter_cls = converters.get(file_type)
    if converter_cls is None:
        raise ValueError(f"Unsupported file type: {file_type}")

    return converter_cls(filepath, output_dir=output_dir).convert()
|
||||
|
||||
@@ -12,7 +12,8 @@ from app.config import settings
|
||||
from app.database.connection import get_database
|
||||
from app.services.document_service import DocumentService
|
||||
from app.services.job_service import JobService
|
||||
from tasks.common.converter_utils import convert_to_pdf
|
||||
from tasks.common.document_utils import save_as_object
|
||||
from tasks.common.pdf_converter import convert_to_pdf
|
||||
from tasks.main import celery_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -66,6 +67,8 @@ def process_document(self, filepath: str) -> Dict[str, Any]:
|
||||
|
||||
# Step 4: Create the pdf version of the document
|
||||
pdf_file_path = convert_to_pdf(filepath, settings.get_temp_folder())
|
||||
digest = save_as_object(pdf_file_path)
|
||||
logger.info(f"Job {task_id} internal PDF file created: {digest}")
|
||||
|
||||
# Step x: Mark job as completed
|
||||
job_service.mark_job_as_completed(job_id=job.id)
|
||||
|
||||
@@ -7,10 +7,10 @@ import logging
|
||||
import os
|
||||
|
||||
from celery import Celery
|
||||
from celery.signals import worker_process_init
|
||||
|
||||
from app.config import settings
|
||||
|
||||
|
||||
# Environment variables
|
||||
REDIS_URL = settings.get_redis_url()
|
||||
MONGODB_URL = settings.get_mongodb_url()
|
||||
@@ -38,11 +38,21 @@ celery_app.conf.update(
|
||||
task_soft_time_limit=240, # 4 minutes
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# initialize temp folder if needed
|
||||
|
||||
def global_init(**kwargs):
    """
    Log the worker start-up banner and make sure the temporary folder exists.

    :param kwargs: ignored; accepted so the function can be invoked with
        arbitrary keyword arguments
    """
    logger.info(f"{'*' * 20}")
    logger.info(f"{'--' * 5}" + " Starting MyDocManager worker " + f"{'--' * 5}")
    logger.info(f"{'*' * 20}")

    tmp_folder = settings.get_temp_folder()
    if os.path.exists(tmp_folder):
        logger.info(f"Temporary folder already exists: {os.path.abspath(tmp_folder)}")
    else:
        logger.info(f"Creating temporary folder: {tmp_folder}")
        # exist_ok keeps start-up from failing when another process creates
        # the folder between the check above and this call.
        os.makedirs(tmp_folder, exist_ok=True)
|
||||
|
||||
global_init()
|
||||
|
||||
if __name__ == "__main__":
|
||||
global_init()
|
||||
celery_app.start()
|
||||
|
||||
Reference in New Issue
Block a user