673 lines
25 KiB
Python
673 lines
25 KiB
Python
"""
Test suite for FileDocumentRepository with async/await support.

This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
|
|
|
|
import pytest
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
|
|
import pytest_asyncio
|
|
from bson import ObjectId
|
|
from pymongo.errors import DuplicateKeyError, PyMongoError
|
|
from mongomock_motor import AsyncMongoMockClient
|
|
|
|
from app.database.repositories.document_repository import (
|
|
FileDocumentRepository,
|
|
MatchMethodBase,
|
|
SubsequenceMatching,
|
|
FuzzyMatching
|
|
)
|
|
from app.models.document import FileDocument, FileType, ExtractionMethod
|
|
|
|
|
|
@pytest_asyncio.fixture
async def in_memory_repository():
    """Provide an initialized FileDocumentRepository backed by mongomock-motor.

    Each invocation builds a new AsyncMongoMockClient, wraps its
    ``test_database`` in a repository and awaits ``initialize()`` before
    handing the repository to the test.
    """
    database = AsyncMongoMockClient().test_database
    repository = FileDocumentRepository(database)
    await repository.initialize()
    return repository
|
|
|
|
|
|
@pytest.fixture
def sample_file_document():
    """Build one PDF FileDocument with fixed field values for the tests."""
    fields = {
        "filename": "sample_document.pdf",
        "filepath": "/home/user/documents/sample_document.pdf",
        "file_type": FileType.PDF,
        "extraction_method": ExtractionMethod.OCR,
        "metadata": {"pages": 5, "language": "en", "author": "John Doe"},
        "detected_at": datetime.now(),
        "file_hash": "a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
        "encoding": "utf-8",
        "file_size": 1024000,
        "mime_type": "application/pdf",
    }
    return FileDocument(**fields)
|
|
|
|
|
|
@pytest.fixture
def sample_update_data():
    """Return the dictionary of field changes used by the update tests."""
    update_fields = dict(
        extraction_method=ExtractionMethod.HYBRID,
        metadata={"pages": 10, "language": "fr", "updated": True},
        file_size=2048000,
    )
    return update_fields
|
|
|
|
|
|
@pytest.fixture
def multiple_sample_files():
    """Return three FileDocument objects (TXT, PDF, DOCX) for list/search tests.

    All three share the same ``detected_at`` timestamp, captured once when the
    fixture runs, and the same ``utf-8`` encoding.
    """
    shared_timestamp = datetime.now()

    # One row per document:
    # (filename, filepath, file_type, extraction_method, metadata,
    #  hash prefix, file_size, mime_type)
    specs = [
        (
            "first_doc.txt",
            "/docs/first_doc.txt",
            FileType.TXT,
            ExtractionMethod.DIRECT_TEXT,
            {"words": 500},
            "hash1",
            5000,
            "text/plain",
        ),
        (
            "second_document.pdf",
            "/docs/second_document.pdf",
            FileType.PDF,
            ExtractionMethod.OCR,
            {"pages": 8},
            "hash2",
            10000,
            "application/pdf",
        ),
        (
            "third_file.docx",
            "/docs/third_file.docx",
            FileType.DOCX,
            ExtractionMethod.HYBRID,
            {"paragraphs": 15},
            "hash3",
            15000,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ),
    ]

    return [
        FileDocument(
            filename=name,
            filepath=path,
            file_type=kind,
            extraction_method=method,
            metadata=meta,
            detected_at=shared_timestamp,
            file_hash=hash_prefix + "0" * 58,
            encoding="utf-8",
            file_size=size,
            mime_type=mime,
        )
        for name, path, kind, method, meta, hash_prefix, size, mime in specs
    ]
|
|
|
|
|
|
class TestFileDocumentRepositoryInitialization:
    """Checks that a repository can be constructed and initialized."""

    @pytest.mark.asyncio
    async def test_i_can_initialize_repository(self):
        """An initialized repository exposes a database and a collection."""
        # Arrange / Act: construction and initialization must not raise
        database = AsyncMongoMockClient().test_database
        repository = FileDocumentRepository(database)
        await repository.initialize()

        # Assert
        assert repository.db is not None
        assert repository.collection is not None
        # TODO : check that the indexes are created
|
|
|
|
|
|
class TestFileDocumentRepositoryCreation:
    """Covers ``create_document``: success, missing id, duplicates, driver errors."""

    @pytest.mark.asyncio
    async def test_i_can_create_file_document(self, in_memory_repository, sample_file_document):
        """A created document keeps its fields and receives an ObjectId."""
        # Act
        stored = await in_memory_repository.create_document(sample_file_document)

        # Assert
        assert stored is not None
        compared_fields = (
            "filename",
            "filepath",
            "file_type",
            "extraction_method",
            "metadata",
            "file_hash",
            "file_size",
            "mime_type",
        )
        for field_name in compared_fields:
            assert getattr(stored, field_name) == getattr(sample_file_document, field_name)
        assert stored.id is not None
        assert isinstance(stored.id, ObjectId)

    @pytest.mark.asyncio
    async def test_i_can_create_file_document_without_id(self, in_memory_repository, sample_file_document):
        """A document whose id is explicitly None still gets an ObjectId on creation."""
        # Arrange
        sample_file_document.id = None

        # Act
        stored = await in_memory_repository.create_document(sample_file_document)

        # Assert
        assert stored is not None
        assert stored.id is not None
        assert isinstance(stored.id, ObjectId)

    @pytest.mark.asyncio
    async def test_i_cannot_create_duplicate_file_document(self, in_memory_repository, sample_file_document):
        """A second document reusing an existing filepath raises DuplicateKeyError."""
        # Arrange: store the original, then build a clash on filepath only
        await in_memory_repository.create_document(sample_file_document)
        clashing_fields = {
            "filename": "different_name.pdf",
            "filepath": sample_file_document.filepath,  # Same filepath
            "file_type": FileType.PDF,
            "extraction_method": ExtractionMethod.OCR,
            "metadata": {"different": "metadata"},
            "detected_at": datetime.now(),
            "file_hash": "different_hash_123456789012345678901234567890123456789012345678",
            "encoding": "utf-8",
            "file_size": 2000,
            "mime_type": "application/pdf",
        }
        clashing_document = FileDocument(**clashing_fields)

        # Act & Assert
        with pytest.raises(DuplicateKeyError) as raised:
            await in_memory_repository.create_document(clashing_document)

        assert "already exists" in str(raised.value)

    @pytest.mark.asyncio
    async def test_i_cannot_create_file_document_with_pymongo_error(
        self, in_memory_repository, sample_file_document, mocker
    ):
        """A PyMongoError from ``insert_one`` surfaces as a ValueError."""
        # Arrange: make the collection's insert fail
        mocker.patch.object(
            in_memory_repository.collection,
            "insert_one",
            side_effect=PyMongoError("Database error"),
        )

        # Act & Assert
        with pytest.raises(ValueError) as raised:
            await in_memory_repository.create_document(sample_file_document)

        assert "Failed to create file document" in str(raised.value)
|
|
|
|
|
|
class TestFileDocumentRepositoryFinding:
    """Covers lookups by id, file hash and filepath, including the miss and error paths."""

    @pytest.mark.asyncio
    async def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
        """Looking up the stringified id of a stored document returns that document."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)

        # Act
        fetched = await in_memory_repository.find_document_by_id(str(stored.id))

        # Assert
        assert fetched is not None
        assert fetched.id == stored.id
        assert fetched.filename == stored.filename
        assert fetched.filepath == stored.filepath

    @pytest.mark.asyncio
    async def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
        """A string that is not a valid ObjectId yields None."""
        # Act
        fetched = await in_memory_repository.find_document_by_id("invalid_id")

        # Assert
        assert fetched is None

    @pytest.mark.asyncio
    async def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
        """A well-formed ObjectId that was never stored yields None."""
        # Arrange
        unknown_id = str(ObjectId())

        # Act
        fetched = await in_memory_repository.find_document_by_id(unknown_id)

        # Assert
        assert fetched is None

    @pytest.mark.asyncio
    async def test_i_can_find_document_by_file_hash(self, in_memory_repository, sample_file_document):
        """Looking up a stored document's file hash returns that document."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)

        # Act
        fetched = await in_memory_repository.find_document_by_hash(sample_file_document.file_hash)

        # Assert
        assert fetched is not None
        assert fetched.file_hash == stored.file_hash
        assert fetched.id == stored.id

    @pytest.mark.asyncio
    async def test_i_cannot_find_document_with_nonexistent_file_hash(self, in_memory_repository):
        """An unknown file hash yields None."""
        # Act
        fetched = await in_memory_repository.find_document_by_hash("nonexistent_hash")

        # Assert
        assert fetched is None

    @pytest.mark.asyncio
    async def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
        """Looking up a stored document's filepath returns that document."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)

        # Act
        fetched = await in_memory_repository.find_document_by_filepath(sample_file_document.filepath)

        # Assert
        assert fetched is not None
        assert fetched.filepath == stored.filepath
        assert fetched.id == stored.id

    @pytest.mark.asyncio
    async def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
        """An unknown filepath yields None."""
        # Act
        fetched = await in_memory_repository.find_document_by_filepath("/nonexistent/path/file.pdf")

        # Assert
        assert fetched is None

    @pytest.mark.asyncio
    async def test_i_cannot_find_document_with_pymongo_error(self, in_memory_repository, mocker):
        """A PyMongoError from ``find_one`` makes the hash lookup return None."""
        # Arrange: make the collection's find_one fail
        mocker.patch.object(
            in_memory_repository.collection,
            "find_one",
            side_effect=PyMongoError("Database error"),
        )

        # Act
        fetched = await in_memory_repository.find_document_by_hash("test_hash")

        # Assert
        assert fetched is None
