Fixed unit tests

This commit is contained in:
2025-09-19 21:06:09 +02:00
parent c3ea80363f
commit e8b306ac4a
6 changed files with 195 additions and 66 deletions

View File

@@ -12,6 +12,20 @@ from difflib import SequenceMatcher
from motor.motor_asyncio import AsyncIOMotorCollection from motor.motor_asyncio import AsyncIOMotorCollection
from app.models.document import FileDocument from app.models.document import FileDocument
from app.database.connection import get_database from app.database.connection import get_database
from app.utils.ducment_matching import fuzzy_matching, subsequence_matching
class MatchMethodBase:
pass
class SubsequenceMatching(MatchMethodBase):
pass
class FuzzyMatching(MatchMethodBase):
def __init__(self, threshold: float = 0.6):
self.threshold = threshold
class FileDocumentRepository: class FileDocumentRepository:
@@ -28,6 +42,14 @@ class FileDocumentRepository:
self.collection: AsyncIOMotorCollection = self.db.files self.collection: AsyncIOMotorCollection = self.db.files
self._ensure_indexes() self._ensure_indexes()
async def initialize(self):
"""
Initialize repository by ensuring required indexes exist.
Should be called after repository instantiation to setup database indexes.
"""
await self._ensure_indexes()
async def _ensure_indexes(self): async def _ensure_indexes(self):
""" """
Ensure required database indexes exist. Ensure required database indexes exist.
@@ -64,7 +86,7 @@ class FileDocumentRepository:
return file_data return file_data
except DuplicateKeyError as e: except DuplicateKeyError as e:
raise DuplicateKeyError(f"File with same hash already exists: {e}") raise DuplicateKeyError(f"File with same file path already exists: {e}")
except PyMongoError as e: except PyMongoError as e:
raise ValueError(f"Failed to create file document: {e}") raise ValueError(f"Failed to create file document: {e}")
@@ -128,13 +150,13 @@ class FileDocumentRepository:
except PyMongoError: except PyMongoError:
return None return None
async def find_document_by_name(self, filename: str, similarity_threshold: float = 0.6) -> List[FileDocument]: async def find_document_by_name(self, filename: str, matching_method: MatchMethodBase = None) -> List[FileDocument]:
""" """
Find file documents by filename using fuzzy matching. Find file documents by filename using fuzzy matching.
Args: Args:
filename (str): Filename to search for filename (str): Filename to search for
similarity_threshold (float): Minimum similarity ratio (0.0 to 1.0) matching_method (MatchMethodBase): Minimum similarity ratio (0.0 to 1.0)
Returns: Returns:
List[FileDocument]: List of matching files sorted by similarity score List[FileDocument]: List of matching files sorted by similarity score
@@ -143,21 +165,12 @@ class FileDocumentRepository:
# Get all files from database # Get all files from database
cursor = self.collection.find({}) cursor = self.collection.find({})
all_files = await cursor.to_list(length=None) all_files = await cursor.to_list(length=None)
all_documents = [FileDocument(**file_doc) for file_doc in all_files]
matches = [] if isinstance(matching_method, FuzzyMatching):
for file_doc in all_files: return fuzzy_matching(filename, all_documents, matching_method.threshold)
file_obj = FileDocument(**file_doc)
# Calculate similarity between search term and filename
similarity = SequenceMatcher(None, filename.lower(), file_obj.filename.lower()).ratio()
if similarity >= similarity_threshold:
matches.append((file_obj, similarity))
# Sort by similarity score (highest first) return subsequence_matching(filename, all_documents)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]
except PyMongoError: except PyMongoError:
return [] return []

View File

@@ -34,6 +34,14 @@ class UserRepository:
self.collection: AsyncIOMotorCollection = database.users self.collection: AsyncIOMotorCollection = database.users
self._ensure_indexes() self._ensure_indexes()
async def initialize(self):
"""
Initialize repository by ensuring required indexes exist.
Should be called after repository instantiation to setup database indexes.
"""
await self._ensure_indexes()
async def _ensure_indexes(self): async def _ensure_indexes(self):
""" """
Ensure required database indexes exist. Ensure required database indexes exist.

View File

