200 lines
6.2 KiB
Python
200 lines
6.2 KiB
Python
import datetime
|
|
import hashlib
|
|
import os
|
|
import uuid
|
|
from abc import ABC
|
|
from pathlib import Path
|
|
from typing import Self
|
|
|
|
import pikepdf
|
|
import pypandoc
|
|
from PIL import Image
|
|
from reportlab.lib.pagesizes import A4
|
|
from reportlab.pdfgen import canvas
|
|
|
|
from tasks.common.converter_utils import detect_file_type
|
|
|
|
|
|
class BaseConverter(ABC):
|
|
"""Abstract base class for file converters to PDF."""
|
|
|
|
def __init__(self, input_path: str, output_dir: str = ".") -> None:
|
|
self.input_path = Path(input_path)
|
|
self.output_dir = Path(output_dir)
|
|
self.output_path = self.output_dir / f"{self.generate_uuid_filename()}.pdf"
|
|
|
|
def convert(self) -> Self:
|
|
"""Convert input file to PDF and return the output path."""
|
|
pass
|
|
|
|
@staticmethod
|
|
def generate_uuid_filename() -> str:
|
|
"""Generate a unique filename using UUID4."""
|
|
return str(uuid.uuid4())
|
|
|
|
def get_deterministic_date(self) -> str:
|
|
"""
|
|
Generate a deterministic date based on file content.
|
|
This ensures the same file always produces the same PDF.
|
|
"""
|
|
# Option 1: Use a fixed date
|
|
# return "D:20000101000000"
|
|
|
|
# Option 2: Generate date from content hash (recommended)
|
|
with open(self.input_path, 'rb') as f:
|
|
content = f.read()
|
|
content_hash = hashlib.sha256(content).hexdigest()
|
|
|
|
# Use first 14 characters of hash to create a valid date
|
|
# Format: D:YYYYMMDDHHmmss
|
|
hash_int = int(content_hash[:14], 16)
|
|
|
|
# Create a date between 2000-2099 to keep it reasonable
|
|
year = 2000 + (hash_int % 100)
|
|
month = 1 + (hash_int % 12)
|
|
day = 1 + (hash_int % 28) # Stay safe with 28 days
|
|
hour = hash_int % 24
|
|
minute = hash_int % 60
|
|
second = hash_int % 60
|
|
|
|
return f"D:{year:04d}{month:02d}{day:02d}{hour:02d}{minute:02d}{second:02d}"
|
|
|
|
def get_file_creation_date(self):
|
|
# Get file creation time (or modification time)
|
|
ts = os.path.getctime(self.input_path) # getmtime(self.input_path) for last modification
|
|
dt = datetime.datetime.fromtimestamp(ts)
|
|
|
|
# PDF expects format D:YYYYMMDDHHmmss
|
|
creation_date = dt.strftime("D:%Y%m%d%H%M%S")
|
|
return creation_date
|
|
|
|
def clean_pdf(self) -> Self:
|
|
"""Remove all non-deterministic metadata from PDF."""
|
|
with pikepdf.open(self.output_path, allow_overwriting_input=True) as pdf:
|
|
# Remove XMP metadata if it exists
|
|
if hasattr(pdf.Root, 'Metadata'):
|
|
del pdf.Root.Metadata
|
|
|
|
# Clear all document info by deleting each key
|
|
for key in list(pdf.docinfo.keys()):
|
|
del pdf.docinfo[key]
|
|
|
|
# Set deterministic metadata
|
|
pdf.docinfo["/Producer"] = "MyConverter"
|
|
pdf.docinfo["/Creator"] = "MyConverter"
|
|
pdf.docinfo["/CreationDate"] = self.get_deterministic_date()
|
|
pdf.docinfo["/ModDate"] = self.get_deterministic_date()
|
|
pdf.docinfo["/Title"] = self.input_path.name
|
|
|
|
# Save with deterministic IDs
|
|
# compress=True ensures consistent compression
|
|
# deterministic_id=True (if available) or static_id=True
|
|
pdf.save(
|
|
self.output_path,
|
|
fix_metadata_version=True,
|
|
compress_streams=True,
|
|
stream_decode_level=pikepdf.StreamDecodeLevel.generalized,
|
|
object_stream_mode=pikepdf.ObjectStreamMode.disable,
|
|
deterministic_id=True # Use this if pikepdf >= 8.0.0, otherwise use static_id=True
|
|
)
|
|
|
|
return self
|
|
|
|
|
|
class TextToPdfConverter(BaseConverter):
    """Render a UTF-8 text file onto A4 pages, one source line per row."""

    def convert(self) -> Self:
        """Draw every line of the input file into a PDF at ``output_path``.

        Each line is stripped of surrounding whitespace and drawn 15 points
        below the previous one, starting 50 points from the top of the page.
        A new page is started once the next row would fall below the
        50-point bottom margin.

        Returns:
            ``self``, so calls can be chained.
        """
        _, page_height = A4
        margin = 50
        row_step = 15
        first_row = page_height - margin

        pdf = canvas.Canvas(str(self.output_path), pagesize=A4)

        # Pre-fill the document info record of the canvas.
        meta = pdf._doc.info
        meta.producer = "MyConverter"
        meta.creationDate = self.get_file_creation_date()
        meta.title = os.path.basename(self.input_path)

        with open(self.input_path, "r", encoding="utf-8") as source:
            cursor = first_row
            for raw_line in source:
                pdf.drawString(margin, cursor, raw_line.strip())
                cursor -= row_step
                if cursor < margin:
                    # Out of vertical space: close this page, restart at top.
                    pdf.showPage()
                    cursor = first_row

        pdf.save()
        return self
