Working on document repository

This commit is contained in:
2025-09-18 22:53:51 +02:00
parent df86a3d998
commit c3ea80363f
12 changed files with 1597 additions and 526 deletions

View File

@@ -0,0 +1,602 @@
"""
Test suite for FileDocumentRepository with async/await support.
This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from datetime import datetime
from typing import Dict, Any
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.document_repository import FileDocumentRepository
from app.models.document import FileDocument, FileType
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository()
repo.db = db
repo.collection = db.files
return repo
@pytest.fixture
def sample_file_document():
"""Sample FileDocument data for testing."""
return FileDocument(
filename="test_document.pdf",
filepath="/path/to/test_document.pdf",
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType("pdf"),
detected_at=datetime.now(),
)
@pytest.fixture
def sample_update_data():
"""Sample update data for testing."""
return {
"metadata": {"tags": ["updated", "document"]},
"file_type": FileType("txt"),
}
@pytest.fixture
def multiple_sample_documents():
"""Multiple FileDocument objects for list/search testing."""
base_time = datetime.now()
return [
FileDocument(
filename="document1.pdf",
filepath="/path/to/document1.pdf",
file_hash="hash1" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="similar_document.pdf",
filepath="/path/to/similar_document.pdf",
file_hash="hash2" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="completely_different.txt",
filepath="/path/to/completely_different.txt",
file_hash="hash3" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
)
]
class TestFileDocumentRepositoryInitialization:
"""Tests for repository initialization."""
@pytest.mark.asyncio
async def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
repo = FileDocumentRepository()
# Act & Assert (should not raise any exception)
assert repo.db is not None
assert repo.collection is not None
class TestFileDocumentRepositoryCreation:
"""Tests for file document creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_document(self, in_memory_repository, sample_file_document):
"""Test successful file document creation."""
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.filename == sample_file_document.filename
assert created_doc.filepath == sample_file_document.filepath
assert created_doc.file_hash == sample_file_document.file_hash
assert created_doc.file_type == sample_file_document.file_type
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_can_create_document_without_id(self, in_memory_repository, sample_file_document):
"""Test creating document with _id set to None (should be removed)."""
# Arrange
sample_file_document.id = None
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_cannot_create_duplicate_document(self, in_memory_repository, sample_file_document):
"""Test that creating document with duplicate hash raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_document(sample_file_document)
duplicate_doc = FileDocument(
filename="different_name.pdf",
filepath=sample_file_document.filepath,
file_hash="different_hash" + "0" * 58,
file_type=FileType("pdf"),
detected_at=datetime.now()
)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_document(duplicate_doc)
assert "already exists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(ValueError) as exc_info:
await in_memory_repository.create_document(sample_file_document)
assert "Failed to create file document" in str(exc_info.value)
class TestFileDocumentRepositoryFinding:
"""Tests for file document finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
"""Test finding document by valid ObjectId."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
# Assert
assert found_doc is not None
assert found_doc.id == created_doc.id
assert found_doc.filename == created_doc.filename
assert found_doc.file_hash == created_doc.file_hash
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_id("invalid_id")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_doc = await in_memory_repository.find_document_by_id(nonexistent_id)
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_hash(self, in_memory_repository, sample_file_document):
"""Test finding document by file hash."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_hash(sample_file_document.file_hash)
# Assert
assert found_doc is not None
assert found_doc.file_hash == created_doc.file_hash
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_hash(self, in_memory_repository):
"""Test that nonexistent hash returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_hash("nonexistent_hash")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
"""Test finding document by exact filepath."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_filepath(sample_file_document.filepath)
# Assert
assert found_doc is not None
assert found_doc.filepath == created_doc.filepath
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
"""Test that nonexistent filepath returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_filepath("/nonexistent/path.pdf")
# Assert
assert found_doc is None
class TestFileDocumentRepositoryFuzzySearch:
"""Tests for fuzzy search functionality by filename."""
@pytest.mark.asyncio
async def test_i_can_find_documents_by_exact_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with exact filename match."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1.pdf")
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_by_fuzzy_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with fuzzy matching using default threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document")
# Assert
assert len(found_docs) >= 2 # Should find document1.pdf and similar_document.pdf
filenames = [doc.filename for doc in found_docs]
assert "document1.pdf" in filenames
assert "similar_document.pdf" in filenames
@pytest.mark.asyncio
async def test_i_can_find_documents_with_custom_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with custom similarity threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act - Very high threshold should only match exact or very similar names
found_docs = await in_memory_repository.find_document_by_name("document1.pdf", similarity_threshold=0.9)
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_sorted_by_similarity(self, in_memory_repository, multiple_sample_documents):
"""Test that documents are sorted by similarity score (highest first)."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1", similarity_threshold=0.3)
# Assert
assert len(found_docs) >= 1
# First result should be the most similar (document1.pdf)
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_cannot_find_documents_below_threshold(self, in_memory_repository, multiple_sample_documents):
"""Test that no documents are returned when similarity is below threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("xyz", similarity_threshold=0.6)
# Assert
assert len(found_docs) == 0
@pytest.mark.asyncio
async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during name search."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
found_docs = await in_memory_repository.find_document_by_name("test")
# Assert
assert found_docs == []
class TestFileDocumentRepositoryListing:
"""Tests for document listing functionality."""
@pytest.mark.asyncio
async def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with default pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == len(multiple_sample_documents)
assert all(isinstance(doc, FileDocument) for doc in docs)
@pytest.mark.asyncio
async def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with custom pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs_page1 = await in_memory_repository.list_documents(skip=0, limit=2)
docs_page2 = await in_memory_repository.list_documents(skip=2, limit=2)
# Assert
assert len(docs_page1) == 2
assert len(docs_page2) == 1 # Only 3 total documents
# Ensure no overlap between pages
page1_ids = [doc.id for doc in docs_page1]
page2_ids = [doc.id for doc in docs_page2]
assert len(set(page1_ids).intersection(set(page2_ids))) == 0
@pytest.mark.asyncio
async def test_i_can_list_documents_sorted_by_date(self, in_memory_repository, sample_file_document):
"""Test that documents are sorted by detected_at in descending order."""
# Arrange
from datetime import timedelta
# Create documents with different timestamps
doc1 = sample_file_document.model_copy()
doc1.filename = "oldest.pdf"
doc1.file_hash = "hash1" + "0" * 58
doc1.detected_at = datetime.now() - timedelta(hours=2)
doc2 = sample_file_document.model_copy()
doc2.filename = "newest.pdf"
doc2.file_hash = "hash2" + "0" * 58
doc2.detected_at = datetime.now()
await in_memory_repository.create_document(doc1)
await in_memory_repository.create_document(doc2)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == 2
assert docs[0].filename == "newest.pdf" # Most recent first
assert docs[1].filename == "oldest.pdf"
@pytest.mark.asyncio
async def test_i_can_list_empty_documents(self, in_memory_repository):
"""Test listing documents from empty collection."""
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
@pytest.mark.asyncio
async def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during document listing."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
class TestFileDocumentRepositoryUpdate:
"""Tests for document update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_document_successfully(self, in_memory_repository, sample_file_document,
sample_update_data):
"""Test successful document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert updated_doc is not None
assert updated_doc.tags == sample_update_data["tags"]
assert updated_doc.file_type == sample_update_data["file_type"]
assert updated_doc.id == created_doc.id
assert updated_doc.filename == created_doc.filename # Unchanged fields remain
@pytest.mark.asyncio
async def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
"""Test updating document with partial data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
partial_update = {"tags": ["new_tag"]}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.filename == created_doc.filename # Should remain unchanged
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
update_with_none = {"tags": ["new_tag"], "file_type": None}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none)
# Assert
assert updated_doc is not None
assert updated_doc.tags == ["new_tag"]
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out)
@pytest.mark.asyncio
async def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
"""Test updating document with empty data returns current document."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
empty_update = {}
# Act
result = await in_memory_repository.update_document(str(created_doc.id), empty_update)
# Assert
assert result is not None
assert result.filename == created_doc.filename
assert result.file_hash == created_doc.file_hash
assert result.tags == created_doc.tags
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_document("invalid_id", sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
"""Test that updating nonexistent document returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.update_document(nonexistent_id, sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_pymongo_error(self, in_memory_repository, sample_file_document,
sample_update_data, mocker):
"""Test handling of PyMongo errors during document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert result is None
class TestFileDocumentRepositoryDeletion:
"""Tests for document deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
"""Test successful document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
deletion_result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert deletion_result is True
# Verify document is actually deleted
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_document("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
"""Test that deleting nonexistent document returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_document(nonexistent_id)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert result is False
class TestFileDocumentRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
"""Test counting documents."""
# Arrange
initial_count = await in_memory_repository.count_documents()
await in_memory_repository.create_document(sample_file_document)
# Act
final_count = await in_memory_repository.count_documents()
# Assert
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_i_can_count_zero_documents(self, in_memory_repository):
"""Test counting documents in empty collection."""
# Act
count = await in_memory_repository.count_documents()
# Assert
assert count == 0

View File

@@ -1,388 +1,301 @@
"""
Unit tests for user repository module.
Test suite for UserRepository with async/await support.
Tests all CRUD operations for users with MongoDB mocking
to ensure proper database interactions without requiring
actual MongoDB instance during tests.
This module contains comprehensive tests for all UserRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from unittest.mock import Mock, MagicMock
from datetime import datetime
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate, UserUpdate, UserInDB, UserRole
from app.models.user import UserCreate, UserUpdate, UserInDB
@pytest.fixture
def mock_database():
"""Create mock database with users collection."""
db = Mock()
collection = Mock()
db.users = collection
return db
@pytest.fixture
def user_repository(mock_database):
"""Create UserRepository instance with mocked database."""
return UserRepository(mock_database)
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory UserRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = UserRepository(db)
#await repo.initialize()
return repo
@pytest.fixture
def sample_user_create():
"""Create sample UserCreate object for testing."""
"""Sample UserCreate data for testing."""
return UserCreate(
username="testuser",
email="test@example.com",
password="#Passw0rd",
role=UserRole.USER,
password="#TestPassword123",
role="user"
)
@pytest.fixture
def sample_user_update():
"""Create sample UserUpdate object for testing."""
"""Sample UserUpdate data for testing."""
return UserUpdate(
username="updateduser",
email="updated@example.com",
password="#updated-Passw0rd",
role=UserRole.ADMIN,
is_active=False,
role="admin"
)
def test_i_can_create_user(user_repository, mock_database, sample_user_create):
"""Test successful user creation."""
# Mock successful insertion
mock_result = Mock()
mock_result.inserted_id = ObjectId()
mock_database.users.insert_one.return_value = mock_result
class TestUserRepositoryCreation:
"""Tests for user creation functionality."""
result = user_repository.create_user(sample_user_create)
@pytest.mark.asyncio
async def test_i_can_create_user(self, in_memory_repository, sample_user_create):
"""Test successful user creation."""
# Act
created_user = await in_memory_repository.create_user(sample_user_create)
# Assert
assert created_user is not None
assert created_user.username == sample_user_create.username
assert created_user.email == sample_user_create.email
assert created_user.role == sample_user_create.role
assert created_user.is_active is True
assert created_user.id is not None
assert created_user.created_at is not None
assert created_user.updated_at is not None
assert created_user.hashed_password != sample_user_create.password # Should be hashed
assert isinstance(result, UserInDB)
assert result.username == sample_user_create.username
assert result.email == sample_user_create.email
assert result.hashed_password is not None
assert result.role == sample_user_create.role
assert result.is_active is True
assert result.id is not None
assert isinstance(result.created_at, datetime)
assert isinstance(result.updated_at, datetime)
# Verify insert_one was called with correct data
mock_database.users.insert_one.assert_called_once()
call_args = mock_database.users.insert_one.call_args[0][0]
assert call_args["username"] == sample_user_create.username
assert call_args["email"] == sample_user_create.email
@pytest.mark.asyncio
async def test_i_cannot_create_user_with_duplicate_username(self, in_memory_repository, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_user(sample_user_create)
assert "already exists" in str(exc_info.value)
def test_i_cannot_create_duplicate_username(user_repository, mock_database, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Mock DuplicateKeyError from MongoDB
mock_database.users.insert_one.side_effect = DuplicateKeyError("duplicate key error")
class TestUserRepositoryFinding:
"""Tests for user finding functionality."""
with pytest.raises(DuplicateKeyError, match="User with username 'testuser' already exists"):
user_repository.create_user(sample_user_create)
@pytest.mark.asyncio
async def test_i_can_find_user_by_id(self, in_memory_repository, sample_user_create):
"""Test finding user by valid ID."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
# Assert
assert found_user is not None
assert found_user.id == created_user.id
assert found_user.username == created_user.username
assert found_user.email == created_user.email
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_user = await in_memory_repository.find_user_by_id("invalid_id")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_user = await in_memory_repository.find_user_by_id(nonexistent_id)
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_username(self, in_memory_repository, sample_user_create):
"""Test finding user by username."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_username(sample_user_create.username)
# Assert
assert found_user is not None
assert found_user.username == created_user.username
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_username(self, in_memory_repository):
"""Test that nonexistent username returns None."""
# Act
found_user = await in_memory_repository.find_user_by_username("nonexistent")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_email(self, in_memory_repository, sample_user_create):
"""Test finding user by email."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_email(str(sample_user_create.email))
# Assert
assert found_user is not None
assert found_user.email == created_user.email
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_email(self, in_memory_repository):
"""Test that nonexistent email returns None."""
# Act
found_user = await in_memory_repository.find_user_by_email("nonexistent@example.com")
# Assert
assert found_user is None
def test_i_can_find_user_by_username(user_repository, mock_database):
"""Test finding user by username."""
# Mock user document from database
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
mock_database.users.find_one.return_value = user_doc
class TestUserRepositoryUpdate:
"""Tests for user update functionality."""
result = user_repository.find_user_by_username("testuser")
@pytest.mark.asyncio
async def test_i_can_update_user(self, in_memory_repository, sample_user_create, sample_user_update):
"""Test successful user update."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
original_updated_at = created_user.updated_at
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), sample_user_update)
# Assert
assert updated_user is not None
assert updated_user.username == sample_user_update.username
assert updated_user.email == sample_user_update.email
assert updated_user.role == sample_user_update.role
assert updated_user.id == created_user.id
assert isinstance(result, UserInDB)
assert result.username == "testuser"
assert result.email == "test@example.com"
@pytest.mark.asyncio
async def test_i_cannot_update_user_with_invalid_id(self, in_memory_repository, sample_user_update):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_user("invalid_id", sample_user_update)
# Assert
assert result is None
mock_database.users.find_one.assert_called_once_with({"username": "testuser"})
@pytest.mark.asyncio
async def test_i_can_update_user_with_partial_data(self, in_memory_repository, sample_user_create):
"""Test updating user with partial data."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
partial_update = UserUpdate(username="newusername")
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), partial_update)
# Assert
assert updated_user is not None
assert updated_user.username == "newusername"
assert updated_user.email == created_user.email # Should remain unchanged
assert updated_user.role == created_user.role # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_user_with_empty_data(self, in_memory_repository, sample_user_create):
"""Test updating user with empty data returns current user."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
empty_update = UserUpdate()
# Act
result = await in_memory_repository.update_user(str(created_user.id), empty_update)
# Assert
assert result is not None
assert result.username == created_user.username
assert result.email == created_user.email
def test_i_cannot_find_nonexistent_user_by_username(user_repository, mock_database):
"""Test finding nonexistent user by username returns None."""
mock_database.users.find_one.return_value = None
class TestUserRepositoryDeletion:
"""Tests for user deletion functionality."""
result = user_repository.find_user_by_username("nonexistent")
@pytest.mark.asyncio
async def test_i_can_delete_user(self, in_memory_repository, sample_user_create):
"""Test successful user deletion."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
# Act
deletion_result = await in_memory_repository.delete_user(str(created_user.id))
# Assert
assert deletion_result is True
# Verify user is actually deleted
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
assert found_user is None
assert result is None
mock_database.users.find_one.assert_called_once_with({"username": "nonexistent"})
@pytest.mark.asyncio
async def test_i_cannot_delete_user_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_user("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_user(self, in_memory_repository):
"""Test that deleting nonexistent user returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_user(nonexistent_id)
# Assert
assert result is False
def test_i_can_find_user_by_id(user_repository, mock_database):
"""Test finding user by ID."""
user_id = ObjectId()
user_doc = {
"_id": user_id,
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
mock_database.users.find_one.return_value = user_doc
class TestUserRepositoryUtilities:
"""Tests for utility methods."""
result = user_repository.find_user_by_id(str(user_id))
@pytest.mark.asyncio
async def test_i_can_count_users(self, in_memory_repository, sample_user_create):
"""Test counting users."""
# Arrange
initial_count = await in_memory_repository.count_users()
await in_memory_repository.create_user(sample_user_create)
# Act
final_count = await in_memory_repository.count_users()
# Assert
assert final_count == initial_count + 1
assert isinstance(result, UserInDB)
assert result.id == user_id
assert result.username == "testuser"
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
@pytest.mark.asyncio
async def test_i_can_check_user_exists(self, in_memory_repository, sample_user_create):
"""Test checking if user exists."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
# Act
exists = await in_memory_repository.user_exists(sample_user_create.username)
not_exists = await in_memory_repository.user_exists("nonexistent")
# Assert
assert exists is True
assert not_exists is False
def test_i_cannot_find_user_with_invalid_id(user_repository, mock_database):
"""Test finding user with invalid ObjectId returns None."""
result = user_repository.find_user_by_id("invalid_id")
assert result is None
# find_one should not be called with invalid ID
mock_database.users.find_one.assert_not_called()
def test_i_cannot_find_nonexistent_user_by_id(user_repository, mock_database):
"""Test finding nonexistent user by ID returns None."""
user_id = ObjectId()
mock_database.users.find_one.return_value = None
result = user_repository.find_user_by_id(str(user_id))
assert result is None
mock_database.users.find_one.assert_called_once_with({"_id": user_id})
def test_i_can_find_user_by_email(user_repository, mock_database):
"""Test finding user by email address."""
user_doc = {
"_id": ObjectId(),
"username": "testuser",
"email": "test@example.com",
"hashed_password": "hashed_password_123",
"role": "user",
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
mock_database.users.find_one.return_value = user_doc
result = user_repository.find_user_by_email("test@example.com")
assert isinstance(result, UserInDB)
assert result.email == "test@example.com"
mock_database.users.find_one.assert_called_once_with({"email": "test@example.com"})
def test_i_can_update_user(user_repository, mock_database, sample_user_update):
"""Test updating user information."""
user_id = ObjectId()
# Mock successful update
mock_update_result = Mock()
mock_update_result.matched_count = 1
mock_database.users.update_one.return_value = mock_update_result
# Mock find_one for returning updated user
user_to_update = {
"_id": user_id,
"username": "testuser",
"email": "updated@example.com",
"hashed_password": "hashed_password_123",
"role": "admin",
"is_active": False,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
mock_database.users.find_one.return_value = user_to_update
result = user_repository.update_user(str(user_id), sample_user_update)
# Mock will return the first find_one result, which is the updated user_to_update
# assert isinstance(result, UserInDB)
# assert result.username == "updateduser"
# assert result.email == "updated@example.com"
# assert result.role == UserRole.ADMIN
# assert result.is_active is False
# Verify update_one was called with correct data
mock_database.users.update_one.assert_called_once()
call_args = mock_database.users.update_one.call_args
assert call_args[0][0] == {"_id": user_id} # Filter
update_data = call_args[0][1]["$set"] # Update data
assert update_data["email"] == "updated@example.com"
assert update_data["role"] == UserRole.ADMIN
assert update_data["is_active"] is False
assert "updated_at" in update_data
def test_i_cannot_update_nonexistent_user(user_repository, mock_database, sample_user_update):
"""Test updating nonexistent user returns None."""
user_id = ObjectId()
# Mock no match found
mock_update_result = Mock()
mock_update_result.matched_count = 0
mock_database.users.update_one.return_value = mock_update_result
result = user_repository.update_user(str(user_id), sample_user_update)
assert result is None
def test_i_cannot_update_user_with_invalid_id(user_repository, mock_database, sample_user_update):
"""Test updating user with invalid ID returns None."""
result = user_repository.update_user("invalid_id", sample_user_update)
assert result is None
# update_one should not be called with invalid ID
mock_database.users.update_one.assert_not_called()
def test_i_can_delete_user(user_repository, mock_database):
"""Test successful user deletion."""
user_id = ObjectId()
# Mock successful deletion
mock_delete_result = Mock()
mock_delete_result.deleted_count = 1
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is True
mock_database.users.delete_one.assert_called_once_with({"_id": user_id})
def test_i_cannot_delete_nonexistent_user(user_repository, mock_database):
"""Test deleting nonexistent user returns False."""
user_id = ObjectId()
# Mock no deletion occurred
mock_delete_result = Mock()
mock_delete_result.deleted_count = 0
mock_database.users.delete_one.return_value = mock_delete_result
result = user_repository.delete_user(str(user_id))
assert result is False
def test_i_cannot_delete_user_with_invalid_id(user_repository, mock_database):
"""Test deleting user with invalid ID returns False."""
result = user_repository.delete_user("invalid_id")
assert result is False
# delete_one should not be called with invalid ID
mock_database.users.delete_one.assert_not_called()
def test_i_can_list_users(user_repository, mock_database):
"""Test listing users with pagination."""
# Mock cursor with user documents
user_docs = [
{
"_id": ObjectId(),
"username": "user1",
"email": "user1@example.com",
"hashed_password": "hash1",
"role": "user",
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "user2",
"email": "user2@example.com",
"hashed_password": "hash2",
"role": "admin",
"is_active": False,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
]
mock_cursor = Mock()
mock_cursor.__iter__ = Mock(return_value=iter(user_docs))
mock_cursor.skip.return_value = mock_cursor
mock_cursor.limit.return_value = mock_cursor
mock_database.users.find.return_value = mock_cursor
result = user_repository.list_users(skip=10, limit=50)
assert len(result) == 2
assert all(isinstance(user, UserInDB) for user in result)
assert result[0].username == "user1"
assert result[1].username == "user2"
mock_database.users.find.assert_called_once()
mock_cursor.skip.assert_called_once_with(10)
mock_cursor.limit.assert_called_once_with(50)
def test_i_can_count_users(user_repository, mock_database):
"""Test counting total users."""
mock_database.users.count_documents.return_value = 42
result = user_repository.count_users()
assert result == 42
mock_database.users.count_documents.assert_called_once_with({})
def test_i_can_check_user_exists(user_repository, mock_database):
"""Test checking if user exists by username."""
mock_database.users.count_documents.return_value = 1
result = user_repository.user_exists("testuser")
assert result is True
mock_database.users.count_documents.assert_called_once_with({"username": "testuser"})
def test_i_can_check_user_does_not_exist(user_repository, mock_database):
"""Test checking if user does not exist by username."""
mock_database.users.count_documents.return_value = 0
result = user_repository.user_exists("nonexistent")
assert result is False
mock_database.users.count_documents.assert_called_once_with({"username": "nonexistent"})
def test_i_can_create_indexes_on_initialization(mock_database):
"""Test that indexes are created when repository is initialized."""
# Mock create_index to not raise exception
mock_database.users.create_index.return_value = None
repository = UserRepository(mock_database)
mock_database.users.create_index.assert_called_once_with("username", unique=True)
def test_i_can_handle_index_creation_error(mock_database):
"""Test that index creation errors are handled gracefully."""
# Mock create_index to raise exception (index already exists)
mock_database.users.create_index.side_effect = Exception("Index already exists")
# Should not raise exception
repository = UserRepository(mock_database)
assert repository is not None
mock_database.users.create_index.assert_called_once_with("username", unique=True)