@@ -0,0 +1,60 @@
from difflib import SequenceMatcher
from app.models.document import FileDocument
def _is_subsequence(query: str, target: str) -> tuple[bool, float]:
"""
Check if query is a subsequence of target (case-insensitive).
Returns (match, score).
Score is higher when the query letters are closer together in the target.
"""
query = query.lower()
target = target.lower()
positions = []
idx = 0
for char in query:
idx = target.find(char, idx)
if idx == -1:
return False, 0.0
positions.append(idx)
idx += 1
# Smallest window containing all matched chars
window_size = positions[-1] - positions[0] + 1
# Score: ratio of query length vs window size (compactness)
score = len(query) / window_size
return True, score
def fuzzy_matching(filename: str, documents: list[FileDocument], similarity_threshold: float = 0.7):
matches = []
for file_doc in documents:
# Calculate similarity between search term and filename
similarity = SequenceMatcher(None, filename.lower(), file_doc.filename.lower()).ratio()
if similarity >= similarity_threshold:
matches.append((file_doc, similarity))
# Sort by similarity score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]
def subsequence_matching(query: str, documents: list[FileDocument]):
matches = []
for file_doc in documents:
matched, score = _is_subsequence(query, file_doc.filename)
if matched:
matches.append((file_doc, score))
# Sort by score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]

View File

