Adding document service
This commit is contained in:
380
src/file-processor/app/services/document_service.py
Normal file
380
src/file-processor/app/services/document_service.py
Normal file
@@ -0,0 +1,380 @@
|
||||
"""
|
||||
Document service for orchestrated file and content management.
|
||||
|
||||
This service coordinates between FileDocument and DocumentContent repositories
|
||||
while maintaining data consistency through MongoDB transactions.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import magic
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any, Tuple
|
||||
|
||||
from motor.motor_asyncio import AsyncIOMotorClientSession
|
||||
from pymongo.errors import PyMongoError
|
||||
|
||||
from app.database.connection import get_database
|
||||
from app.database.repositories.document_repository import FileDocumentRepository
|
||||
from app.database.repositories.document_content_repository import DocumentContentRepository
|
||||
from app.models.document import (
|
||||
FileDocument,
|
||||
DocumentContent,
|
||||
FileType,
|
||||
ProcessingStatus
|
||||
)
|
||||
from app.models.types import PyObjectId
|
||||
|
||||
|
||||
class DocumentService:
    """
    Service for orchestrated document and content management.

    Provides high-level operations that coordinate between the FileDocument
    and DocumentContent repositories while ensuring data consistency through
    MongoDB transactions. Content records are deduplicated by SHA256 hash of
    the raw file bytes.
    """
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the document service with repository dependencies."""
|
||||
self.db = get_database()
|
||||
self.file_repository = FileDocumentRepository(self.db)
|
||||
self.content_repository = DocumentContentRepository(self.db)
|
||||
|
||||
def _calculate_file_hash(self, file_bytes: bytes) -> str:
|
||||
"""
|
||||
Calculate SHA256 hash of file content.
|
||||
|
||||
Args:
|
||||
file_bytes: Raw file content as bytes
|
||||
|
||||
Returns:
|
||||
Hexadecimal SHA256 hash string
|
||||
"""
|
||||
return hashlib.sha256(file_bytes).hexdigest()
|
||||
|
||||
def _detect_file_type(self, file_path: str) -> FileType:
|
||||
"""
|
||||
Detect file type from file extension.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file
|
||||
|
||||
Returns:
|
||||
FileType enum value
|
||||
|
||||
Raises:
|
||||
ValueError: If file type is not supported
|
||||
"""
|
||||
extension = Path(file_path).suffix.lower().lstrip('.')
|
||||
|
||||
try:
|
||||
return FileType(extension)
|
||||
except ValueError:
|
||||
raise ValueError(f"Unsupported file type: {extension}")
|
||||
|
||||
def _detect_mime_type(self, file_bytes: bytes) -> str:
|
||||
"""
|
||||
Detect MIME type from file content.
|
||||
|
||||
Args:
|
||||
file_bytes: Raw file content as bytes
|
||||
|
||||
Returns:
|
||||
MIME type string
|
||||
"""
|
||||
return magic.from_buffer(file_bytes, mime=True)
|
||||
|
||||
async def create_document(
|
||||
self,
|
||||
file_path: str,
|
||||
file_bytes: bytes,
|
||||
encoding: str = "utf-8"
|
||||
) -> FileDocument:
|
||||
"""
|
||||
Create a new document with automatic deduplication.
|
||||
|
||||
This method handles the creation of both FileDocument and DocumentContent
|
||||
with proper deduplication based on file hash. If content with the same
|
||||
hash already exists, only a new FileDocument is created.
|
||||
|
||||
Args:
|
||||
file_path: Full path to the file
|
||||
file_bytes: Raw file content as bytes
|
||||
encoding: Character encoding for text content
|
||||
|
||||
Returns:
|
||||
Created FileDocument instance
|
||||
|
||||
Raises:
|
||||
ValueError: If file type is not supported
|
||||
PyMongoError: If database operation fails
|
||||
"""
|
||||
# Calculate automatic attributes
|
||||
file_hash = self._calculate_file_hash(file_bytes)
|
||||
file_type = self._detect_file_type(file_path)
|
||||
mime_type = self._detect_mime_type(file_bytes)
|
||||
file_size = len(file_bytes)
|
||||
filename = Path(file_path).name
|
||||
detected_at = datetime.utcnow()
|
||||
|
||||
# Start MongoDB transaction
|
||||
async with await self.db.client.start_session() as session:
|
||||
async with session.start_transaction():
|
||||
try:
|
||||
# Check if content already exists
|
||||
existing_content = await self.content_repository.find_document_content_by_file_hash(
|
||||
file_hash, session=session
|
||||
)
|
||||
|
||||
# Create DocumentContent if it doesn't exist
|
||||
if not existing_content:
|
||||
content_data = DocumentContent(
|
||||
file_hash=file_hash,
|
||||
content="", # Will be populated by processing workers
|
||||
encoding=encoding,
|
||||
file_size=file_size,
|
||||
mime_type=mime_type
|
||||
)
|
||||
await self.content_repository.create_document_content(
|
||||
content_data, session=session
|
||||
)
|
||||
|
||||
# Create FileDocument
|
||||
file_data = FileDocument(
|
||||
filename=filename,
|
||||
filepath=file_path,
|
||||
file_type=file_type,
|
||||
extraction_method=None, # Will be set by processing workers
|
||||
metadata={}, # Empty for now
|
||||
detected_at=detected_at,
|
||||
file_hash=file_hash
|
||||
)
|
||||
|
||||
created_file = await self.file_repository.create_document(
|
||||
file_data, session=session
|
||||
)
|
||||
|
||||
return created_file
|
||||
|
||||
except Exception as e:
|
||||
# Transaction will automatically rollback
|
||||
raise PyMongoError(f"Failed to create document: {str(e)}")
|
||||
|
||||
async def get_document_by_id(self, document_id: PyObjectId) -> Optional[FileDocument]:
|
||||
"""
|
||||
Retrieve a document by its ID.
|
||||
|
||||
Args:
|
||||
document_id: Document ObjectId
|
||||
|
||||
Returns:
|
||||
FileDocument if found, None otherwise
|
||||
"""
|
||||
return await self.file_repository.find_document_by_id(document_id)
|
||||
|
||||
async def get_document_by_hash(self, file_hash: str) -> Optional[FileDocument]:
|
||||
"""
|
||||
Retrieve a document by its file hash.
|
||||
|
||||
Args:
|
||||
file_hash: SHA256 hash of file content
|
||||
|
||||
Returns:
|
||||
FileDocument if found, None otherwise
|
||||
"""
|
||||
return await self.file_repository.find_document_by_hash(file_hash)
|
||||
|
||||
async def get_document_by_filepath(self, filepath: str) -> Optional[FileDocument]:
|
||||
"""
|
||||
Retrieve a document by its file path.
|
||||
|
||||
Args:
|
||||
filepath: Full path to the file
|
||||
|
||||
Returns:
|
||||
FileDocument if found, None otherwise
|
||||
"""
|
||||
return await self.file_repository.find_document_by_filepath(filepath)
|
||||
|
||||
async def get_document_with_content(
|
||||
self,
|
||||
document_id: PyObjectId
|
||||
) -> Optional[Tuple[FileDocument, DocumentContent]]:
|
||||
"""
|
||||
Retrieve a document with its associated content.
|
||||
|
||||
Args:
|
||||
document_id: Document ObjectId
|
||||
|
||||
Returns:
|
||||
Tuple of (FileDocument, DocumentContent) if found, None otherwise
|
||||
"""
|
||||
document = await self.get_document_by_id(document_id)
|
||||
if not document:
|
||||
return None
|
||||
|
||||
content = await self.content_repository.find_document_content_by_file_hash(
|
||||
document.file_hash
|
||||
)
|
||||
if not content:
|
||||
return None
|
||||
|
||||
return (document, content)
|
||||
|
||||
async def list_documents(
|
||||
self,
|
||||
skip: int = 0,
|
||||
limit: int = 100
|
||||
) -> List[FileDocument]:
|
||||
"""
|
||||
List documents with pagination.
|
||||
|
||||
Args:
|
||||
skip: Number of documents to skip
|
||||
limit: Maximum number of documents to return
|
||||
|
||||
Returns:
|
||||
List of FileDocument instances
|
||||
"""
|
||||
return await self.file_repository.list_documents(skip=skip, limit=limit)
|
||||
|
||||
async def count_documents(self) -> int:
|
||||
"""
|
||||
Get total number of documents.
|
||||
|
||||
Returns:
|
||||
Total document count
|
||||
"""
|
||||
return await self.file_repository.count_documents()
|
||||
|
||||
async def update_document(
|
||||
self,
|
||||
document_id: PyObjectId,
|
||||
update_data: Dict[str, Any]
|
||||
) -> Optional[FileDocument]:
|
||||
"""
|
||||
Update document metadata.
|
||||
|
||||
Args:
|
||||
document_id: Document ObjectId
|
||||
update_data: Dictionary with fields to update
|
||||
|
||||
Returns:
|
||||
Updated FileDocument if found, None otherwise
|
||||
"""
|
||||
return await self.file_repository.update_document(document_id, update_data)
|
||||
|
||||
async def delete_document(self, document_id: PyObjectId) -> bool:
|
||||
"""
|
||||
Delete a document and its orphaned content.
|
||||
|
||||
This method removes the FileDocument and checks if the associated
|
||||
DocumentContent is orphaned (no other files reference it). If orphaned,
|
||||
the content is also deleted.
|
||||
|
||||
Args:
|
||||
document_id: Document ObjectId
|
||||
|
||||
Returns:
|
||||
True if document was deleted, False otherwise
|
||||
|
||||
Raises:
|
||||
PyMongoError: If database operation fails
|
||||
"""
|
||||
# Start MongoDB transaction
|
||||
async with await self.db.client.start_session() as session:
|
||||
async with session.start_transaction():
|
||||
try:
|
||||
# Get document to find its hash
|
||||
document = await self.file_repository.find_document_by_id(
|
||||
document_id, session=session
|
||||
)
|
||||
if not document:
|
||||
return False
|
||||
|
||||
# Delete the document
|
||||
deleted = await self.file_repository.delete_document(
|
||||
document_id, session=session
|
||||
)
|
||||
if not deleted:
|
||||
return False
|
||||
|
||||
# Check if content is orphaned
|
||||
remaining_files = await self.file_repository.find_document_by_hash(
|
||||
document.file_hash, session=session
|
||||
)
|
||||
|
||||
# If no other files reference this content, delete it
|
||||
if not remaining_files:
|
||||
content = await self.content_repository.find_document_content_by_file_hash(
|
||||
document.file_hash, session=session
|
||||
)
|
||||
if content:
|
||||
await self.content_repository.delete_document_content(
|
||||
content.id, session=session
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
# Transaction will automatically rollback
|
||||
raise PyMongoError(f"Failed to delete document: {str(e)}")
|
||||
|
||||
async def content_exists(self, file_hash: str) -> bool:
|
||||
"""
|
||||
Check if content with given hash exists.
|
||||
|
||||
Args:
|
||||
file_hash: SHA256 hash of file content
|
||||
|
||||
Returns:
|
||||
True if content exists, False otherwise
|
||||
"""
|
||||
return await self.content_repository.content_exists(file_hash)
|
||||
|
||||
async def get_content_by_hash(self, file_hash: str) -> Optional[DocumentContent]:
|
||||
"""
|
||||
Retrieve content by file hash.
|
||||
|
||||
Args:
|
||||
file_hash: SHA256 hash of file content
|
||||
|
||||
Returns:
|
||||
DocumentContent if found, None otherwise
|
||||
"""
|
||||
return await self.content_repository.find_document_content_by_file_hash(file_hash)
|
||||
|
||||
async def update_document_content(
|
||||
self,
|
||||
file_hash: str,
|
||||
content: str,
|
||||
encoding: str = "utf-8"
|
||||
) -> Optional[DocumentContent]:
|
||||
"""
|
||||
Update the extracted content for a document.
|
||||
|
||||
This method is typically called by processing workers to store
|
||||
the extracted text content.
|
||||
|
||||
Args:
|
||||
file_hash: SHA256 hash of file content
|
||||
content: Extracted text content
|
||||
encoding: Character encoding
|
||||
|
||||
Returns:
|
||||
Updated DocumentContent if found, None otherwise
|
||||
"""
|
||||
existing_content = await self.content_repository.find_document_content_by_file_hash(
|
||||
file_hash
|
||||
)
|
||||
if not existing_content:
|
||||
return None
|
||||
|
||||
update_data = {
|
||||
"content": content,
|
||||
"encoding": encoding
|
||||
}
|
||||
|
||||
return await self.content_repository.update_document_content(
|
||||
existing_content.id, update_data
|
||||
)
|
||||
Reference in New Issue
Block a user