|
|
|
|
|
|
class TestFileDocumentRepositoryNameMatching:
    """Tests for ``find_document_by_name`` with the available matching methods."""

    @pytest.mark.asyncio
    async def test_i_can_find_documents_by_name_with_fuzzy_matching(self, in_memory_repository, multiple_sample_files):
        """Fuzzy matching on "document" returns at least one matching FileDocument."""
        # Arrange
        for file_doc in multiple_sample_files:
            await in_memory_repository.create_document(file_doc)

        # Act
        fuzzy_method = FuzzyMatching(threshold=0.5)
        found_files = await in_memory_repository.find_document_by_name("document", fuzzy_method)

        # Assert
        assert len(found_files) >= 1
        assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)
        # Should find files with "document" in the name
        found_filenames = [f.filename for f in found_files]
        assert any("document" in fname.lower() for fname in found_filenames)

    @pytest.mark.asyncio
    async def test_i_can_find_documents_by_name_with_subsequence_matching(
        self, in_memory_repository, multiple_sample_files
    ):
        """Subsequence matching on "doc" returns at least one FileDocument."""
        # Arrange
        for file_doc in multiple_sample_files:
            await in_memory_repository.create_document(file_doc)

        # Act
        subsequence_method = SubsequenceMatching()
        found_files = await in_memory_repository.find_document_by_name("doc", subsequence_method)

        # Assert
        assert len(found_files) >= 1
        assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)

    @pytest.mark.asyncio
    async def test_i_can_find_documents_by_name_with_default_method(self, in_memory_repository, multiple_sample_files):
        """Calling ``find_document_by_name`` without a method returns a list of FileDocument.

        Which method is used by default, and therefore whether "first" matches
        any stored filename, is not asserted here; only the shape of the
        result is checked.
        """
        # Arrange
        for file_doc in multiple_sample_files:
            await in_memory_repository.create_document(file_doc)

        # Act
        found_files = await in_memory_repository.find_document_by_name("first")

        # Assert
        # The former ``len(found_files) >= 0`` check was always true; verify
        # the result type instead so the assertion can actually fail.
        assert isinstance(found_files, list)
        assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)

    @pytest.mark.asyncio
    async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
        """A PyMongoError from ``find`` makes the name search return an empty list."""
        # Arrange: make the collection's find fail
        mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))

        # Act
        found_files = await in_memory_repository.find_document_by_name("test")

        # Assert
        assert found_files == []