@@ -26,6 +26,7 @@ async def in_memory_repository():
repo = FileDocumentRepository() repo = FileDocumentRepository()
repo.db = db repo.db = db
repo.collection = db.files repo.collection = db.files
await repo.initialize()
return repo return repo
@@ -87,6 +88,7 @@ class TestFileDocumentRepositoryInitialization:
"""Test repository initialization.""" """Test repository initialization."""
# Arrange # Arrange
repo = FileDocumentRepository() repo = FileDocumentRepository()
await repo.initialize()
# Act & Assert (should not raise any exception) # Act & Assert (should not raise any exception)
assert repo.db is not None assert repo.db is not None
@@ -276,48 +278,6 @@ class TestFileDocumentRepositoryFuzzySearch:
assert "document1.pdf" in filenames assert "document1.pdf" in filenames
assert "similar_document.pdf" in filenames assert "similar_document.pdf" in filenames
@pytest.mark.asyncio
async def test_i_can_find_documents_with_custom_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with custom similarity threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act - Very high threshold should only match exact or very similar names
found_docs = await in_memory_repository.find_document_by_name("document1.pdf", similarity_threshold=0.9)
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_sorted_by_similarity(self, in_memory_repository, multiple_sample_documents):
"""Test that documents are sorted by similarity score (highest first)."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1", similarity_threshold=0.3)
# Assert
assert len(found_docs) >= 1
# First result should be the most similar (document1.pdf)
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_cannot_find_documents_below_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test that no documents are returned when similarity is below threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("xyz", similarity_threshold=0.6)
# Assert
assert len(found_docs) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker): async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during name search.""" """Test handling of PyMongo errors during name search."""
@@ -377,11 +337,13 @@ class TestFileDocumentRepositoryListing:
# Create documents with different timestamps # Create documents with different timestamps
doc1 = sample_file_document.model_copy() doc1 = sample_file_document.model_copy()
doc1.filename = "oldest.pdf" doc1.filename = "oldest.pdf"
doc1.filepath = f"/path/to/{doc1.filename}"
doc1.file_hash = "hash1" + "0" * 58 doc1.file_hash = "hash1" + "0" * 58
doc1.detected_at = datetime.now() - timedelta(hours=2) doc1.detected_at = datetime.now() - timedelta(hours=2)
doc2 = sample_file_document.model_copy() doc2 = sample_file_document.model_copy()
doc2.filename = "newest.pdf" doc2.filename = "newest.pdf"
doc2.filepath = f"/path/to/{doc2.filename}"
doc2.file_hash = "hash2" + "0" * 58 doc2.file_hash = "hash2" + "0" * 58
doc2.detected_at = datetime.now() doc2.detected_at = datetime.now()
@@ -433,7 +395,6 @@ class TestFileDocumentRepositoryUpdate:
# Assert # Assert
assert updated_doc is not None assert updated_doc is not None
assert updated_doc.tags == sample_update_data["tags"]
assert updated_doc.file_type == sample_update_data["file_type"] assert updated_doc.file_type == sample_update_data["file_type"]
assert updated_doc.id == created_doc.id assert updated_doc.id == created_doc.id
assert updated_doc.filename == created_doc.filename # Unchanged fields remain assert updated_doc.filename == created_doc.filename # Unchanged fields remain
@@ -443,30 +404,30 @@ class TestFileDocumentRepositoryUpdate:
"""Test updating document with partial data.""" """Test updating document with partial data."""
# Arrange # Arrange
created_doc = await in_memory_repository.create_document(sample_file_document) created_doc = await in_memory_repository.create_document(sample_file_document)
partial_update = {"tags": ["new_tag"]} partial_update = {"file_type": FileType("txt")}
# Act # Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update) updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update)
# Assert # Assert
assert updated_doc is not None assert updated_doc is not None
assert updated_doc.tags == ["new_tag"] assert updated_doc.file_type == FileType("txt")
assert updated_doc.filename == created_doc.filename # Should remain unchanged assert updated_doc.filename == created_doc.filename # Should remain unchanged
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged assert updated_doc.filepath == created_doc.filepath # Should remain unchanged
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document): async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data.""" """Test that None values are filtered out from update data."""
# Arrange # Arrange
created_doc = await in_memory_repository.create_document(sample_file_document) created_doc = await in_memory_repository.create_document(sample_file_document)
update_with_none = {"tags": ["new_tag"], "file_type": None} update_with_none = {"metadata": {"tags": ["updated", "document"]}, "file_type": None}
# Act # Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none) updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none)
# Assert # Assert
assert updated_doc is not None assert updated_doc is not None
assert updated_doc.tags == ["new_tag"] assert updated_doc.metadata == {"tags": ["updated", "document"]}
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out) assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out)
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -483,7 +444,7 @@ class TestFileDocumentRepositoryUpdate:
assert result is not None assert result is not None
assert result.filename == created_doc.filename assert result.filename == created_doc.filename
assert result.file_hash == created_doc.file_hash assert result.file_hash == created_doc.file_hash
assert result.tags == created_doc.tags assert result.metadata == created_doc.metadata
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data): async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):

View File

@@ -23,7 +23,7 @@ async def in_memory_repository():
client = AsyncMongoMockClient() client = AsyncMongoMockClient()
db = client.test_database db = client.test_database
repo = UserRepository(db) repo = UserRepository(db)
#await repo.initialize() await repo.initialize()
return repo return repo

View File

@@ -0,0 +1,87 @@
import os
from datetime import datetime
import pytest
from app.models.document import FileDocument, FileType
from app.utils.ducment_matching import fuzzy_matching, subsequence_matching
def get_doc(filename: str = None):
"""Sample FileDocument data for testing."""
return FileDocument(
filename=f"{filename}",
filepath=f"/path/to/{filename}",
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType(os.path.splitext(filename)[1].lstrip(".") or "txt"),
detected_at=datetime.now(),
)
class TestFuzzyMatching:
def test_i_can_find_exact_match_with_fuzzy(self):
# Exact match should always pass
docs = [get_doc(filename="hello.txt")]
result = fuzzy_matching("hello.txt", docs)
assert len(result) == 1
assert result[0].filename == "hello.txt"
def test_i_can_find_close_match_with_fuzzy(self):
# "helo.txt" should match "hello.txt" with high similarity
docs = [get_doc(filename="hello.txt")]
result = fuzzy_matching("helo.txt", docs, similarity_threshold=0.7)
assert len(result) == 1
assert result[0].filename == "hello.txt"
def test_i_cannot_find_dissimilar_match_with_fuzzy(self):
# "world.txt" should not match "hello.txt"
docs = [get_doc(filename="hello.txt")]
result = fuzzy_matching("world.txt", docs, similarity_threshold=0.7)
assert len(result) == 0
def test_i_can_sort_by_similarity_in_fuzzy(self):
# "helo.txt" is closer to "hello.txt" than "hllll.txt"
docs = [
get_doc(filename="hello.txt"),
get_doc(filename="hllll.txt"),
]
result = fuzzy_matching("helo.txt", docs, similarity_threshold=0.5)
assert result[0].filename == "hello.txt"
class TestSubsequenceMatching:
def test_i_can_match_subsequence_simple(self):
# "ifb" should match "ilFaitBeau.txt"
docs = [get_doc(filename="ilFaitBeau.txt")]
result = subsequence_matching("ifb", docs)
assert len(result) == 1
assert result[0].filename == "ilFaitBeau.txt"
def test_i_cannot_match_wrong_order_subsequence(self):
# "fib" should not match "ilFaitBeau.txt" because the order is wrong
docs = [get_doc(filename="ilFaitBeau.txt")]
result = subsequence_matching("bfi", docs)
assert len(result) == 0
def test_i_can_match_multiple_documents_subsequence(self):
# "ifb" should match both filenames, but "ilFaitBeau.txt" has a higher score
docs = [
get_doc(filename="ilFaitBeau.txt"),
get_doc(filename="information_base.txt"),
]
result = subsequence_matching("ifb", docs)
assert len(result) == 2
assert result[0].filename == "ilFaitBeau.txt"
assert result[1].filename == "information_base.txt"
def test_i_cannot_match_unrelated_subsequence(self):
# "xyz" should not match any file
docs = [get_doc(filename="ilFaitBeau.txt")]
result = subsequence_matching("xyz", docs)
assert len(result) == 0
def test_i_can_handle_case_insensitivity_in_subsequence(self):
# Matching should be case-insensitive
docs = [get_doc(filename="HelloWorld.txt")]
result = subsequence_matching("hw", docs)
assert len(result) == 1
assert result[0].filename == "HelloWorld.txt"