Implemented default pipeline

This commit is contained in:
2025-09-26 22:08:39 +02:00
parent f1b551d243
commit 4de732b0ae
56 changed files with 4534 additions and 2837 deletions

0
tests/api/__init__.py Normal file
View File

View File

@@ -0,0 +1,149 @@
from datetime import datetime
from unittest.mock import MagicMock
import pytest
from fastapi import status, HTTPException
from fastapi.testclient import TestClient
from mongomock.mongo_client import MongoClient
from app.api.dependencies import get_auth_service, get_user_service, get_current_user
from app.main import app # Assuming you have FastAPI app defined in app/main.py
from app.models.auth import UserRole
from app.models.types import PyObjectId
from app.models.user import UserInDB
from app.services.auth_service import AuthService
from app.services.user_service import UserService
@pytest.fixture
def client():
return TestClient(app)
@pytest.fixture
def fake_user():
return UserInDB(
_id=PyObjectId(),
username="testuser",
email="test@example.com",
role=UserRole.USER,
is_active=True,
hashed_password="hashed-secret",
created_at=datetime(2025, 1, 1),
updated_at=datetime(2025, 1, 2),
)
def override_auth_service():
mock = MagicMock(spec=AuthService)
mock.verify_user_password.return_value = True
mock.create_access_token.return_value = "fake-jwt-token"
return mock
def override_user_service(fake_user):
mock = MagicMock(spec=UserService)
mock.get_user_by_username.return_value = fake_user
return mock
def override_get_current_user(fake_user):
def _override():
return fake_user
return _override
def override_get_database():
def _override():
client = MongoClient()
db = client.test_database
return db
return _override
# ---------------------- TESTS FOR /auth/login ----------------------
class TestLogin:
def test_i_can_login_with_valid_credentials(self, client, fake_user):
auth_service = override_auth_service()
user_service = override_user_service(fake_user)
client.app.dependency_overrides[get_auth_service] = lambda: auth_service
client.app.dependency_overrides[get_user_service] = lambda: user_service
response = client.post(
"/auth/login",
data={"username": "testuser", "password": "secret"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "access_token" in data
assert data["user"]["username"] == "testuser"
def test_i_cannot_login_with_invalid_username(self, client):
auth_service = override_auth_service()
user_service = MagicMock(spec=UserService)
user_service.get_user_by_username.return_value = None
client.app.dependency_overrides[get_auth_service] = lambda: auth_service
client.app.dependency_overrides[get_user_service] = lambda: user_service
response = client.post(
"/auth/login",
data={"username": "unknown", "password": "secret"},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_i_cannot_login_with_inactive_user(self, client, fake_user):
fake_user.is_active = False
auth_service = override_auth_service()
user_service = override_user_service(fake_user)
client.app.dependency_overrides[get_auth_service] = lambda: auth_service
client.app.dependency_overrides[get_user_service] = lambda: user_service
response = client.post(
"/auth/login",
data={"username": "testuser", "password": "secret"},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_i_cannot_login_with_wrong_password(self, client, fake_user):
auth_service = override_auth_service()
auth_service.verify_user_password.return_value = False
user_service = override_user_service(fake_user)
client.app.dependency_overrides[get_auth_service] = lambda: auth_service
client.app.dependency_overrides[get_user_service] = lambda: user_service
response = client.post(
"/auth/login",
data={"username": "testuser", "password": "wrong"},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# ---------------------- TESTS FOR /auth/me ----------------------
class TesteMe:
def test_i_can_get_current_user_profile(self, client, fake_user):
client.app.dependency_overrides[get_current_user] = override_get_current_user(fake_user)
response = client.get("/auth/me")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["username"] == fake_user.username
assert data["email"] == fake_user.email
def test_i_cannot_get_profile_without_authentication(self, client, monkeypatch):
def raise_http_exception():
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
client.app.dependency_overrides[get_current_user] = raise_http_exception
response = client.get("/auth/me")
assert response.status_code == status.HTTP_401_UNAUTHORIZED

167
tests/api/test_users.py Normal file
View File

@@ -0,0 +1,167 @@
# File: tests/api/test_users.py
from datetime import datetime
from unittest.mock import MagicMock
import pytest
from fastapi import status
from fastapi.testclient import TestClient
from app.api.dependencies import get_admin_user, get_user_service
from app.main import app
from app.models.auth import UserRole
from app.models.types import PyObjectId
from app.models.user import UserInDB, UserCreate
from app.services.user_service import UserService
# -----------------------
# Fixtures
# -----------------------
@pytest.fixture
def fake_user_admin():
return UserInDB(
_id=PyObjectId(),
username="admin",
email="admin@example.com",
role=UserRole.ADMIN,
is_active=True,
hashed_password="hashed-secret",
created_at=datetime(2025, 1, 1),
updated_at=datetime(2025, 1, 2),
)
@pytest.fixture
def fake_user_response():
return UserInDB(
_id=PyObjectId(),
username="other",
email="other@example.com",
role=UserRole.USER,
is_active=True,
hashed_password="hashed-secret-2",
created_at=datetime(2025, 1, 1),
updated_at=datetime(2025, 1, 2),
)
@pytest.fixture
def client(fake_user_admin):
# Fake admin dependency
def get_admin_user_override():
return fake_user_admin
# Fake user service
user_service_mock = MagicMock(spec=UserService)
def get_user_service_override():
return user_service_mock
client = TestClient(app)
client.app.dependency_overrides = {
get_admin_user: get_admin_user_override,
get_user_service: get_user_service_override
}
client.user_service_mock = user_service_mock
return client
# -----------------------
# Tests
# -----------------------
class TestListUsers:
def test_i_can_list_users(self, client, fake_user_admin, fake_user_response):
client.user_service_mock.list_users.return_value = [fake_user_admin, fake_user_response]
response = client.get("/users")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 2
assert data[0]["username"] == "admin"
def test_i_can_list_users_when_empty(self, client):
client.user_service_mock.list_users.return_value = []
response = client.get("/users")
assert response.status_code == status.HTTP_200_OK
assert response.json() == []
class TestGetUserById:
def test_i_can_get_user_by_id(self, client, fake_user_response):
client.user_service_mock.get_user_by_id.return_value = fake_user_response
response = client.get(f"/users/{fake_user_response.id}")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["username"] == fake_user_response.username
def test_i_cannot_get_user_by_id_not_found(self, client):
client.user_service_mock.get_user_by_id.return_value = None
response = client.get("/users/64f0c9f4b0d1c8b7b8e1f0a2")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found"
class TestCreateUser:
def test_i_can_create_user(self, client, fake_user_response):
user_data = UserCreate(username="newuser",
email="new@example.com",
password="#Passw0rd!",
role=UserRole.USER)
client.user_service_mock.create_user.return_value = fake_user_response
response = client.post("/users", json=user_data.model_dump(mode="json"))
assert response.status_code == status.HTTP_201_CREATED
data = response.json()
assert data["username"] == fake_user_response.username
def test_i_cannot_create_user_when_service_raises_value_error(self, client):
user_data = {"username": "baduser", "email": "bad@example.com", "role": "user", "password": "password"}
client.user_service_mock.create_user.side_effect = ValueError("Invalid data")
response = client.post("/users", json=user_data)
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
class TestUpdateUser:
def test_i_can_update_user(self, client, fake_user_response):
user_data = {"username": "updateduser", "email": "updated@example.com"}
client.user_service_mock.update_user.return_value = fake_user_response
response = client.put(f"/users/{fake_user_response.id}", json=user_data)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["username"] == fake_user_response.username
def test_i_cannot_update_user_not_found(self, client):
client.user_service_mock.update_user.return_value = None
user_data = {"username": "updateduser"}
response = client.put("/users/64f0c9f4b0d1c8b7b8e1f0a2", json=user_data)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found"
def test_i_cannot_update_user_when_service_raises_value_error(self, client):
client.user_service_mock.update_user.side_effect = ValueError("Invalid update")
user_data = {"username": "badupdate"}
response = client.put("/users/64f0c9f4b0d1c8b7b8e1f0a2", json=user_data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["detail"] == "Invalid update"
class TestDeleteUser:
def test_i_can_delete_user(self, client):
client.user_service_mock.delete_user.return_value = True
response = client.delete("/users/64f0c9f4b0d1c8b7b8e1f0a1")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["message"] == "User successfully deleted"
def test_i_cannot_delete_user_not_found(self, client):
client.user_service_mock.delete_user.return_value = False
response = client.delete("/users/64f0c9f4b0d1c8b7b8e1f0a2")
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found"

View File

0
tests/models/__init__.py Normal file
View File

View File

@@ -10,8 +10,8 @@ from pydantic import ValidationError
from datetime import datetime
from bson import ObjectId
from app.models.user import UserCreate, UserUpdate, UserInDB, UserResponse
from app.models.auth import UserRole
from app.models.user import UserCreate, UserUpdate, UserInDB
from app.models.auth import UserRole, UserResponse
class TestUserCreateModel:
@@ -349,7 +349,7 @@ class TestUserResponseModel:
# Convert to response model (excluding password_hash)
user_response = UserResponse(
id=user_in_db.id,
_id=user_in_db.id,
username=user_in_db.username,
email=user_in_db.email,
role=user_in_db.role,

View File

View File

@@ -0,0 +1,611 @@
"""
Test suite for FileDocumentRepository with async/support.
This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
from datetime import datetime
import pytest
from bson import ObjectId
from mongomock.mongo_client import MongoClient
from pymongo.errors import PyMongoError
from app.database.repositories.document_repository import (
FileDocumentRepository,
MatchMethodBase,
SubsequenceMatching,
FuzzyMatching
)
from app.models.document import FileDocument, FileType, ExtractionMethod
@pytest.fixture
def in_memory_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = MongoClient()
db = client.test_database
repo = FileDocumentRepository(db)
repo.initialize()
return repo
@pytest.fixture
def sample_file_document():
"""Sample FileDocument data for testing."""
return FileDocument(
filename="sample_document.pdf",
filepath="/home/user/documents/sample_document.pdf",
file_type=FileType.PDF,
extraction_method=ExtractionMethod.OCR,
metadata={"pages": 5, "language": "en", "author": "John Doe"},
detected_at=datetime.now(),
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
encoding="utf-8",
file_size=1024000,
mime_type="application/pdf"
)
@pytest.fixture
def sample_update_data():
"""Sample update data for testing."""
return {
"extraction_method": ExtractionMethod.HYBRID,
"metadata": {"pages": 10, "language": "fr", "updated": True},
"file_size": 2048000
}
@pytest.fixture
def multiple_sample_files():
"""Multiple FileDocument objects for list/search testing."""
base_time = datetime.now()
return [
FileDocument(
filename="first_doc.txt",
filepath="/docs/first_doc.txt",
file_type=FileType.TXT,
extraction_method=ExtractionMethod.DIRECT_TEXT,
metadata={"words": 500},
detected_at=base_time,
file_hash="hash1" + "0" * 58,
encoding="utf-8",
file_size=5000,
mime_type="text/plain"
),
FileDocument(
filename="second_document.pdf",
filepath="/docs/second_document.pdf",
file_type=FileType.PDF,
extraction_method=ExtractionMethod.OCR,
metadata={"pages": 8},
detected_at=base_time,
file_hash="hash2" + "0" * 58,
encoding="utf-8",
file_size=10000,
mime_type="application/pdf"
),
FileDocument(
filename="third_file.docx",
filepath="/docs/third_file.docx",
file_type=FileType.DOCX,
extraction_method=ExtractionMethod.HYBRID,
metadata={"paragraphs": 15},
detected_at=base_time,
file_hash="hash3" + "0" * 58,
encoding="utf-8",
file_size=15000,
mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
]
class TestFileDocumentRepositoryInitialization:
"""Tests for repository initialization."""
def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
client = MongoClient()
db = client.test_database
repo = FileDocumentRepository(db)
repo.initialize()
# Act & Assert (should not raise any exception)
assert repo.db is not None
assert repo.collection is not None
# TODO : check that the indexes are created
class TestFileDocumentRepositoryCreation:
"""Tests for file document creation functionality."""
def test_i_can_create_file_document(self, in_memory_repository, sample_file_document):
"""Test successful file document creation."""
# Act
created_file = in_memory_repository.create_document(sample_file_document)
# Assert
assert created_file is not None
assert created_file.filename == sample_file_document.filename
assert created_file.filepath == sample_file_document.filepath
assert created_file.file_type == sample_file_document.file_type
assert created_file.extraction_method == sample_file_document.extraction_method
assert created_file.metadata == sample_file_document.metadata
assert created_file.file_hash == sample_file_document.file_hash
assert created_file.file_size == sample_file_document.file_size
assert created_file.mime_type == sample_file_document.mime_type
assert created_file.id is not None
assert isinstance(created_file.id, ObjectId)
def test_i_can_create_file_document_without_id(self, in_memory_repository, sample_file_document):
"""Test creating file document with _id set to None (should be removed)."""
# Arrange
sample_file_document.id = None
# Act
created_file = in_memory_repository.create_document(sample_file_document)
# Assert
assert created_file is not None
assert created_file.id is not None
assert isinstance(created_file.id, ObjectId)
def test_i_cannot_create_file_document_with_pymongo_error(self, in_memory_repository,
sample_file_document, mocker):
"""Test handling of PyMongo errors during file document creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(ValueError) as exc_info:
in_memory_repository.create_document(sample_file_document)
assert "Failed to create file document" in str(exc_info.value)
class TestFileDocumentRepositoryFinding:
"""Tests for file document finding functionality."""
def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
"""Test finding file document by valid ObjectId."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
# Act
found_file = in_memory_repository.find_document_by_id(str(created_file.id))
# Assert
assert found_file is not None
assert found_file.id == created_file.id
assert found_file.filename == created_file.filename
assert found_file.filepath == created_file.filepath
def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_file = in_memory_repository.find_document_by_id("invalid_id")
# Assert
assert found_file is None
def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_file = in_memory_repository.find_document_by_id(nonexistent_id)
# Assert
assert found_file is None
def test_i_can_find_document_by_file_hash(self, in_memory_repository, sample_file_document):
"""Test finding file document by file hash."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
# Act
found_file = in_memory_repository.find_document_by_hash(sample_file_document.file_hash)
# Assert
assert found_file is not None
assert found_file.file_hash == created_file.file_hash
assert found_file.id == created_file.id
def test_i_cannot_find_document_with_nonexistent_file_hash(self, in_memory_repository):
"""Test that nonexistent file hash returns None."""
# Act
found_file = in_memory_repository.find_document_by_hash("nonexistent_hash")
# Assert
assert found_file is None
def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
"""Test finding file document by filepath."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
# Act
found_file = in_memory_repository.find_document_by_filepath(sample_file_document.filepath)
# Assert
assert found_file is not None
assert found_file.filepath == created_file.filepath
assert found_file.id == created_file.id
def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
"""Test that nonexistent filepath returns None."""
# Act
found_file = in_memory_repository.find_document_by_filepath("/nonexistent/path/file.pdf")
# Assert
assert found_file is None
def test_i_cannot_find_document_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during file document finding."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find_one', side_effect=PyMongoError("Database error"))
# Act
found_file = in_memory_repository.find_document_by_hash("test_hash")
# Assert
assert found_file is None
class TestFileDocumentRepositoryNameMatching:
"""Tests for file document name matching functionality."""
def test_i_can_find_documents_by_name_with_fuzzy_matching(self, in_memory_repository, multiple_sample_files):
"""Test finding file documents by filename using fuzzy matching."""
# Arrange
for file_doc in multiple_sample_files:
in_memory_repository.create_document(file_doc)
# Act
fuzzy_method = FuzzyMatching(threshold=0.5)
found_files = in_memory_repository.find_document_by_name("document", fuzzy_method)
# Assert
assert len(found_files) >= 1
assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)
# Should find files with "document" in the name
found_filenames = [f.filename for f in found_files]
assert any("document" in fname.lower() for fname in found_filenames)
def test_i_can_find_documents_by_name_with_subsequence_matching(self, in_memory_repository,
multiple_sample_files):
"""Test finding file documents by filename using subsequence matching."""
# Arrange
for file_doc in multiple_sample_files:
in_memory_repository.create_document(file_doc)
# Act
subsequence_method = SubsequenceMatching()
found_files = in_memory_repository.find_document_by_name("doc", subsequence_method)
# Assert
assert len(found_files) >= 1
assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)
def test_i_can_find_documents_by_name_with_default_method(self, in_memory_repository, multiple_sample_files):
"""Test finding file documents by filename with default matching method."""
# Arrange
for file_doc in multiple_sample_files:
in_memory_repository.create_document(file_doc)
# Act
found_files = in_memory_repository.find_document_by_name("first")
# Assert
assert len(found_files) >= 0
assert all(isinstance(file_doc, FileDocument) for file_doc in found_files)
def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during document name matching."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
found_files = in_memory_repository.find_document_by_name("test")
# Assert
assert found_files == []
class TestFileDocumentRepositoryListing:
"""Tests for file document listing functionality."""
def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_files):
"""Test listing file documents with default pagination."""
# Arrange
for file_doc in multiple_sample_files:
in_memory_repository.create_document(file_doc)
# Act
files = in_memory_repository.list_documents()
# Assert
assert len(files) == len(multiple_sample_files)
assert all(isinstance(file_doc, FileDocument) for file_doc in files)
def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_files):
"""Test listing file documents with custom pagination."""
# Arrange
for file_doc in multiple_sample_files:
in_memory_repository.create_document(file_doc)
# Act
files_page1 = in_memory_repository.list_documents(skip=0, limit=2)
files_page2 = in_memory_repository.list_documents(skip=2, limit=2)
# Assert
assert len(files_page1) == 2
assert len(files_page2) == 1 # Only 3 total files
# Ensure no overlap between pages
page1_ids = [file_doc.id for file_doc in files_page1]
page2_ids = [file_doc.id for file_doc in files_page2]
assert len(set(page1_ids).intersection(set(page2_ids))) == 0
def test_i_can_list_documents_sorted_by_detected_at(self, in_memory_repository, sample_file_document):
"""Test that file documents are sorted by detected_at in descending order."""
# Arrange
file1 = sample_file_document.model_copy()
file1.filepath = "/docs/file1.pdf"
file1.filename = "file1.pdf"
file1.file_hash = "hash1" + "0" * 58
file1.detected_at = datetime(2024, 1, 1, 10, 0, 0)
file2 = sample_file_document.model_copy()
file2.filepath = "/docs/file2.pdf"
file2.filename = "file2.pdf"
file2.file_hash = "hash2" + "0" * 58
file2.detected_at = datetime(2024, 1, 2, 10, 0, 0) # Later date
created_file1 = in_memory_repository.create_document(file1)
created_file2 = in_memory_repository.create_document(file2)
# Act
files = in_memory_repository.list_documents()
# Assert
assert len(files) == 2
# Most recent (latest detected_at) should be first
assert files[0].id == created_file2.id
assert files[1].id == created_file1.id
def test_i_can_list_empty_documents(self, in_memory_repository):
"""Test listing file documents from empty collection."""
# Act
files = in_memory_repository.list_documents()
# Assert
assert files == []
def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during file document listing."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
files = in_memory_repository.list_documents()
# Assert
assert files == []
class TestFileDocumentRepositoryUpdate:
"""Tests for file document update functionality."""
def test_i_can_update_document_successfully(self, in_memory_repository, sample_file_document,
sample_update_data):
"""Test successful file document update."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
# Act
updated_file = in_memory_repository.update_document(str(created_file.id), sample_update_data)
# Assert
assert updated_file is not None
assert updated_file.extraction_method == sample_update_data["extraction_method"]
assert updated_file.metadata == sample_update_data["metadata"]
assert updated_file.file_size == sample_update_data["file_size"]
assert updated_file.id == created_file.id
assert updated_file.filename == created_file.filename # Unchanged fields remain
assert updated_file.filepath == created_file.filepath
def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
"""Test updating file document with partial data."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
partial_update = {"file_size": 999999}
# Act
updated_file = in_memory_repository.update_document(str(created_file.id), partial_update)
# Assert
assert updated_file is not None
assert updated_file.file_size == 999999
assert updated_file.filename == created_file.filename # Should remain unchanged
assert updated_file.metadata == created_file.metadata # Should remain unchanged
def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
update_with_none = {"file_size": 777777, "metadata": None}
# Act
updated_file = in_memory_repository.update_document(str(created_file.id), update_with_none)
# Assert
assert updated_file is not None
assert updated_file.file_size == 777777
assert updated_file.metadata == created_file.metadata # Should remain unchanged (None filtered out)
def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
"""Test updating file document with empty data returns current document."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
empty_update = {}
# Act
result = in_memory_repository.update_document(str(created_file.id), empty_update)
# Assert
assert result is not None
assert result.filename == created_file.filename
assert result.filepath == created_file.filepath
assert result.metadata == created_file.metadata
def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
"""Test that updating with invalid ID returns None."""
# Act
result = in_memory_repository.update_document("invalid_id", sample_update_data)
# Assert
assert result is None
def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
"""Test that updating nonexistent file document returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = in_memory_repository.update_document(nonexistent_id, sample_update_data)
# Assert
assert result is None
def test_i_cannot_update_document_with_pymongo_error(self, in_memory_repository, sample_file_document,
sample_update_data, mocker):
"""Test handling of PyMongo errors during file document update."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act
result = in_memory_repository.update_document(str(created_file.id), sample_update_data)
# Assert
assert result is None
class TestFileDocumentRepositoryDeletion:
"""Tests for file document deletion functionality."""
def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
"""Test successful file document deletion."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
# Act
deletion_result = in_memory_repository.delete_document(str(created_file.id))
# Assert
assert deletion_result is True
# Verify document is actually deleted
found_file = in_memory_repository.find_document_by_id(str(created_file.id))
assert found_file is None
def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = in_memory_repository.delete_document("invalid_id")
# Assert
assert result is False
def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
"""Test that deleting nonexistent file document returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = in_memory_repository.delete_document(nonexistent_id)
# Assert
assert result is False
def test_i_cannot_delete_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during file document deletion."""
# Arrange
created_file = in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act
result = in_memory_repository.delete_document(str(created_file.id))
# Assert
assert result is False
class TestFileDocumentRepositoryUtilities:
"""Tests for utility methods."""
def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
"""Test counting file documents."""
# Arrange
initial_count = in_memory_repository.count_documents()
in_memory_repository.create_document(sample_file_document)
# Act
final_count = in_memory_repository.count_documents()
# Assert
assert final_count == initial_count + 1
def test_i_can_count_zero_documents(self, in_memory_repository):
"""Test counting file documents in empty collection."""
# Act
count = in_memory_repository.count_documents()
# Assert
assert count == 0
def test_i_cannot_count_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during file document counting."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'count_documents', side_effect=PyMongoError("Database error"))
# Act
count = in_memory_repository.count_documents()
# Assert
assert count == 0
class TestMatchingMethods:
"""Tests for matching method classes."""
def test_i_can_create_fuzzy_matching_with_default_threshold(self):
"""Test creating FuzzyMatching with default threshold."""
# Act
fuzzy = FuzzyMatching()
# Assert
assert fuzzy.threshold == 0.6
def test_i_can_create_fuzzy_matching_with_custom_threshold(self):
"""Test creating FuzzyMatching with custom threshold."""
# Act
fuzzy = FuzzyMatching(threshold=0.8)
# Assert
assert fuzzy.threshold == 0.8
def test_i_can_create_subsequence_matching(self):
"""Test creating SubsequenceMatching."""
# Act
subsequence = SubsequenceMatching()
# Assert
assert isinstance(subsequence, MatchMethodBase)
assert isinstance(subsequence, SubsequenceMatching)

View File

@@ -0,0 +1,496 @@
"""
Test suite for JobRepository with async/support.
This module contains comprehensive tests for all JobRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
from datetime import datetime
import pytest
from bson import ObjectId
from mongomock.mongo_client import MongoClient
from mongomock_motor import AsyncMongoMockClient
from pymongo.errors import PyMongoError
from app.database.repositories.job_repository import JobRepository
from app.exceptions.job_exceptions import JobRepositoryError
from app.models.job import ProcessingJob, ProcessingStatus
from app.models.types import PyObjectId
@pytest.fixture
def in_memory_repository():
"""Create an in-memory JobRepository for testing."""
client = MongoClient()
db = client.test_database
repo = JobRepository(db)
repo.initialize()
return repo
@pytest.fixture
def sample_document_id():
"""Sample document ObjectId for testing."""
return PyObjectId()
@pytest.fixture
def sample_task_id():
"""Sample Celery task ID for testing."""
return "celery-task-12345-abcde"
@pytest.fixture
def multiple_sample_jobs():
"""Multiple ProcessingJob objects for testing."""
doc_id_1 = ObjectId()
doc_id_2 = ObjectId()
base_time = datetime.utcnow()
return [
ProcessingJob(
document_id=doc_id_1,
status=ProcessingStatus.PENDING,
task_id="task-1",
created_at=base_time,
started_at=None,
completed_at=None,
error_message=None
),
ProcessingJob(
document_id=doc_id_2,
status=ProcessingStatus.PROCESSING,
task_id="task-2",
created_at=base_time,
started_at=base_time,
completed_at=None,
error_message=None
),
ProcessingJob(
document_id=doc_id_1,
status=ProcessingStatus.COMPLETED,
task_id="task-3",
created_at=base_time,
started_at=base_time,
completed_at=base_time,
error_message=None
)
]
class TestJobRepositoryInitialization:
"""Tests for repository initialization."""
def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
client = AsyncMongoMockClient()
db = client.test_database
repo = JobRepository(db)
# Act
initialized_repo = repo.initialize()
# Assert
assert initialized_repo is repo
assert repo.db is not None
assert repo.collection is not None
class TestJobRepositoryCreation:
"""Tests for job creation functionality."""
def test_i_can_create_job_with_task_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test successful job creation with task ID."""
# Act
created_job = in_memory_repository.create_job(sample_document_id, sample_task_id)
# Assert
assert created_job is not None
assert created_job.document_id == sample_document_id
assert created_job.task_id == sample_task_id
assert created_job.status == ProcessingStatus.PENDING
assert created_job.created_at is not None
assert created_job.started_at is None
assert created_job.completed_at is None
assert created_job.error_message is None
assert created_job.id is not None
assert isinstance(created_job.id, ObjectId)
def test_i_can_create_job_without_task_id(self, in_memory_repository, sample_document_id):
"""Test successful job creation without task ID."""
# Act
created_job = in_memory_repository.create_job(sample_document_id)
# Assert
assert created_job is not None
assert created_job.document_id == sample_document_id
assert created_job.task_id is None
assert created_job.status == ProcessingStatus.PENDING
assert created_job.created_at is not None
assert created_job.started_at is None
assert created_job.completed_at is None
assert created_job.error_message is None
assert created_job.id is not None
assert isinstance(created_job.id, ObjectId)
def test_i_cannot_create_duplicate_job_for_document(self, in_memory_repository, sample_document_id,
sample_task_id):
"""Test that creating job with duplicate document_id raises DuplicateKeyError."""
# Arrange
in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.create_job(sample_document_id, "different-task-id")
assert "create_job" in str(exc_info.value)
def test_i_cannot_create_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.create_job(sample_document_id)
assert "create_job" in str(exc_info.value)
class TestJobRepositoryFinding:
"""Tests for job finding functionality."""
def test_i_can_find_job_by_valid_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test finding job by valid ObjectId."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act
found_job = in_memory_repository.find_job_by_id(created_job.id)
# Assert
assert found_job is not None
assert found_job.id == created_job.id
assert found_job.document_id == created_job.document_id
assert found_job.task_id == created_job.task_id
assert found_job.status == created_job.status
def test_i_cannot_find_job_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent ObjectId returns None."""
# Arrange
nonexistent_id = PyObjectId()
# Act
found_job = in_memory_repository.find_job_by_id(nonexistent_id)
# Assert
assert found_job is None
def test_i_cannot_find_job_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during job finding."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.find_job_by_id(PyObjectId())
assert "get_job_by_id" in str(exc_info.value)
def test_i_can_find_jobs_by_document_id(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test finding jobs by document ID."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id, sample_task_id)
# Act
found_jobs = in_memory_repository.find_jobs_by_document_id(sample_document_id)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].document_id == sample_document_id
def test_i_can_find_empty_jobs_list_for_nonexistent_document(self, in_memory_repository):
"""Test that nonexistent document ID returns empty list."""
# Arrange
nonexistent_id = ObjectId()
# Act
found_jobs = in_memory_repository.find_jobs_by_document_id(nonexistent_id)
# Assert
assert found_jobs == []
def test_i_cannot_find_jobs_by_document_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during finding jobs by document ID."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.find_jobs_by_document_id(PyObjectId())
assert "get_jobs_by_file_id" in str(exc_info.value)
@pytest.mark.parametrize("status", [
ProcessingStatus.PENDING,
ProcessingStatus.PROCESSING,
ProcessingStatus.COMPLETED
])
def test_i_can_find_jobs_by_pending_status(self, in_memory_repository, sample_document_id, status):
"""Test finding jobs by PENDING status."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
in_memory_repository.update_job_status(created_job.id, status)
# Act
found_jobs = in_memory_repository.get_jobs_by_status(status)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].status == status
def test_i_can_find_jobs_by_failed_status(self, in_memory_repository, sample_document_id):
"""Test finding jobs by FAILED status."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED, "Test error")
# Act
found_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)
# Assert
assert len(found_jobs) == 1
assert found_jobs[0].id == created_job.id
assert found_jobs[0].status == ProcessingStatus.FAILED
assert found_jobs[0].error_message == "Test error"
def test_i_can_find_empty_jobs_list_for_unused_status(self, in_memory_repository):
"""Test that unused status returns empty list."""
# Act
found_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)
# Assert
assert found_jobs == []
def test_i_cannot_find_jobs_by_status_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during finding jobs by status."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)
assert "get_jobs_by_status" in str(exc_info.value)
class TestJobRepositoryStatusUpdate:
"""Tests for job status update functionality."""
def test_i_can_update_job_status_to_processing(self, in_memory_repository, sample_document_id):
"""Test updating job status to PROCESSING with started_at timestamp."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
# Act
updated_job = in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.PROCESSING
assert updated_job.started_at is not None
assert updated_job.completed_at is None
assert updated_job.error_message is None
def test_i_can_update_job_status_to_completed(self, in_memory_repository, sample_document_id):
"""Test updating job status to COMPLETED with completed_at timestamp."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
in_memory_repository.update_job_status(created_job.id, ProcessingStatus.PROCESSING)
# Act
updated_job = in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.COMPLETED
assert updated_job.started_at is not None
assert updated_job.completed_at is not None
assert updated_job.error_message is None
def test_i_can_update_job_status_to_failed_with_error(self, in_memory_repository, sample_document_id):
"""Test updating job status to FAILED with error message and completed_at timestamp."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
error_message = "Processing failed due to invalid format"
# Act
updated_job = in_memory_repository.update_job_status(
created_job.id, ProcessingStatus.FAILED, error_message
)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.completed_at is not None
assert updated_job.error_message == error_message
def test_i_can_update_job_status_to_failed_without_error(self, in_memory_repository, sample_document_id):
"""Test updating job status to FAILED without error message."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
# Act
updated_job = in_memory_repository.update_job_status(created_job.id, ProcessingStatus.FAILED)
# Assert
assert updated_job is not None
assert updated_job.id == created_job.id
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.completed_at is not None
assert updated_job.error_message is None
def test_i_cannot_update_nonexistent_job_status(self, in_memory_repository):
"""Test that updating nonexistent job returns None."""
# Arrange
nonexistent_id = ObjectId()
# Act
result = in_memory_repository.update_job_status(nonexistent_id, ProcessingStatus.COMPLETED)
# Assert
assert result is None
def test_i_cannot_update_job_status_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job status update."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.update_job_status(created_job.id, ProcessingStatus.COMPLETED)
assert "update_job_status" in str(exc_info.value)
class TestJobRepositoryDeletion:
"""Tests for job deletion functionality."""
def test_i_can_delete_existing_job(self, in_memory_repository, sample_document_id):
"""Test successful job deletion."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
# Act
deletion_result = in_memory_repository.delete_job(created_job.id)
# Assert
assert deletion_result is True
# Verify job is actually deleted
found_job = in_memory_repository.find_job_by_id(created_job.id)
assert found_job is None
def test_i_cannot_delete_nonexistent_job(self, in_memory_repository):
"""Test that deleting nonexistent job returns False."""
# Arrange
nonexistent_id = ObjectId()
# Act
result = in_memory_repository.delete_job(nonexistent_id)
# Assert
assert result is False
def test_i_cannot_delete_job_with_pymongo_error(self, in_memory_repository, sample_document_id, mocker):
"""Test handling of PyMongo errors during job deletion."""
# Arrange
created_job = in_memory_repository.create_job(sample_document_id)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(JobRepositoryError) as exc_info:
in_memory_repository.delete_job(created_job.id)
assert "delete_job" in str(exc_info.value)
class TestJobRepositoryComplexScenarios:
"""Tests for complex job repository scenarios."""
def test_i_can_handle_complete_job_lifecycle(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test complete job lifecycle from creation to completion."""
# Create job
job = in_memory_repository.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
assert job.started_at is None
assert job.completed_at is None
# Start processing
job = in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)
assert job.status == ProcessingStatus.PROCESSING
assert job.started_at is not None
assert job.completed_at is None
# Complete job
job = in_memory_repository.update_job_status(job.id, ProcessingStatus.COMPLETED)
assert job.status == ProcessingStatus.COMPLETED
assert job.started_at is not None
assert job.completed_at is not None
assert job.error_message is None
def test_i_can_handle_job_failure_scenario(self, in_memory_repository, sample_document_id, sample_task_id):
"""Test job failure scenario with error message."""
# Create and start job
job = in_memory_repository.create_job(sample_document_id, sample_task_id)
job = in_memory_repository.update_job_status(job.id, ProcessingStatus.PROCESSING)
# Fail job with error
error_msg = "File format not supported"
job = in_memory_repository.update_job_status(job.id, ProcessingStatus.FAILED, error_msg)
# Assert failure state
assert job.status == ProcessingStatus.FAILED
assert job.started_at is not None
assert job.completed_at is not None
assert job.error_message == error_msg
def test_i_can_handle_multiple_documents_with_different_statuses(self, in_memory_repository):
"""Test managing multiple jobs for different documents with various statuses."""
# Create jobs for different documents
doc1 = PyObjectId()
doc2 = PyObjectId()
doc3 = PyObjectId()
job1 = in_memory_repository.create_job(doc1, "task-1")
job2 = in_memory_repository.create_job(doc2, "task-2")
job3 = in_memory_repository.create_job(doc3, "task-3")
# Update to different statuses
in_memory_repository.update_job_status(job1.id, ProcessingStatus.PROCESSING)
in_memory_repository.update_job_status(job2.id, ProcessingStatus.COMPLETED)
in_memory_repository.update_job_status(job3.id, ProcessingStatus.FAILED, "Error occurred")
# Verify status queries
pending_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.PENDING)
processing_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.PROCESSING)
completed_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.COMPLETED)
failed_jobs = in_memory_repository.get_jobs_by_status(ProcessingStatus.FAILED)
assert len(pending_jobs) == 0
assert len(processing_jobs) == 1
assert len(completed_jobs) == 1
assert len(failed_jobs) == 1
assert processing_jobs[0].id == job1.id
assert completed_jobs[0].id == job2.id
assert failed_jobs[0].id == job3.id

View File

@@ -1,29 +1,26 @@
"""
Test suite for UserRepository with async/await support.
Test suite for UserRepository with async/support.
This module contains comprehensive tests for all UserRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from datetime import datetime
import pytest_asyncio
from bson import ObjectId
from mongomock.mongo_client import MongoClient
from pymongo.errors import DuplicateKeyError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate, UserUpdate, UserInDB
from app.models.user import UserCreate, UserUpdate
@pytest_asyncio.fixture
async def in_memory_repository():
@pytest.fixture
def in_memory_repository():
"""Create an in-memory UserRepository for testing."""
client = AsyncMongoMockClient()
client = MongoClient()
db = client.test_database
repo = UserRepository(db)
await repo.initialize()
repo.initialize()
return repo
@@ -51,11 +48,10 @@ def sample_user_update():
class TestUserRepositoryCreation:
"""Tests for user creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_user(self, in_memory_repository, sample_user_create):
def test_i_can_create_user(self, in_memory_repository, sample_user_create):
"""Test successful user creation."""
# Act
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
# Assert
assert created_user is not None
@@ -68,15 +64,14 @@ class TestUserRepositoryCreation:
assert created_user.updated_at is not None
assert created_user.hashed_password != sample_user_create.password # Should be hashed
@pytest.mark.asyncio
async def test_i_cannot_create_user_with_duplicate_username(self, in_memory_repository, sample_user_create):
def test_i_cannot_create_user_with_duplicate_username(self, in_memory_repository, sample_user_create):
"""Test that creating user with duplicate username raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
in_memory_repository.create_user(sample_user_create)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_user(sample_user_create)
in_memory_repository.create_user(sample_user_create)
assert "already exists" in str(exc_info.value)
@@ -84,14 +79,13 @@ class TestUserRepositoryCreation:
class TestUserRepositoryFinding:
"""Tests for user finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_user_by_id(self, in_memory_repository, sample_user_create):
def test_i_can_find_user_by_id(self, in_memory_repository, sample_user_create):
"""Test finding user by valid ID."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
found_user = in_memory_repository.find_user_by_id(str(created_user.id))
# Assert
assert found_user is not None
@@ -99,69 +93,63 @@ class TestUserRepositoryFinding:
assert found_user.username == created_user.username
assert found_user.email == created_user.email
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_invalid_id(self, in_memory_repository):
def test_i_cannot_find_user_by_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_user = await in_memory_repository.find_user_by_id("invalid_id")
found_user = in_memory_repository.find_user_by_id("invalid_id")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_id(self, in_memory_repository):
def test_i_cannot_find_user_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_user = await in_memory_repository.find_user_by_id(nonexistent_id)
found_user = in_memory_repository.find_user_by_id(nonexistent_id)
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_username(self, in_memory_repository, sample_user_create):
def test_i_can_find_user_by_username(self, in_memory_repository, sample_user_create):
"""Test finding user by username."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_username(sample_user_create.username)
found_user = in_memory_repository.find_user_by_username(sample_user_create.username)
# Assert
assert found_user is not None
assert found_user.username == created_user.username
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_username(self, in_memory_repository):
def test_i_cannot_find_user_by_nonexistent_username(self, in_memory_repository):
"""Test that nonexistent username returns None."""
# Act
found_user = await in_memory_repository.find_user_by_username("nonexistent")
found_user = in_memory_repository.find_user_by_username("nonexistent")
# Assert
assert found_user is None
@pytest.mark.asyncio
async def test_i_can_find_user_by_email(self, in_memory_repository, sample_user_create):
def test_i_can_find_user_by_email(self, in_memory_repository, sample_user_create):
"""Test finding user by email."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
# Act
found_user = await in_memory_repository.find_user_by_email(str(sample_user_create.email))
found_user = in_memory_repository.find_user_by_email(str(sample_user_create.email))
# Assert
assert found_user is not None
assert found_user.email == created_user.email
assert found_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_find_user_by_nonexistent_email(self, in_memory_repository):
def test_i_cannot_find_user_by_nonexistent_email(self, in_memory_repository):
"""Test that nonexistent email returns None."""
# Act
found_user = await in_memory_repository.find_user_by_email("nonexistent@example.com")
found_user = in_memory_repository.find_user_by_email("nonexistent@example.com")
# Assert
assert found_user is None
@@ -170,15 +158,14 @@ class TestUserRepositoryFinding:
class TestUserRepositoryUpdate:
"""Tests for user update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_user(self, in_memory_repository, sample_user_create, sample_user_update):
def test_i_can_update_user(self, in_memory_repository, sample_user_create, sample_user_update):
"""Test successful user update."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
original_updated_at = created_user.updated_at
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), sample_user_update)
updated_user = in_memory_repository.update_user(str(created_user.id), sample_user_update)
# Assert
assert updated_user is not None
@@ -187,24 +174,22 @@ class TestUserRepositoryUpdate:
assert updated_user.role == sample_user_update.role
assert updated_user.id == created_user.id
@pytest.mark.asyncio
async def test_i_cannot_update_user_with_invalid_id(self, in_memory_repository, sample_user_update):
def test_i_cannot_update_user_with_invalid_id(self, in_memory_repository, sample_user_update):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_user("invalid_id", sample_user_update)
result = in_memory_repository.update_user("invalid_id", sample_user_update)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_can_update_user_with_partial_data(self, in_memory_repository, sample_user_create):
def test_i_can_update_user_with_partial_data(self, in_memory_repository, sample_user_create):
"""Test updating user with partial data."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
partial_update = UserUpdate(username="newusername")
# Act
updated_user = await in_memory_repository.update_user(str(created_user.id), partial_update)
updated_user = in_memory_repository.update_user(str(created_user.id), partial_update)
# Assert
assert updated_user is not None
@@ -212,15 +197,14 @@ class TestUserRepositoryUpdate:
assert updated_user.email == created_user.email # Should remain unchanged
assert updated_user.role == created_user.role # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_user_with_empty_data(self, in_memory_repository, sample_user_create):
def test_i_can_update_user_with_empty_data(self, in_memory_repository, sample_user_create):
"""Test updating user with empty data returns current user."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
empty_update = UserUpdate()
# Act
result = await in_memory_repository.update_user(str(created_user.id), empty_update)
result = in_memory_repository.update_user(str(created_user.id), empty_update)
# Assert
assert result is not None
@@ -231,39 +215,36 @@ class TestUserRepositoryUpdate:
class TestUserRepositoryDeletion:
"""Tests for user deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_user(self, in_memory_repository, sample_user_create):
def test_i_can_delete_user(self, in_memory_repository, sample_user_create):
"""Test successful user deletion."""
# Arrange
created_user = await in_memory_repository.create_user(sample_user_create)
created_user = in_memory_repository.create_user(sample_user_create)
# Act
deletion_result = await in_memory_repository.delete_user(str(created_user.id))
deletion_result = in_memory_repository.delete_user(str(created_user.id))
# Assert
assert deletion_result is True
# Verify user is actually deleted
found_user = await in_memory_repository.find_user_by_id(str(created_user.id))
found_user = in_memory_repository.find_user_by_id(str(created_user.id))
assert found_user is None
@pytest.mark.asyncio
async def test_i_cannot_delete_user_with_invalid_id(self, in_memory_repository):
def test_i_cannot_delete_user_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_user("invalid_id")
result = in_memory_repository.delete_user("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_user(self, in_memory_repository):
def test_i_cannot_delete_nonexistent_user(self, in_memory_repository):
"""Test that deleting nonexistent user returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_user(nonexistent_id)
result = in_memory_repository.delete_user(nonexistent_id)
# Assert
assert result is False
@@ -272,30 +253,27 @@ class TestUserRepositoryDeletion:
class TestUserRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_count_users(self, in_memory_repository, sample_user_create):
def test_i_can_count_users(self, in_memory_repository, sample_user_create):
"""Test counting users."""
# Arrange
initial_count = await in_memory_repository.count_users()
await in_memory_repository.create_user(sample_user_create)
initial_count = in_memory_repository.count_users()
in_memory_repository.create_user(sample_user_create)
# Act
final_count = await in_memory_repository.count_users()
final_count = in_memory_repository.count_users()
# Assert
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_i_can_check_user_exists(self, in_memory_repository, sample_user_create):
def test_i_can_check_user_exists(self, in_memory_repository, sample_user_create):
"""Test checking if user exists."""
# Arrange
await in_memory_repository.create_user(sample_user_create)
in_memory_repository.create_user(sample_user_create)
# Act
exists = await in_memory_repository.user_exists(sample_user_create.username)
not_exists = await in_memory_repository.user_exists("nonexistent")
exists = in_memory_repository.user_exists(sample_user_create.username)
not_exists = in_memory_repository.user_exists("nonexistent")
# Assert
assert exists is True
assert not_exists is False

View File

View File

@@ -0,0 +1,570 @@
"""
Unit tests for DocumentService using in-memory MongoDB.
Tests the orchestration logic with real MongoDB operations
using mongomock for better integration testing.
"""
import os
from datetime import datetime
from unittest.mock import patch
import pytest
import pytest_asyncio
from bson import ObjectId
from mongomock.mongo_client import MongoClient
from app.models.document import FileType
from app.services.document_service import DocumentService
@pytest.fixture(autouse=True)
def cleanup_test_folder():
"""Clean up test folder."""
import shutil
shutil.rmtree("test_folder", ignore_errors=True)
@pytest.fixture
def in_memory_database():
"""Create an in-memory database for testing."""
client = MongoClient()
return client.test_database
@pytest_asyncio.fixture
def document_service(in_memory_database):
"""Create DocumentService with in-memory repositories."""
service = DocumentService(in_memory_database, objects_folder="test_folder")
return service
@pytest.fixture
def sample_file_bytes():
"""Sample file content as bytes."""
return b"This is a test PDF content"
@pytest.fixture
def sample_text_bytes():
"""Sample text file content as bytes."""
return b"This is a test text file content"
@pytest.fixture
def sample_file_hash():
"""Expected SHA256 hash for sample file bytes."""
import hashlib
return hashlib.sha256(b"This is a test PDF content").hexdigest()
def validate_file_saved(document_service, file_hash, file_bytes):
# Verify file is saved to disk
target_file_path = os.path.join(document_service.objects_folder, file_hash[:24], file_hash)
assert os.path.exists(target_file_path)
with open(target_file_path, "rb") as f:
content = f.read()
assert content == file_bytes
class TestCreateDocument:
"""Tests for create_document method."""
@patch('app.services.document_service.magic.from_buffer')
@patch('app.services.document_service.datetime')
def test_i_can_create_document_with_new_content(
self,
mock_datetime,
mock_magic,
document_service,
sample_file_bytes
):
"""Test creating document when content doesn't exist yet."""
# Setup mocks
fixed_time = datetime(2025, 1, 1, 10, 30, 0)
mock_datetime.now.return_value = fixed_time
mock_magic.return_value = "application/pdf"
# Execute
result = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Verify document creation
assert result is not None
assert result.filename == "test.pdf"
assert result.filepath == "/test/test.pdf"
assert result.file_type == FileType.PDF
assert result.detected_at == fixed_time
assert result.file_hash == document_service._calculate_file_hash(sample_file_bytes)
# Verify document created in database
doc_in_db = document_service.document_repository.find_document_by_id(result.id)
assert doc_in_db is not None
assert doc_in_db.id == result.id
assert doc_in_db.filename == result.filename
assert doc_in_db.filepath == result.filepath
assert doc_in_db.file_type == result.file_type
assert doc_in_db.detected_at == fixed_time
assert doc_in_db.file_hash == result.file_hash
# Verify file is saved to disk
validate_file_saved(document_service, result.file_hash, sample_file_bytes)
@patch('app.services.document_service.magic.from_buffer')
@patch('app.services.document_service.datetime')
def test_i_can_create_document_with_existing_content(
self,
mock_datetime,
mock_magic,
document_service,
sample_file_bytes
):
"""Test creating document when content already exists (deduplication)."""
# Setup mocks
fixed_time = datetime(2025, 1, 1, 10, 30, 0)
mock_datetime.now.return_value = fixed_time
mock_magic.return_value = "application/pdf"
# Create first document
first_doc = document_service.create_document(
"/test/first.pdf",
sample_file_bytes,
"utf-8"
)
# Create second document with same content
second_doc = document_service.create_document(
"/test/second.pdf",
sample_file_bytes,
"utf-8"
)
# Verify both documents exist but share same hash
assert first_doc.file_hash == second_doc.file_hash
assert first_doc.filename != second_doc.filename
assert first_doc.filepath != second_doc.filepath
def test_i_cannot_create_document_with_unsupported_file_type(
self,
document_service,
sample_file_bytes
):
"""Test that unsupported file types raise ValueError."""
with pytest.raises(ValueError, match="Unsupported file type"):
document_service.create_document(
"/test/test.xyz", # Unsupported extension
sample_file_bytes,
"utf-8"
)
def test_i_cannot_create_document_with_empty_file_path(
self,
document_service,
sample_file_bytes
):
"""Test that empty file path raises ValueError."""
with pytest.raises(ValueError):
document_service.create_document(
"", # Empty path
sample_file_bytes,
"utf-8"
)
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_create_document_with_empty_bytes(
self,
mock_magic,
document_service
):
"""Test behavior with empty file bytes."""
# Setup
mock_magic.return_value = "text/plain"
# Execute with empty bytes
result = document_service.create_document(
"/test/empty.txt",
b"", # Empty bytes
"utf-8"
)
# Verify file is saved to disk
validate_file_saved(document_service, result.file_hash, b"")
class TestGetMethods:
"""Tests for document retrieval methods."""
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_get_document_by_id(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by ID."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = document_service.get_document_by_id(created_doc.id)
# Verify
assert result is not None
assert result.id == created_doc.id
assert result.filename == created_doc.filename
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_get_document_by_hash(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by file hash."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = document_service.get_document_by_hash(created_doc.file_hash)
# Verify
assert result is not None
assert result.file_hash == created_doc.file_hash
assert result.filename == created_doc.filename
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_get_document_by_filepath(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by file path."""
# Setup
mock_magic.return_value = "application/pdf"
test_path = "/test/unique_test.pdf"
# Create a document first
created_doc = document_service.create_document(
test_path,
sample_file_bytes,
"utf-8"
)
# Execute
result = document_service.get_document_by_filepath(test_path)
# Verify
assert result is not None
assert result.filepath == test_path
assert result.id == created_doc.id
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_get_document_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document with associated content."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = document_service.get_document_content_by_hash(created_doc.file_hash)
# Verify
assert result == sample_file_bytes
def test_i_cannot_get_nonexistent_document_by_id(
self,
document_service
):
"""Test that nonexistent document returns None."""
# Execute with random ObjectId
result = document_service.get_document_by_id(ObjectId())
# Verify
assert result is None
def test_i_cannot_get_nonexistent_document_by_hash(
self,
document_service
):
"""Test that nonexistent document hash returns None."""
# Execute
result = document_service.get_document_by_hash("nonexistent_hash")
# Verify
assert result is None
class TestPaginationAndCounting:
"""Tests for document listing and counting."""
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_list_documents_with_pagination(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test document listing with pagination parameters."""
# Setup
mock_magic.return_value = "application/pdf"
# Create multiple documents
for i in range(5):
document_service.create_document(
f"/test/test{i}.pdf",
sample_file_bytes + bytes(str(i), 'utf-8'), # Make each file unique
"utf-8"
)
# Execute with pagination
result = document_service.list_documents(skip=1, limit=2)
# Verify
assert len(result) == 2
# Test counting
total_count = document_service.count_documents()
assert total_count == 5
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_count_documents(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test document counting."""
# Setup
mock_magic.return_value = "text/plain"
# Initially should be 0
initial_count = document_service.count_documents()
assert initial_count == 0
# Create some documents
for i in range(3):
document_service.create_document(
f"/test/test{i}.txt",
sample_file_bytes + bytes(str(i), 'utf-8'),
"utf-8"
)
# Execute
final_count = document_service.count_documents()
# Verify
assert final_count == 3
class TestUpdateAndDelete:
"""Tests for document update and deletion operations."""
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_update_document_metadata(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test updating document metadata."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute update
update_data = {"metadata": {"page_count": 5}}
result = document_service.update_document(created_doc.id, update_data)
# Verify
assert result is not None
assert result.metadata.get("page_count") == 5
assert result.filename == created_doc.filename
assert result.filepath == created_doc.filepath
assert result.file_hash == created_doc.file_hash
assert result.file_type == created_doc.file_type
assert result.metadata == update_data['metadata']
def test_i_can_update_document_content(
self,
document_service,
sample_file_bytes
):
# Create a document first
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute update
update_data = {"file_bytes": b"this is an updated file content"}
result = document_service.update_document(created_doc.id, update_data)
assert result.filename == created_doc.filename
assert result.filepath == created_doc.filepath
assert result.file_hash != created_doc.file_hash
assert result.file_type == created_doc.file_type
assert result.metadata == created_doc.metadata
# Verify file is saved to disk
validate_file_saved(document_service, result.file_hash, b"this is an updated file content")
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_delete_document_and_orphaned_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test deleting document with orphaned content cleanup."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document
created_doc = document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Verify content exists
validate_file_saved(document_service, created_doc.file_hash, sample_file_bytes)
# Execute deletion
result = document_service.delete_document(created_doc.id)
# Verify document and content are deleted
assert result is True
deleted_doc = document_service.get_document_by_id(created_doc.id)
assert deleted_doc is None
# validate content is deleted
file_hash = created_doc.file_hash[:24]
target_file_path = os.path.join(document_service.objects_folder, file_hash[:24], file_hash)
assert not os.path.exists(target_file_path)
@patch('app.services.document_service.magic.from_buffer')
def test_i_can_delete_document_without_affecting_shared_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test deleting document without removing shared content."""
# Setup
mock_magic.return_value = "application/pdf"
# Create two documents with same content
doc1 = document_service.create_document(
"/test/test1.pdf",
sample_file_bytes,
"utf-8"
)
doc2 = document_service.create_document(
"/test/test2.pdf",
sample_file_bytes,
"utf-8"
)
# They should share the same hash
assert doc1.file_hash == doc2.file_hash
# Delete first document
result = document_service.delete_document(doc1.id)
assert result is True
# Verify first document is deleted but content still exists
deleted_doc = document_service.get_document_by_id(doc1.id)
assert deleted_doc is None
remaining_doc = document_service.get_document_by_id(doc2.id)
assert remaining_doc is not None
validate_file_saved(document_service, doc2.file_hash, sample_file_bytes)
class TestHashCalculation:
"""Tests for file hash calculation utility."""
def test_i_can_calculate_consistent_file_hash(self, document_service):
"""Test that file hash calculation is consistent."""
test_bytes = b"Test content for hashing"
# Calculate hash multiple times
hash1 = document_service._calculate_file_hash(test_bytes)
hash2 = document_service._calculate_file_hash(test_bytes)
# Should be identical
assert hash1 == hash2
assert len(hash1) == 64 # SHA256 produces 64-character hex string
def test_i_get_different_hashes_for_different_content(self, document_service):
"""Test that different content produces different hashes."""
content1 = b"First content"
content2 = b"Second content"
hash1 = document_service._calculate_file_hash(content1)
hash2 = document_service._calculate_file_hash(content2)
assert hash1 != hash2
class TestFileTypeDetection:
"""Tests for file type detection."""
def test_i_can_detect_pdf_file_type(self, document_service):
"""Test PDF file type detection."""
file_type = document_service._detect_file_type("/path/to/document.pdf")
assert file_type == FileType.PDF
def test_i_can_detect_txt_file_type(self, document_service):
"""Test text file type detection."""
file_type = document_service._detect_file_type("/path/to/document.txt")
assert file_type == FileType.TXT
def test_i_can_detect_docx_file_type(self, document_service):
"""Test DOCX file type detection."""
file_type = document_service._detect_file_type("/path/to/document.docx")
assert file_type == FileType.DOCX
def test_i_cannot_detect_unsupported_file_type(self, document_service):
"""Test unsupported file type raises ValueError."""
with pytest.raises(ValueError, match="Unsupported file type"):
document_service._detect_file_type("/path/to/document.xyz")

View File

@@ -0,0 +1,518 @@
"""
Unit tests for JobService using in-memory MongoDB.
Tests the business logic operations with real MongoDB operations
using mongomock for better integration testing.
"""
import pytest
from bson import ObjectId
from mongomock.mongo_client import MongoClient
from app.exceptions.job_exceptions import InvalidStatusTransitionError
from app.models.job import ProcessingStatus
from app.models.types import PyObjectId
from app.services.job_service import JobService
@pytest.fixture
def in_memory_database():
"""Create an in-memory database for testing."""
client = MongoClient()
return client.test_database
@pytest.fixture
def job_service(in_memory_database):
"""Create JobService with in-memory repositories."""
service = JobService(in_memory_database).initialize()
return service
@pytest.fixture
def sample_document_id():
"""Sample file ObjectId."""
return PyObjectId()
@pytest.fixture
def sample_task_id():
"""Sample Celery task UUID."""
return "550e8400-e29b-41d4-a716-446655440000"
class TestCreateJob:
"""Tests for create_job method."""
def test_i_can_create_job_with_task_id(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test creating job with task ID."""
# Execute
result = job_service.create_job(sample_document_id, sample_task_id)
# Verify job creation
assert result is not None
assert result.document_id == sample_document_id
assert result.task_id == sample_task_id
assert result.status == ProcessingStatus.PENDING
assert result.created_at is not None
assert result.started_at is None
assert result.error_message is None
# Verify job exists in database
job_in_db = job_service.get_job_by_id(result.id)
assert job_in_db is not None
assert job_in_db.id == result.id
assert job_in_db.document_id == sample_document_id
assert job_in_db.task_id == sample_task_id
assert job_in_db.status == ProcessingStatus.PENDING
def test_i_can_create_job_without_task_id(
self,
job_service,
sample_document_id
):
"""Test creating job without task ID."""
# Execute
result = job_service.create_job(sample_document_id)
# Verify job creation
assert result is not None
assert result.document_id == sample_document_id
assert result.task_id is None
assert result.status == ProcessingStatus.PENDING
assert result.created_at is not None
assert result.started_at is None
assert result.error_message is None
class TestGetJobMethods:
"""Tests for job retrieval methods."""
def test_i_can_get_job_by_id(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test retrieving job by ID."""
# Create a job first
created_job = job_service.create_job(sample_document_id, sample_task_id)
# Execute
result = job_service.get_job_by_id(created_job.id)
# Verify
assert result is not None
assert result.id == created_job.id
assert result.document_id == created_job.document_id
assert result.task_id == created_job.task_id
assert result.status == created_job.status
def test_i_can_get_jobs_by_status(
self,
job_service,
sample_document_id
):
"""Test retrieving jobs by status."""
# Create jobs with different statuses
pending_job = job_service.create_job(sample_document_id, "pending-task")
processing_job = job_service.create_job(ObjectId(), "processing-task")
job_service.mark_job_as_started(processing_job.id)
completed_job = job_service.create_job(ObjectId(), "completed-task")
job_service.mark_job_as_started(completed_job.id)
job_service.mark_job_as_completed(completed_job.id)
# Execute - get pending jobs
pending_results = job_service.get_jobs_by_status(ProcessingStatus.PENDING)
# Verify
assert len(pending_results) == 1
assert pending_results[0].id == pending_job.id
assert pending_results[0].status == ProcessingStatus.PENDING
# Execute - get processing jobs
processing_results = job_service.get_jobs_by_status(ProcessingStatus.PROCESSING)
assert len(processing_results) == 1
assert processing_results[0].status == ProcessingStatus.PROCESSING
# Execute - get completed jobs
completed_results = job_service.get_jobs_by_status(ProcessingStatus.COMPLETED)
assert len(completed_results) == 1
assert completed_results[0].status == ProcessingStatus.COMPLETED
class TestUpdateStatus:
"""Tests for mark_job_as_started method."""
def test_i_can_mark_pending_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking pending job as started (PENDING → PROCESSING)."""
# Create a pending job
created_job = job_service.create_job(sample_document_id, sample_task_id)
assert created_job.status == ProcessingStatus.PENDING
# Execute
result = job_service.mark_job_as_started(created_job.id)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.PROCESSING
# Verify in database
updated_job = job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.PROCESSING
def test_i_cannot_mark_processing_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that processing job cannot be marked as started."""
# Create and start a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PROCESSING
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
def test_i_cannot_mark_completed_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as started."""
# Create, start, and complete a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_completed(created_job.id)
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
def test_i_cannot_mark_failed_job_as_started(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as started."""
# Create, start, and fail a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_failed(created_job.id, "Test error")
# Try to start it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_started(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.PROCESSING
def test_i_can_mark_processing_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as completed (PROCESSING → COMPLETED)."""
# Create and start a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
started_job = job_service.mark_job_as_started(created_job.id)
# Execute
result = job_service.mark_job_as_completed(created_job.id)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.COMPLETED
# Verify in database
updated_job = job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.COMPLETED
def test_i_cannot_mark_pending_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that pending job cannot be marked as completed."""
# Create a pending job
created_job = job_service.create_job(sample_document_id, sample_task_id)
# Try to complete it directly
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PENDING
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
def test_i_cannot_mark_completed_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as completed again."""
# Create, start, and complete a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_completed(created_job.id)
# Try to complete it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
def test_i_cannot_mark_failed_job_as_completed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as completed."""
# Create, start, and fail a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_failed(created_job.id, "Test error")
# Try to complete it
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_completed(created_job.id)
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.COMPLETED
def test_i_can_mark_processing_job_as_failed_with_error_message(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as failed with error message."""
# Create and start a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
started_job = job_service.mark_job_as_started(created_job.id)
error_message = "Processing failed due to invalid file format"
# Execute
result = job_service.mark_job_as_failed(created_job.id, error_message)
# Verify status transition
assert result is not None
assert result.id == created_job.id
assert result.status == ProcessingStatus.FAILED
assert result.error_message == error_message
# Verify in database
updated_job = job_service.get_job_by_id(created_job.id)
assert updated_job.status == ProcessingStatus.FAILED
assert updated_job.error_message == error_message
def test_i_can_mark_processing_job_as_failed_without_error_message(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test marking processing job as failed without error message."""
# Create and start a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
# Execute without error message
result = job_service.mark_job_as_failed(created_job.id)
# Verify status transition
assert result is not None
assert result.status == ProcessingStatus.FAILED
assert result.error_message is None
def test_i_cannot_mark_pending_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that pending job cannot be marked as failed."""
# Create a pending job
created_job = job_service.create_job(sample_document_id, sample_task_id)
# Try to fail it directly
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_failed(created_job.id, "Test error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.PENDING
assert exc_info.value.target_status == ProcessingStatus.FAILED
def test_i_cannot_mark_completed_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that completed job cannot be marked as failed."""
# Create, start, and complete a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_completed(created_job.id)
# Try to fail it
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_failed(created_job.id, "Test error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.COMPLETED
assert exc_info.value.target_status == ProcessingStatus.FAILED
def test_i_cannot_mark_failed_job_as_failed(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test that failed job cannot be marked as failed again."""
# Create, start, and fail a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
job_service.mark_job_as_started(created_job.id)
job_service.mark_job_as_failed(created_job.id, "First error")
# Try to fail it again
with pytest.raises(InvalidStatusTransitionError) as exc_info:
job_service.mark_job_as_failed(created_job.id, "Second error")
# Verify exception details
assert exc_info.value.current_status == ProcessingStatus.FAILED
assert exc_info.value.target_status == ProcessingStatus.FAILED
class TestDeleteJob:
"""Tests for delete_job method."""
def test_i_can_delete_existing_job(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test deleting an existing job."""
# Create a job
created_job = job_service.create_job(sample_document_id, sample_task_id)
# Verify job exists
job_before_delete = job_service.get_job_by_id(created_job.id)
assert job_before_delete is not None
# Execute deletion
result = job_service.delete_job(created_job.id)
# Verify deletion
assert result is True
# Verify job no longer exists
deleted_job = job_service.get_job_by_id(created_job.id)
assert deleted_job is None
def test_i_cannot_delete_nonexistent_job(
self,
job_service
):
"""Test deleting a nonexistent job returns False."""
# Execute deletion with random ObjectId
result = job_service.delete_job(ObjectId())
# Verify
assert result is False
class TestStatusTransitionValidation:
"""Tests for status transition validation across different scenarios."""
def test_valid_job_lifecycle_flow(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test complete valid job lifecycle: PENDING → PROCESSING → COMPLETED."""
# Create job (PENDING)
job = job_service.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
# Start job (PENDING → PROCESSING)
started_job = job_service.mark_job_as_started(job.id)
assert started_job.status == ProcessingStatus.PROCESSING
# Complete job (PROCESSING → COMPLETED)
completed_job = job_service.mark_job_as_completed(job.id)
assert completed_job.status == ProcessingStatus.COMPLETED
def test_valid_job_failure_flow(
self,
job_service,
sample_document_id,
sample_task_id
):
"""Test valid job failure: PENDING → PROCESSING → FAILED."""
# Create job (PENDING)
job = job_service.create_job(sample_document_id, sample_task_id)
assert job.status == ProcessingStatus.PENDING
# Start job (PENDING → PROCESSING)
started_job = job_service.mark_job_as_started(job.id)
assert started_job.status == ProcessingStatus.PROCESSING
# Fail job (PROCESSING → FAILED)
failed_job = job_service.mark_job_as_failed(job.id, "Test failure")
assert failed_job.status == ProcessingStatus.FAILED
assert failed_job.error_message == "Test failure"
def test_job_operations_with_empty_database(
self,
job_service
):
"""Test job operations when database is empty."""
# Try to get nonexistent job
result = job_service.get_job_by_id(ObjectId())
assert result is None
# Try to get jobs by status when none exist
pending_jobs = job_service.get_jobs_by_status(ProcessingStatus.PENDING)
assert pending_jobs == []
# Try to delete nonexistent job
delete_result = job_service.delete_job(ObjectId())
assert delete_result is False

View File

@@ -1,187 +0,0 @@
"""
Unit tests for MongoDB database connection module.
Tests the database connection functionality with mocking
to avoid requiring actual MongoDB instance during tests.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from app.database.connection import (
create_mongodb_client,
get_database,
close_database_connection,
get_mongodb_client,
test_database_connection
)
def test_i_can_get_database_connection():
"""Test successful database connection creation."""
mock_client = Mock()
mock_database = Mock()
# Configure the mock to support dictionary-like access
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
with patch('app.database.connection.get_mongodb_database_name', return_value="testdb"):
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
result = get_database()
assert result == mock_database
mock_client.admin.command.assert_called_with('ping')
# Verify that __getitem__ was called with the database name
mock_client.__getitem__.assert_called_with("testdb")
def test_i_cannot_connect_to_invalid_mongodb_url():
"""Test fail-fast behavior with invalid MongoDB URL."""
mock_client = Mock()
mock_client.admin.command.side_effect = ConnectionFailure("Connection failed")
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://invalid:27017"):
with pytest.raises(SystemExit) as exc_info:
create_mongodb_client()
assert exc_info.value.code == 1
def test_i_cannot_connect_with_server_selection_timeout():
"""Test fail-fast behavior with server selection timeout."""
mock_client = Mock()
mock_client.admin.command.side_effect = ServerSelectionTimeoutError("Timeout")
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://timeout:27017"):
with pytest.raises(SystemExit) as exc_info:
create_mongodb_client()
assert exc_info.value.code == 1
def test_i_cannot_connect_with_unexpected_error():
"""Test fail-fast behavior with unexpected connection error."""
with patch('app.database.connection.MongoClient', side_effect=Exception("Unexpected error")):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://error:27017"):
with pytest.raises(SystemExit) as exc_info:
create_mongodb_client()
assert exc_info.value.code == 1
def test_i_can_get_database_singleton():
"""Test that get_database returns the same instance (singleton pattern)."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
with patch('app.database.connection.get_mongodb_database_name', return_value="testdb"):
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
# First call
db1 = get_database()
# Second call
db2 = get_database()
assert db1 is db2
# MongoClient should be called only once
assert mock_client.admin.command.call_count == 1
def test_i_can_close_database_connection():
"""Test closing database connection."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
with patch('app.database.connection.get_mongodb_database_name', return_value="testdb"):
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
# Create connection
get_database()
# Close connection
close_database_connection()
mock_client.close.assert_called_once()
assert app.database.connection._client is None
assert app.database.connection._database is None
def test_i_can_get_mongodb_client():
"""Test getting raw MongoDB client instance."""
mock_client = Mock()
mock_database = Mock()
mock_client.__getitem__ = Mock(return_value=mock_database)
with patch('app.database.connection.MongoClient', return_value=mock_client):
with patch('app.database.connection.get_mongodb_url', return_value="mongodb://localhost:27017"):
with patch('app.database.connection.get_mongodb_database_name', return_value="testdb"):
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
# Create connection first
get_database()
# Get client
result = get_mongodb_client()
assert result == mock_client
def test_i_can_get_none_mongodb_client_when_not_connected():
"""Test getting MongoDB client returns None when not connected."""
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
result = get_mongodb_client()
assert result is None
def test_i_can_test_database_connection_success():
"""Test database connection health check - success case."""
mock_database = Mock()
mock_database.command.return_value = True
with patch('app.database.connection.get_database', return_value=mock_database):
result = test_database_connection()
assert result is True
mock_database.command.assert_called_with('ping')
def test_i_can_close_connection_when_no_client():
"""Test closing connection when no client exists (should not raise error)."""
# Reset global variables
import app.database.connection
app.database.connection._client = None
app.database.connection._database = None
# Should not raise any exception
close_database_connection()
assert app.database.connection._client is None
assert app.database.connection._database is None

View File

@@ -1,311 +0,0 @@
"""
Test suite for DocumentContentRepository with async/await support.
This module contains comprehensive tests for all DocumentContentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
import hashlib
from datetime import datetime
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.document_content_repository import DocumentContentRepository
from app.models.document import DocumentContent
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory DocumentContentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = DocumentContentRepository(db)
await repo.initialize()
return repo
@pytest.fixture
def sample_document_content():
"""Sample DocumentContent data for testing."""
content = "This is sample document content for testing purposes."
file_hash = hashlib.sha256(content.encode()).hexdigest()
return DocumentContent(
file_hash=file_hash,
content=content,
encoding="utf-8",
file_size=len(content.encode()),
mime_type="text/plain"
)
@pytest.fixture
def another_document_content():
"""Another sample DocumentContent data for testing."""
content = "This is another sample document with different content."
file_hash = hashlib.sha256(content.encode()).hexdigest()
return DocumentContent(
file_hash=file_hash,
content=content,
encoding="utf-8",
file_size=len(content.encode()),
mime_type="text/plain"
)
class TestDocumentContentRepositoryCreation:
"""Tests for document content creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_document_content(self, in_memory_repository, sample_document_content):
"""Test successful document content creation."""
# Act
created_content = await in_memory_repository.create_document_content(sample_document_content)
# Assert
assert created_content is not None
assert created_content.file_hash == sample_document_content.file_hash
assert created_content.content == sample_document_content.content
assert created_content.encoding == sample_document_content.encoding
assert created_content.file_size == sample_document_content.file_size
assert created_content.mime_type == sample_document_content.mime_type
assert created_content.id is not None
@pytest.mark.asyncio
async def test_i_cannot_create_document_content_with_duplicate_file_hash(self, in_memory_repository,
sample_document_content):
"""Test that creating document content with duplicate file_hash raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_document_content(sample_document_content)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_document_content(sample_document_content)
assert "already exists" in str(exc_info.value)
class TestDocumentContentRepositoryFinding:
"""Tests for document content finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_document_content_by_id(self, in_memory_repository, sample_document_content):
"""Test finding document content by valid ID."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
# Act
found_content = await in_memory_repository.find_document_content_by_id(str(created_content.id))
# Assert
assert found_content is not None
assert found_content.id == created_content.id
assert found_content.file_hash == created_content.file_hash
assert found_content.content == created_content.content
@pytest.mark.asyncio
async def test_i_cannot_find_document_content_by_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_content = await in_memory_repository.find_document_content_by_id("invalid_id")
# Assert
assert found_content is None
@pytest.mark.asyncio
async def test_i_cannot_find_document_content_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_content = await in_memory_repository.find_document_content_by_id(nonexistent_id)
# Assert
assert found_content is None
@pytest.mark.asyncio
async def test_i_can_find_document_content_by_file_hash(self, in_memory_repository, sample_document_content):
"""Test finding document content by file hash."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
# Act
found_content = await in_memory_repository.find_document_content_by_file_hash(sample_document_content.file_hash)
# Assert
assert found_content is not None
assert found_content.file_hash == created_content.file_hash
assert found_content.id == created_content.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_content_by_nonexistent_file_hash(self, in_memory_repository):
"""Test that nonexistent file hash returns None."""
# Act
found_content = await in_memory_repository.find_document_content_by_file_hash("nonexistent_hash")
# Assert
assert found_content is None
class TestDocumentContentRepositoryUpdate:
"""Tests for document content update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_document_content(self, in_memory_repository, sample_document_content):
"""Test successful document content update."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
update_data = {
"content": "Updated content for testing",
"encoding": "utf-16",
"mime_type": "text/html"
}
# Act
updated_content = await in_memory_repository.update_document_content(str(created_content.id), update_data)
# Assert
assert updated_content is not None
assert updated_content.content == update_data["content"]
assert updated_content.encoding == update_data["encoding"]
assert updated_content.mime_type == update_data["mime_type"]
assert updated_content.id == created_content.id
assert updated_content.file_hash == created_content.file_hash # Should remain unchanged
@pytest.mark.asyncio
async def test_i_cannot_update_document_content_with_invalid_id(self, in_memory_repository):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_document_content("invalid_id", {"content": "test"})
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_can_update_document_content_with_partial_data(self, in_memory_repository, sample_document_content):
"""Test updating document content with partial data."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
partial_update = {"encoding": "iso-8859-1"}
# Act
updated_content = await in_memory_repository.update_document_content(str(created_content.id), partial_update)
# Assert
assert updated_content is not None
assert updated_content.encoding == "iso-8859-1"
assert updated_content.content == created_content.content # Should remain unchanged
assert updated_content.mime_type == created_content.mime_type # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_document_content_with_empty_data(self, in_memory_repository, sample_document_content):
"""Test updating document content with empty data returns current content."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
empty_update = {}
# Act
result = await in_memory_repository.update_document_content(str(created_content.id), empty_update)
# Assert
assert result is not None
assert result.content == created_content.content
assert result.encoding == created_content.encoding
assert result.mime_type == created_content.mime_type
class TestDocumentContentRepositoryDeletion:
"""Tests for document content deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_document_content(self, in_memory_repository, sample_document_content):
"""Test successful document content deletion."""
# Arrange
created_content = await in_memory_repository.create_document_content(sample_document_content)
# Act
deletion_result = await in_memory_repository.delete_document_content(str(created_content.id))
# Assert
assert deletion_result is True
# Verify content is actually deleted
found_content = await in_memory_repository.find_document_content_by_id(str(created_content.id))
assert found_content is None
@pytest.mark.asyncio
async def test_i_cannot_delete_document_content_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_document_content("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_document_content(self, in_memory_repository):
"""Test that deleting nonexistent document content returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_document_content(nonexistent_id)
# Assert
assert result is False
class TestDocumentContentRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_check_content_exists(self, in_memory_repository, sample_document_content):
"""Test checking if document content exists by file hash."""
# Arrange
await in_memory_repository.create_document_content(sample_document_content)
# Act
exists = await in_memory_repository.content_exists(sample_document_content.file_hash)
not_exists = await in_memory_repository.content_exists("nonexistent_hash")
# Assert
assert exists is True
assert not_exists is False
@pytest.mark.asyncio
async def test_i_can_list_document_contents(self, in_memory_repository, sample_document_content,
another_document_content):
"""Test listing document contents with pagination."""
# Arrange
await in_memory_repository.create_document_content(sample_document_content)
await in_memory_repository.create_document_content(another_document_content)
# Act
all_contents = await in_memory_repository.list_document_contents()
limited_contents = await in_memory_repository.list_document_contents(skip=0, limit=1)
# Assert
assert len(all_contents) == 2
assert len(limited_contents) == 1
assert all(isinstance(content, DocumentContent) for content in all_contents)
@pytest.mark.asyncio
async def test_i_can_count_document_contents(self, in_memory_repository, sample_document_content,
another_document_content):
"""Test counting document contents."""
# Arrange
initial_count = await in_memory_repository.count_document_contents()
await in_memory_repository.create_document_content(sample_document_content)
await in_memory_repository.create_document_content(another_document_content)
# Act
final_count = await in_memory_repository.count_document_contents()
# Assert
assert final_count == initial_count + 2

View File

@@ -1,566 +0,0 @@
"""
Test suite for FileDocumentRepository with async/await support.
This module contains comprehensive tests for all FileDocumentRepository methods
using mongomock-motor for in-memory MongoDB testing.
"""
import pytest
from datetime import datetime
from typing import Dict, Any
import pytest_asyncio
from bson import ObjectId
from pymongo.errors import DuplicateKeyError, PyMongoError
from mongomock_motor import AsyncMongoMockClient
from app.database.repositories.document_repository import FileDocumentRepository
from app.models.document import FileDocument, FileType
@pytest_asyncio.fixture
async def in_memory_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository(db)
# repo.db = db
# repo.collection = db.files
await repo.initialize()
return repo
@pytest.fixture
def sample_file_document():
"""Sample FileDocument data for testing."""
return FileDocument(
filename="test_document.pdf",
filepath="/path/to/test_document.pdf",
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType("pdf"),
detected_at=datetime.now(),
)
@pytest.fixture
def sample_update_data():
"""Sample update data for testing."""
return {
"metadata": {"tags": ["updated", "document"]},
"file_type": FileType("txt"),
}
@pytest.fixture
def multiple_sample_documents():
"""Multiple FileDocument objects for list/search testing."""
base_time = datetime.now()
return [
FileDocument(
filename="document1.pdf",
filepath="/path/to/document1.pdf",
file_hash="hash1" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="similar_document.pdf",
filepath="/path/to/similar_document.pdf",
file_hash="hash2" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
),
FileDocument(
filename="completely_different.txt",
filepath="/path/to/completely_different.txt",
file_hash="hash3" + "0" * 58,
file_type=FileType("pdf"),
detected_at=base_time,
)
]
class TestFileDocumentRepositoryInitialization:
"""Tests for repository initialization."""
@pytest.mark.asyncio
async def test_i_can_initialize_repository(self):
"""Test repository initialization."""
# Arrange
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository(db)
await repo.initialize()
# Act & Assert (should not raise any exception)
assert repo.db is not None
assert repo.collection is not None
# TODO : check that the indexes are create
class TestFileDocumentRepositoryCreation:
"""Tests for file document creation functionality."""
@pytest.mark.asyncio
async def test_i_can_create_document(self, in_memory_repository, sample_file_document):
"""Test successful file document creation."""
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.filename == sample_file_document.filename
assert created_doc.filepath == sample_file_document.filepath
assert created_doc.file_hash == sample_file_document.file_hash
assert created_doc.file_type == sample_file_document.file_type
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_can_create_document_without_id(self, in_memory_repository, sample_file_document):
"""Test creating document with _id set to None (should be removed)."""
# Arrange
sample_file_document.id = None
# Act
created_doc = await in_memory_repository.create_document(sample_file_document)
# Assert
assert created_doc is not None
assert created_doc.id is not None
assert isinstance(created_doc.id, ObjectId)
@pytest.mark.asyncio
async def test_i_cannot_create_duplicate_document(self, in_memory_repository, sample_file_document):
"""Test that creating document with duplicate hash raises DuplicateKeyError."""
# Arrange
await in_memory_repository.create_document(sample_file_document)
duplicate_doc = FileDocument(
filename="different_name.pdf",
filepath=sample_file_document.filepath,
file_hash="different_hash" + "0" * 58,
file_type=FileType("pdf"),
detected_at=datetime.now()
)
# Act & Assert
with pytest.raises(DuplicateKeyError) as exc_info:
await in_memory_repository.create_document(duplicate_doc)
assert "already exists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document creation."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'insert_one', side_effect=PyMongoError("Database error"))
# Act & Assert
with pytest.raises(ValueError) as exc_info:
await in_memory_repository.create_document(sample_file_document)
assert "Failed to create file document" in str(exc_info.value)
class TestFileDocumentRepositoryFinding:
"""Tests for file document finding functionality."""
@pytest.mark.asyncio
async def test_i_can_find_document_by_valid_id(self, in_memory_repository, sample_file_document):
"""Test finding document by valid ObjectId."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
# Assert
assert found_doc is not None
assert found_doc.id == created_doc.id
assert found_doc.filename == created_doc.filename
assert found_doc.file_hash == created_doc.file_hash
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_invalid_id(self, in_memory_repository):
"""Test that invalid ObjectId returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_id("invalid_id")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_find_document_by_nonexistent_id(self, in_memory_repository):
"""Test that nonexistent but valid ObjectId returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
found_doc = await in_memory_repository.find_document_by_id(nonexistent_id)
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_hash(self, in_memory_repository, sample_file_document):
"""Test finding document by file hash."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_hash(sample_file_document.file_hash)
# Assert
assert found_doc is not None
assert found_doc.file_hash == created_doc.file_hash
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_hash(self, in_memory_repository):
"""Test that nonexistent hash returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_hash("nonexistent_hash")
# Assert
assert found_doc is None
@pytest.mark.asyncio
async def test_i_can_find_document_by_filepath(self, in_memory_repository, sample_file_document):
"""Test finding document by exact filepath."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
found_doc = await in_memory_repository.find_document_by_filepath(sample_file_document.filepath)
# Assert
assert found_doc is not None
assert found_doc.filepath == created_doc.filepath
assert found_doc.id == created_doc.id
@pytest.mark.asyncio
async def test_i_cannot_find_document_with_nonexistent_filepath(self, in_memory_repository):
"""Test that nonexistent filepath returns None."""
# Act
found_doc = await in_memory_repository.find_document_by_filepath("/nonexistent/path.pdf")
# Assert
assert found_doc is None
class TestFileDocumentRepositoryFuzzySearch:
"""Tests for fuzzy search functionality by filename."""
@pytest.mark.asyncio
async def test_i_can_find_documents_by_exact_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with exact filename match."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document1.pdf")
# Assert
assert len(found_docs) == 1
assert found_docs[0].filename == "document1.pdf"
@pytest.mark.asyncio
async def test_i_can_find_documents_by_fuzzy_name(self, in_memory_repository, multiple_sample_documents):
"""Test finding documents with fuzzy matching using default threshold."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
found_docs = await in_memory_repository.find_document_by_name("document")
# Assert
assert len(found_docs) >= 2 # Should find document1.pdf and similar_document.pdf
filenames = [doc.filename for doc in found_docs]
assert "document1.pdf" in filenames
assert "similar_document.pdf" in filenames
@pytest.mark.asyncio
async def test_i_cannot_find_documents_by_name_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during name search."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
found_docs = await in_memory_repository.find_document_by_name("test")
# Assert
assert found_docs == []
class TestFileDocumentRepositoryListing:
"""Tests for document listing functionality."""
@pytest.mark.asyncio
async def test_i_can_list_documents_with_default_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with default pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == len(multiple_sample_documents)
assert all(isinstance(doc, FileDocument) for doc in docs)
@pytest.mark.asyncio
async def test_i_can_list_documents_with_custom_pagination(self, in_memory_repository, multiple_sample_documents):
"""Test listing documents with custom pagination."""
# Arrange
for doc in multiple_sample_documents:
await in_memory_repository.create_document(doc)
# Act
docs_page1 = await in_memory_repository.list_documents(skip=0, limit=2)
docs_page2 = await in_memory_repository.list_documents(skip=2, limit=2)
# Assert
assert len(docs_page1) == 2
assert len(docs_page2) == 1 # Only 3 total documents
# Ensure no overlap between pages
page1_ids = [doc.id for doc in docs_page1]
page2_ids = [doc.id for doc in docs_page2]
assert len(set(page1_ids).intersection(set(page2_ids))) == 0
@pytest.mark.asyncio
async def test_i_can_list_documents_sorted_by_date(self, in_memory_repository, sample_file_document):
"""Test that documents are sorted by detected_at in descending order."""
# Arrange
from datetime import timedelta
# Create documents with different timestamps
doc1 = sample_file_document.model_copy()
doc1.filename = "oldest.pdf"
doc1.filepath = f"/path/to/{doc1.filename}"
doc1.file_hash = "hash1" + "0" * 58
doc1.detected_at = datetime.now() - timedelta(hours=2)
doc2 = sample_file_document.model_copy()
doc2.filename = "newest.pdf"
doc2.filepath = f"/path/to/{doc2.filename}"
doc2.file_hash = "hash2" + "0" * 58
doc2.detected_at = datetime.now()
await in_memory_repository.create_document(doc1)
await in_memory_repository.create_document(doc2)
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert len(docs) == 2
assert docs[0].filename == "newest.pdf" # Most recent first
assert docs[1].filename == "oldest.pdf"
@pytest.mark.asyncio
async def test_i_can_list_empty_documents(self, in_memory_repository):
"""Test listing documents from empty collection."""
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
@pytest.mark.asyncio
async def test_i_cannot_list_documents_with_pymongo_error(self, in_memory_repository, mocker):
"""Test handling of PyMongo errors during document listing."""
# Arrange
mocker.patch.object(in_memory_repository.collection, 'find', side_effect=PyMongoError("Database error"))
# Act
docs = await in_memory_repository.list_documents()
# Assert
assert docs == []
class TestFileDocumentRepositoryUpdate:
"""Tests for document update functionality."""
@pytest.mark.asyncio
async def test_i_can_update_document_successfully(self, in_memory_repository, sample_file_document,
sample_update_data):
"""Test successful document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert updated_doc is not None
assert updated_doc.file_type == sample_update_data["file_type"]
assert updated_doc.id == created_doc.id
assert updated_doc.filename == created_doc.filename # Unchanged fields remain
@pytest.mark.asyncio
async def test_i_can_update_document_with_partial_data(self, in_memory_repository, sample_file_document):
"""Test updating document with partial data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
partial_update = {"file_type": FileType("txt")}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), partial_update)
# Assert
assert updated_doc is not None
assert updated_doc.file_type == FileType("txt")
assert updated_doc.filename == created_doc.filename # Should remain unchanged
assert updated_doc.filepath == created_doc.filepath # Should remain unchanged
@pytest.mark.asyncio
async def test_i_can_update_document_filtering_none_values(self, in_memory_repository, sample_file_document):
"""Test that None values are filtered out from update data."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
update_with_none = {"metadata": {"tags": ["updated", "document"]}, "file_type": None}
# Act
updated_doc = await in_memory_repository.update_document(str(created_doc.id), update_with_none)
# Assert
assert updated_doc is not None
assert updated_doc.metadata == {"tags": ["updated", "document"]}
assert updated_doc.file_type == created_doc.file_type # Should remain unchanged (None filtered out)
@pytest.mark.asyncio
async def test_i_can_update_document_with_empty_data(self, in_memory_repository, sample_file_document):
"""Test updating document with empty data returns current document."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
empty_update = {}
# Act
result = await in_memory_repository.update_document(str(created_doc.id), empty_update)
# Assert
assert result is not None
assert result.filename == created_doc.filename
assert result.file_hash == created_doc.file_hash
assert result.metadata == created_doc.metadata
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_invalid_id(self, in_memory_repository, sample_update_data):
"""Test that updating with invalid ID returns None."""
# Act
result = await in_memory_repository.update_document("invalid_id", sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_nonexistent_document(self, in_memory_repository, sample_update_data):
"""Test that updating nonexistent document returns None."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.update_document(nonexistent_id, sample_update_data)
# Assert
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_update_document_with_pymongo_error(self, in_memory_repository, sample_file_document,
sample_update_data, mocker):
"""Test handling of PyMongo errors during document update."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'find_one_and_update',
side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.update_document(str(created_doc.id), sample_update_data)
# Assert
assert result is None
class TestFileDocumentRepositoryDeletion:
"""Tests for document deletion functionality."""
@pytest.mark.asyncio
async def test_i_can_delete_existing_document(self, in_memory_repository, sample_file_document):
"""Test successful document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
# Act
deletion_result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert deletion_result is True
# Verify document is actually deleted
found_doc = await in_memory_repository.find_document_by_id(str(created_doc.id))
assert found_doc is None
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_invalid_id(self, in_memory_repository):
"""Test that deleting with invalid ID returns False."""
# Act
result = await in_memory_repository.delete_document("invalid_id")
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_nonexistent_document(self, in_memory_repository):
"""Test that deleting nonexistent document returns False."""
# Arrange
nonexistent_id = str(ObjectId())
# Act
result = await in_memory_repository.delete_document(nonexistent_id)
# Assert
assert result is False
@pytest.mark.asyncio
async def test_i_cannot_delete_document_with_pymongo_error(self, in_memory_repository, sample_file_document, mocker):
"""Test handling of PyMongo errors during document deletion."""
# Arrange
created_doc = await in_memory_repository.create_document(sample_file_document)
mocker.patch.object(in_memory_repository.collection, 'delete_one', side_effect=PyMongoError("Database error"))
# Act
result = await in_memory_repository.delete_document(str(created_doc.id))
# Assert
assert result is False
class TestFileDocumentRepositoryUtilities:
"""Tests for utility methods."""
@pytest.mark.asyncio
async def test_i_can_count_documents(self, in_memory_repository, sample_file_document):
"""Test counting documents."""
# Arrange
initial_count = await in_memory_repository.count_documents()
await in_memory_repository.create_document(sample_file_document)
# Act
final_count = await in_memory_repository.count_documents()
# Assert
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_i_can_count_zero_documents(self, in_memory_repository):
"""Test counting documents in empty collection."""
# Act
count = await in_memory_repository.count_documents()
# Assert
assert count == 0

View File

@@ -1,697 +0,0 @@
"""
Unit tests for DocumentService using in-memory MongoDB.
Tests the orchestration logic with real MongoDB operations
using mongomock for better integration testing.
"""
import pytest
import pytest_asyncio
from unittest.mock import Mock, patch
from datetime import datetime
from bson import ObjectId
from pathlib import Path
from mongomock_motor import AsyncMongoMockClient
from app.services.document_service import DocumentService
from app.database.repositories.document_repository import FileDocumentRepository
from app.database.repositories.document_content_repository import DocumentContentRepository
from app.models.document import FileDocument, DocumentContent, FileType, ExtractionMethod
from app.models.types import PyObjectId
@pytest_asyncio.fixture
async def in_memory_file_repository():
"""Create an in-memory FileDocumentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = FileDocumentRepository(db)
await repo.initialize()
return repo
@pytest_asyncio.fixture
async def in_memory_content_repository():
"""Create an in-memory DocumentContentRepository for testing."""
client = AsyncMongoMockClient()
db = client.test_database
repo = DocumentContentRepository(db)
await repo.initialize()
return repo
@pytest_asyncio.fixture
async def in_memory_database():
"""Create an in-memory database for testing."""
client = AsyncMongoMockClient()
return client.test_database
@pytest_asyncio.fixture
async def document_service(in_memory_file_repository, in_memory_content_repository, in_memory_database):
"""Create DocumentService with in-memory repositories."""
with patch('app.services.document_service.get_database', return_value=in_memory_database):
service = DocumentService()
service.file_repository = in_memory_file_repository
service.content_repository = in_memory_content_repository
return service
@pytest.fixture
def sample_file_bytes():
"""Sample file content as bytes."""
return b"This is a test PDF content"
@pytest.fixture
def sample_text_bytes():
"""Sample text file content as bytes."""
return b"This is a test text file content"
@pytest.fixture
def sample_file_hash():
"""Expected SHA256 hash for sample file bytes."""
import hashlib
return hashlib.sha256(b"This is a test PDF content").hexdigest()
@pytest.fixture
def sample_file_document():
"""Sample FileDocument for testing."""
return FileDocument(
id=ObjectId(),
filename="test.pdf",
filepath="/test/test.pdf",
file_type=FileType.PDF,
extraction_method=None,
metadata={},
detected_at=datetime(2024, 1, 15, 10, 30, 0),
file_hash="test_hash"
)
class TestCreateDocument:
"""Tests for create_document method."""
@patch('app.services.document_service.magic.from_buffer')
@patch('app.services.document_service.datetime')
@pytest.mark.asyncio
async def test_i_can_create_document_with_new_content(
self,
mock_datetime,
mock_magic,
document_service,
sample_file_bytes
):
"""Test creating document when content doesn't exist yet."""
# Setup mocks
fixed_time = datetime(2024, 1, 15, 10, 30, 0)
mock_datetime.utcnow.return_value = fixed_time
mock_magic.return_value = "application/pdf"
# Execute
result = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Verify document creation
assert result is not None
assert result.filename == "test.pdf"
assert result.filepath == "/test/test.pdf"
assert result.file_type == FileType.PDF
assert result.detected_at == fixed_time
assert result.file_hash == document_service._calculate_file_hash(sample_file_bytes)
# Verify content was created
content = await document_service.content_repository.find_document_content_by_file_hash(
result.file_hash
)
assert content is not None
assert content.file_hash == result.file_hash
assert content.file_size == len(sample_file_bytes)
assert content.mime_type == "application/pdf"
assert content.encoding == "utf-8"
@patch('app.services.document_service.magic.from_buffer')
@patch('app.services.document_service.datetime')
@pytest.mark.asyncio
async def test_i_can_create_document_with_existing_content(
self,
mock_datetime,
mock_magic,
document_service,
sample_file_bytes
):
"""Test creating document when content already exists (deduplication)."""
# Setup mocks
fixed_time = datetime(2024, 1, 15, 10, 30, 0)
mock_datetime.utcnow.return_value = fixed_time
mock_magic.return_value = "application/pdf"
# Create first document
first_doc = await document_service.create_document(
"/test/first.pdf",
sample_file_bytes,
"utf-8"
)
# Create second document with same content
second_doc = await document_service.create_document(
"/test/second.pdf",
sample_file_bytes,
"utf-8"
)
# Verify both documents exist but share same hash
assert first_doc.file_hash == second_doc.file_hash
assert first_doc.filename != second_doc.filename
assert first_doc.filepath != second_doc.filepath
# Verify only one content document exists
all_content = await document_service.content_repository.list_document_content()
content_for_hash = [c for c in all_content if c.file_hash == first_doc.file_hash]
assert len(content_for_hash) == 1
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_create_document_with_different_encodings(
self,
mock_magic,
document_service,
sample_text_bytes
):
"""Test creating documents with different text encodings."""
# Setup
mock_magic.return_value = "text/plain"
# Test with different encodings
encodings = ["utf-8", "latin-1", "ascii"]
for i, encoding in enumerate(encodings):
result = await document_service.create_document(
f"/test/test{i}.txt",
sample_text_bytes,
encoding
)
# Verify document was created
assert result is not None
assert result.file_type == FileType.TXT
# Verify content has correct encoding
content = await document_service.content_repository.find_document_content_by_file_hash(
result.file_hash
)
assert content.encoding == encoding
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_unsupported_file_type(
self,
document_service,
sample_file_bytes
):
"""Test that unsupported file types raise ValueError."""
with pytest.raises(ValueError, match="Unsupported file type"):
await document_service.create_document(
"/test/test.xyz", # Unsupported extension
sample_file_bytes,
"utf-8"
)
@pytest.mark.asyncio
async def test_i_cannot_create_document_with_empty_file_path(
self,
document_service,
sample_file_bytes
):
"""Test that empty file path raises ValueError."""
with pytest.raises(ValueError):
await document_service.create_document(
"", # Empty path
sample_file_bytes,
"utf-8"
)
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_create_document_with_empty_bytes(
self,
mock_magic,
document_service
):
"""Test behavior with empty file bytes."""
# Setup
mock_magic.return_value = "text/plain"
# Execute with empty bytes
result = await document_service.create_document(
"/test/empty.txt",
b"", # Empty bytes
"utf-8"
)
# Should still work but with zero file size
assert result is not None
content = await document_service.content_repository.find_document_content_by_file_hash(
result.file_hash
)
assert content.file_size == 0
class TestGetMethods:
"""Tests for document retrieval methods."""
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_get_document_by_id(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by ID."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = await document_service.get_document_by_id(created_doc.id)
# Verify
assert result is not None
assert result.id == created_doc.id
assert result.filename == created_doc.filename
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_get_document_by_hash(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by file hash."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = await document_service.get_document_by_hash(created_doc.file_hash)
# Verify
assert result is not None
assert result.file_hash == created_doc.file_hash
assert result.filename == created_doc.filename
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_get_document_by_filepath(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document by file path."""
# Setup
mock_magic.return_value = "application/pdf"
test_path = "/test/unique_test.pdf"
# Create a document first
created_doc = await document_service.create_document(
test_path,
sample_file_bytes,
"utf-8"
)
# Execute
result = await document_service.get_document_by_filepath(test_path)
# Verify
assert result is not None
assert result.filepath == test_path
assert result.id == created_doc.id
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_get_document_with_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test retrieving document with associated content."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute
result = await document_service.get_document_with_content(created_doc.id)
# Verify
assert result is not None
document, content = result
assert document.id == created_doc.id
assert content is not None
assert content.file_hash == created_doc.file_hash
@pytest.mark.asyncio
async def test_i_cannot_get_nonexistent_document_by_id(
self,
document_service
):
"""Test that nonexistent document returns None."""
# Execute with random ObjectId
result = await document_service.get_document_by_id(ObjectId())
# Verify
assert result is None
@pytest.mark.asyncio
async def test_i_cannot_get_nonexistent_document_by_hash(
self,
document_service
):
"""Test that nonexistent document hash returns None."""
# Execute
result = await document_service.get_document_by_hash("nonexistent_hash")
# Verify
assert result is None
class TestPaginationAndCounting:
"""Tests for document listing and counting."""
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_list_documents_with_pagination(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test document listing with pagination parameters."""
# Setup
mock_magic.return_value = "application/pdf"
# Create multiple documents
for i in range(5):
await document_service.create_document(
f"/test/test{i}.pdf",
sample_file_bytes + bytes(str(i), 'utf-8'), # Make each file unique
"utf-8"
)
# Execute with pagination
result = await document_service.list_documents(skip=1, limit=2)
# Verify
assert len(result) == 2
# Test counting
total_count = await document_service.count_documents()
assert total_count == 5
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_count_documents(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test document counting."""
# Setup
mock_magic.return_value = "text/plain"
# Initially should be 0
initial_count = await document_service.count_documents()
assert initial_count == 0
# Create some documents
for i in range(3):
await document_service.create_document(
f"/test/test{i}.txt",
sample_file_bytes + bytes(str(i), 'utf-8'),
"utf-8"
)
# Execute
final_count = await document_service.count_documents()
# Verify
assert final_count == 3
class TestUpdateAndDelete:
"""Tests for document update and deletion operations."""
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_update_document_metadata(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test updating document metadata."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Execute update
update_data = {"metadata": {"page_count": 5}}
result = await document_service.update_document(created_doc.id, update_data)
# Verify
assert result is not None
assert result.metadata.get("page_count") == 5
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_delete_document_and_orphaned_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test deleting document with orphaned content cleanup."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Verify content exists
content_before = await document_service.content_repository.find_document_content_by_file_hash(
created_doc.file_hash
)
assert content_before is not None
# Execute deletion
result = await document_service.delete_document(created_doc.id)
# Verify document and content are deleted
assert result is True
deleted_doc = await document_service.get_document_by_id(created_doc.id)
assert deleted_doc is None
content_after = await document_service.content_repository.find_document_content_by_file_hash(
created_doc.file_hash
)
assert content_after is None
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_delete_document_without_affecting_shared_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test deleting document without removing shared content."""
# Setup
mock_magic.return_value = "application/pdf"
# Create two documents with same content
doc1 = await document_service.create_document(
"/test/test1.pdf",
sample_file_bytes,
"utf-8"
)
doc2 = await document_service.create_document(
"/test/test2.pdf",
sample_file_bytes,
"utf-8"
)
# They should share the same hash
assert doc1.file_hash == doc2.file_hash
# Delete first document
result = await document_service.delete_document(doc1.id)
assert result is True
# Verify first document is deleted but content still exists
deleted_doc = await document_service.get_document_by_id(doc1.id)
assert deleted_doc is None
remaining_doc = await document_service.get_document_by_id(doc2.id)
assert remaining_doc is not None
content = await document_service.content_repository.find_document_content_by_file_hash(
doc2.file_hash
)
assert content is not None
class TestUtilityMethods:
"""Tests for utility methods."""
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_check_content_exists(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test checking if content exists by hash."""
# Setup
mock_magic.return_value = "application/pdf"
# Initially content doesn't exist
test_hash = "nonexistent_hash"
exists_before = await document_service.content_exists(test_hash)
assert exists_before is False
# Create a document
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Now content should exist
exists_after = await document_service.content_exists(created_doc.file_hash)
assert exists_after is True
@patch('app.services.document_service.magic.from_buffer')
@pytest.mark.asyncio
async def test_i_can_update_document_content(
self,
mock_magic,
document_service,
sample_file_bytes
):
"""Test updating extracted document content."""
# Setup
mock_magic.return_value = "application/pdf"
# Create a document first
created_doc = await document_service.create_document(
"/test/test.pdf",
sample_file_bytes,
"utf-8"
)
# Update content
new_content = "Updated extracted content"
result = await document_service.update_document_content(
created_doc.file_hash,
new_content
)
# Verify update
assert result is not None
assert result.content == new_content
# Verify persistence
updated_content = await document_service.content_repository.find_document_content_by_file_hash(
created_doc.file_hash
)
assert updated_content.content == new_content
class TestHashCalculation:
"""Tests for file hash calculation utility."""
def test_i_can_calculate_consistent_file_hash(self, document_service):
"""Test that file hash calculation is consistent."""
test_bytes = b"Test content for hashing"
# Calculate hash multiple times
hash1 = document_service._calculate_file_hash(test_bytes)
hash2 = document_service._calculate_file_hash(test_bytes)
# Should be identical
assert hash1 == hash2
assert len(hash1) == 64 # SHA256 produces 64-character hex string
def test_i_get_different_hashes_for_different_content(self, document_service):
"""Test that different content produces different hashes."""
content1 = b"First content"
content2 = b"Second content"
hash1 = document_service._calculate_file_hash(content1)
hash2 = document_service._calculate_file_hash(content2)
assert hash1 != hash2
class TestFileTypeDetection:
"""Tests for file type detection."""
def test_i_can_detect_pdf_file_type(self, document_service):
"""Test PDF file type detection."""
file_type = document_service._detect_file_type("/path/to/document.pdf")
assert file_type == FileType.PDF
def test_i_can_detect_txt_file_type(self, document_service):
"""Test text file type detection."""
file_type = document_service._detect_file_type("/path/to/document.txt")
assert file_type == FileType.TXT
def test_i_can_detect_docx_file_type(self, document_service):
"""Test DOCX file type detection."""
file_type = document_service._detect_file_type("/path/to/document.docx")
assert file_type == FileType.DOCX
def test_i_cannot_detect_unsupported_file_type(self, document_service):
"""Test unsupported file type raises ValueError."""
with pytest.raises(ValueError, match="Unsupported file type"):
document_service._detect_file_type("/path/to/document.xyz")

0
tests/utils/__init__.py Normal file
View File

View File

@@ -14,6 +14,8 @@ def get_doc(filename: str = None):
file_hash="a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
file_type=FileType(os.path.splitext(filename)[1].lstrip(".") or "txt"),
detected_at=datetime.now(),
file_size=1024,
mime_type="application/pdf"
)