|
|
|
|
|
|
class TestFileDocumentRepositoryListing:
    """Covers ``list_documents``: pagination, ordering, empty collection, driver errors."""

    @pytest.mark.asyncio
    async def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_files):
        """With no pagination arguments, every stored document is returned."""
        # Arrange
        for document in multiple_sample_files:
            await in_memory_repository.create_document(document)

        # Act
        listed = await in_memory_repository.list_documents()

        # Assert
        assert len(listed) == len(multiple_sample_files)
        assert all(isinstance(entry, FileDocument) for entry in listed)

    @pytest.mark.asyncio
    async def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_files):
        """``skip``/``limit`` split the three documents into pages of 2 and 1 without overlap."""
        # Arrange
        for document in multiple_sample_files:
            await in_memory_repository.create_document(document)

        # Act
        first_page = await in_memory_repository.list_documents(skip=0, limit=2)
        second_page = await in_memory_repository.list_documents(skip=2, limit=2)

        # Assert
        assert len(first_page) == 2
        assert len(second_page) == 1  # Only 3 total files

        # Ensure no overlap between pages
        first_page_ids = {entry.id for entry in first_page}
        second_page_ids = {entry.id for entry in second_page}
        assert first_page_ids.isdisjoint(second_page_ids)

    @pytest.mark.asyncio
    async def test_i_can_list_documents_sorted_by_detected_at(self, in_memory_repository, sample_file_document):
        """Documents come back ordered by ``detected_at``, most recent first."""
        # Arrange: two copies of the sample that differ in path, name, hash and date
        older = sample_file_document.model_copy()
        older.filepath = "/docs/file1.pdf"
        older.filename = "file1.pdf"
        older.file_hash = "hash1" + "0" * 58
        older.detected_at = datetime(2024, 1, 1, 10, 0, 0)

        newer = sample_file_document.model_copy()
        newer.filepath = "/docs/file2.pdf"
        newer.filename = "file2.pdf"
        newer.file_hash = "hash2" + "0" * 58
        newer.detected_at = datetime(2024, 1, 2, 10, 0, 0)  # Later date

        stored_older = await in_memory_repository.create_document(older)
        stored_newer = await in_memory_repository.create_document(newer)

        # Act
        listed = await in_memory_repository.list_documents()

        # Assert
        assert len(listed) == 2
        # Most recent (latest detected_at) should be first
        assert listed[0].id == stored_newer.id
        assert listed[1].id == stored_older.id

    @pytest.mark.asyncio
    async def test_i_can_list_empty_documents(self, in_memory_repository):
        """Listing an empty collection returns an empty list."""
        # Act
        listed = await in_memory_repository.list_documents()

        # Assert
        assert listed == []

    @pytest.mark.asyncio
    async def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
        """A PyMongoError from ``find`` makes the listing return an empty list."""
        # Arrange: make the collection's find fail
        mocker.patch.object(
            in_memory_repository.collection,
            "find",
            side_effect=PyMongoError("Database error"),
        )

        # Act
        listed = await in_memory_repository.list_documents()

        # Assert
        assert listed == []