|
|
|
|
|
|
class ImageToPdfConverter(BaseConverter):
    """Converter for image files to PDF."""

    def convert(self) -> Self:
        """Write the input image as a PDF at ``self.output_path``.

        The image is converted to RGB mode before saving. No explicit format
        is passed to ``save``, so Pillow picks the writer from the ``.pdf``
        suffix of the output path.

        Returns:
            ``self``, so calls can be chained.
        """
        # The context manager closes the underlying file handle, which the
        # bare Image.open() call used previously never released.
        with Image.open(self.input_path) as image:
            rgb_image = image.convert("RGB")
            rgb_image.save(self.output_path)
        return self
|
|
|
|
|
|
class WordToPdfConverter(BaseConverter):
    """Turn a Word document (.docx) into a PDF by delegating to pypandoc."""

    def convert(self) -> Self:
        """Ask pypandoc to convert the input file into ``output_path``.

        Returns:
            ``self``, so calls can be chained.
        """
        source = str(self.input_path)
        target = str(self.output_path)
        pypandoc.convert_file(source, "pdf", outputfile=target)
        return self
|
|
|
|
|
|
# Placeholders for future extensions
|
|
class HtmlToPdfConverter(BaseConverter):
    """Reserved slot for a future HTML-to-PDF converter."""

    def convert(self) -> Self:
        """Always raise ``NotImplementedError``; no conversion exists yet."""
        message = "HTML to PDF conversion not implemented."
        raise NotImplementedError(message)
|
|
|
|
|
|
class ExcelToPdfConverter(BaseConverter):
    """Reserved slot for a future Excel-to-PDF converter."""

    def convert(self) -> Self:
        """Always raise ``NotImplementedError``; no conversion exists yet."""
        message = "Excel to PDF conversion not implemented."
        raise NotImplementedError(message)
|
|
|
|
|
|
class MarkdownToPdfConverter(BaseConverter):
    """Reserved slot for a future Markdown-to-PDF converter."""

    def convert(self) -> Self:
        """Always raise ``NotImplementedError``; no conversion exists yet."""
        message = "Markdown to PDF conversion not implemented."
        raise NotImplementedError(message)
|
|
|
|
|
|
def convert_to_pdf(filepath: str, output_dir: str = ".") -> str:
    """
    Convert any supported file to PDF.

    The file type reported by ``detect_file_type`` selects the converter:
    ``"text"``, ``"image"`` and ``"word"`` are supported. After conversion
    the PDF metadata is rewritten by the converter's ``clean_pdf()``.

    Args:
        filepath (str): Path to the input file.
        output_dir (str): Directory to save the output PDF.

    Returns:
        str: Path to the generated PDF.

    Raises:
        ValueError: If the detected file type is not one of the supported
            types listed above.
    """
    # Map each supported type label to the converter class that handles it.
    converters = {
        "text": TextToPdfConverter,
        "image": ImageToPdfConverter,
        "word": WordToPdfConverter,
    }

    file_type = detect_file_type(filepath)
    converter_cls = converters.get(file_type)
    if converter_cls is None:
        raise ValueError(f"Unsupported file type: {file_type}")

    converter = converter_cls(filepath, output_dir=output_dir)
    converter.convert()
    converter.clean_pdf()
    return str(converter.output_path)
|