|
|
|
|
|
|
class TestFileDocumentRepositoryUpdate:
    """Covers ``update_document``: full, partial, None-valued and empty updates, plus failures."""

    @pytest.mark.asyncio
    async def test_i_can_update_document_successfully(
        self, in_memory_repository, sample_file_document, sample_update_data
    ):
        """Updated fields take the new values while id, filename and filepath stay put."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)

        # Act
        updated = await in_memory_repository.update_document(str(stored.id), sample_update_data)

        # Assert
        assert updated is not None
        for changed_field in ("extraction_method", "metadata", "file_size"):
            assert getattr(updated, changed_field) == sample_update_data[changed_field]
        assert updated.id == stored.id
        assert updated.filename == stored.filename  # Unchanged fields remain
        assert updated.filepath == stored.filepath

    @pytest.mark.asyncio
    async def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
        """Updating only ``file_size`` leaves filename and metadata untouched."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)
        size_only_update = {"file_size": 999999}

        # Act
        updated = await in_memory_repository.update_document(str(stored.id), size_only_update)

        # Assert
        assert updated is not None
        assert updated.file_size == 999999
        assert updated.filename == stored.filename  # Should remain unchanged
        assert updated.metadata == stored.metadata  # Should remain unchanged

    @pytest.mark.asyncio
    async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
        """A None value in the update data does not overwrite the stored field."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)
        update_containing_none = {"file_size": 777777, "metadata": None}

        # Act
        updated = await in_memory_repository.update_document(str(stored.id), update_containing_none)

        # Assert
        assert updated is not None
        assert updated.file_size == 777777
        assert updated.metadata == stored.metadata  # Should remain unchanged (None filtered out)

    @pytest.mark.asyncio
    async def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
        """An empty update dictionary returns the document with its current values."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)
        no_changes = {}

        # Act
        outcome = await in_memory_repository.update_document(str(stored.id), no_changes)

        # Assert
        assert outcome is not None
        assert outcome.filename == stored.filename
        assert outcome.filepath == stored.filepath
        assert outcome.metadata == stored.metadata

    @pytest.mark.asyncio
    async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
        """A string that is not a valid ObjectId makes the update return None."""
        # Act
        outcome = await in_memory_repository.update_document("invalid_id", sample_update_data)

        # Assert
        assert outcome is None

    @pytest.mark.asyncio
    async def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
        """Updating a well-formed but unknown id returns None."""
        # Arrange
        unknown_id = str(ObjectId())

        # Act
        outcome = await in_memory_repository.update_document(unknown_id, sample_update_data)

        # Assert
        assert outcome is None

    @pytest.mark.asyncio
    async def test_i_cannot_update_document_with_pymongo_error(
        self, in_memory_repository, sample_file_document, sample_update_data, mocker
    ):
        """A PyMongoError from ``find_one_and_update`` makes the update return None."""
        # Arrange: store a document, then make the update call fail
        stored = await in_memory_repository.create_document(sample_file_document)
        mocker.patch.object(
            in_memory_repository.collection,
            "find_one_and_update",
            side_effect=PyMongoError("Database error"),
        )

        # Act
        outcome = await in_memory_repository.update_document(str(stored.id), sample_update_data)

        # Assert
        assert outcome is None
|
|
|
|
|
|
class TestFileDocumentRepositoryDeletion:
    """Covers ``delete_document``: success, invalid id, unknown id and driver errors."""

    @pytest.mark.asyncio
    async def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
        """Deleting a stored document returns True and the document is gone afterwards."""
        # Arrange
        stored = await in_memory_repository.create_document(sample_file_document)
        stored_id = str(stored.id)

        # Act
        was_deleted = await in_memory_repository.delete_document(stored_id)

        # Assert
        assert was_deleted is True

        # Verify document is actually deleted
        leftover = await in_memory_repository.find_document_by_id(stored_id)
        assert leftover is None

    @pytest.mark.asyncio
    async def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
        """A string that is not a valid ObjectId makes the deletion return False."""
        # Act
        was_deleted = await in_memory_repository.delete_document("invalid_id")

        # Assert
        assert was_deleted is False

    @pytest.mark.asyncio
    async def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
        """Deleting a well-formed but unknown id returns False."""
        # Arrange
        unknown_id = str(ObjectId())

        # Act
        was_deleted = await in_memory_repository.delete_document(unknown_id)

        # Assert
        assert was_deleted is False

    @pytest.mark.asyncio
    async def test_i_cannot_delete_document_with_pymongo_error(
        self, in_memory_repository, sample_file_document, mocker
    ):
        """A PyMongoError from ``delete_one`` makes the deletion return False."""
        # Arrange: store a document, then make the delete call fail
        stored = await in_memory_repository.create_document(sample_file_document)
        mocker.patch.object(
            in_memory_repository.collection,
            "delete_one",
            side_effect=PyMongoError("Database error"),
        )

        # Act
        was_deleted = await in_memory_repository.delete_document(str(stored.id))

        # Assert
        assert was_deleted is False
|
|
|
|
|
|
class TestFileDocumentRepositoryUtilities:
    """Covers the ``count_documents`` helper."""

    @pytest.mark.asyncio
    async def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
        """Creating one document raises the count by exactly one."""
        # Arrange
        count_before = await in_memory_repository.count_documents()
        await in_memory_repository.create_document(sample_file_document)

        # Act
        count_after = await in_memory_repository.count_documents()

        # Assert
        assert count_after == count_before + 1

    @pytest.mark.asyncio
    async def test_i_can_count_zero_documents(self, in_memory_repository):
        """An empty collection has a count of zero."""
        # Act
        total = await in_memory_repository.count_documents()

        # Assert
        assert total == 0

    @pytest.mark.asyncio
    async def test_i_cannot_count_documents_with_pymongo_error(self, in_memory_repository, mocker):
        """A PyMongoError from the collection's ``count_documents`` yields a count of zero."""
        # Arrange: make the collection's count fail
        mocker.patch.object(
            in_memory_repository.collection,
            "count_documents",
            side_effect=PyMongoError("Database error"),
        )

        # Act
        total = await in_memory_repository.count_documents()

        # Assert
        assert total == 0
|
|
|
|
|
|
class TestMatchingMethods:
    """Checks construction of the matching-method classes."""

    def test_i_can_create_fuzzy_matching_with_default_threshold(self):
        """FuzzyMatching built without arguments has a threshold of 0.6."""
        # Act
        matcher = FuzzyMatching()

        # Assert
        assert matcher.threshold == 0.6

    def test_i_can_create_fuzzy_matching_with_custom_threshold(self):
        """FuzzyMatching keeps the threshold passed to its constructor."""
        # Act
        matcher = FuzzyMatching(threshold=0.8)

        # Assert
        assert matcher.threshold == 0.8

    def test_i_can_create_subsequence_matching(self):
        """SubsequenceMatching instances are also MatchMethodBase instances."""
        # Act
        matcher = SubsequenceMatching()

        # Assert
        for expected_type in (MatchMethodBase, SubsequenceMatching):
            assert isinstance(matcher, expected_type)
|