+
);
diff --git a/tests/services/test_user_service.py b/tests/services/test_user_service.py
new file mode 100644
index 0000000..a607df3
--- /dev/null
+++ b/tests/services/test_user_service.py
@@ -0,0 +1,739 @@
+"""
+Unit tests for UserService using in-memory MongoDB.
+
+Tests the business logic operations with real MongoDB operations
+using mongomock for better integration testing.
+"""
+
+import pytest
+from bson import ObjectId
+from mongomock.mongo_client import MongoClient
+
+from app.models.auth import UserRole
+from app.models.user import UserCreate, UserUpdate, UserCreateNoValidation
+from app.services.user_service import UserService
+
+
+@pytest.fixture
+def in_memory_database():
+ """Create an in-memory database for testing."""
+ client = MongoClient()
+ return client.test_database
+
+
+@pytest.fixture
+def user_service(in_memory_database):
+ """Create UserService with in-memory repositories."""
+ service = UserService(in_memory_database).initialize()
+ return service
+
+
+@pytest.fixture
+def sample_user_data():
+ """Sample user data for testing."""
+ return {
+ "username": "testuser",
+ "email": "testuser@example.com",
+ "password": "SecureP@ssw0rd123"
+ }
+
+
+@pytest.fixture
+def sample_user_data_2():
+ """Second sample user data for testing."""
+ return {
+ "username": "anotheruser",
+ "email": "anotheruser@example.com",
+ "password": "AnotherP@ssw0rd456"
+ }
+
+
+class TestCreateUser:
+ """Tests for create_user method."""
+
+ def test_i_can_create_user_with_valid_data(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test creating user with valid data."""
+ # Execute
+ user_create = UserCreate(**sample_user_data)
+ result = user_service.create_user(user_create)
+
+ # Verify user creation
+ assert result is not None
+ assert result.username == sample_user_data["username"]
+ assert result.email == sample_user_data["email"]
+ assert result.hashed_password is not None
+ assert result.hashed_password != sample_user_data["password"]
+ assert result.role == UserRole.USER
+ assert result.is_active is True
+ assert result.preferences == {}
+ assert result.created_at is not None
+ assert result.updated_at is not None
+
+ # Verify user exists in database
+ user_in_db = user_service.get_user_by_id(str(result.id))
+ assert user_in_db is not None
+ assert user_in_db.id == result.id
+ assert user_in_db.username == sample_user_data["username"]
+
+ def test_i_cannot_create_user_with_duplicate_username(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test that duplicate username raises ValueError."""
+ # Create first user
+ user_create = UserCreate(**sample_user_data)
+ user_service.create_user(user_create)
+
+ # Try to create user with same username but different email
+ duplicate_user_data = sample_user_data.copy()
+ duplicate_user_data["email"] = "different@example.com"
+ duplicate_user_create = UserCreate(**duplicate_user_data)
+
+ # Execute and verify exception
+ with pytest.raises(ValueError) as exc_info:
+ user_service.create_user(duplicate_user_create)
+
+ assert "already exists" in str(exc_info.value)
+ assert sample_user_data["username"] in str(exc_info.value)
+
+ def test_i_cannot_create_user_with_duplicate_email(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test that duplicate email raises ValueError."""
+ # Create first user
+ user_create = UserCreate(**sample_user_data)
+ user_service.create_user(user_create)
+
+ # Try to create user with same email but different username
+ duplicate_user_data = sample_user_data.copy()
+ duplicate_user_data["username"] = "differentuser"
+ duplicate_user_create = UserCreate(**duplicate_user_data)
+
+ # Execute and verify exception
+ with pytest.raises(ValueError) as exc_info:
+ user_service.create_user(duplicate_user_create)
+
+ assert "already exists" in str(exc_info.value)
+ assert sample_user_data["email"] in str(exc_info.value)
+
+
+class TestGetUserMethods:
+ """Tests for user retrieval methods."""
+
+ def test_i_can_get_user_by_username(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test retrieving user by username."""
+ # Create a user first
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute
+ result = user_service.get_user_by_username(sample_user_data["username"])
+
+ # Verify
+ assert result is not None
+ assert result.id == created_user.id
+ assert result.username == sample_user_data["username"]
+ assert result.email == sample_user_data["email"]
+
+ def test_i_can_get_user_by_id(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test retrieving user by ID."""
+ # Create a user first
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute
+ result = user_service.get_user_by_id(str(created_user.id))
+
+ # Verify
+ assert result is not None
+ assert result.id == created_user.id
+ assert result.username == sample_user_data["username"]
+ assert result.email == sample_user_data["email"]
+
+ def test_i_can_check_user_exists(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test checking if user exists."""
+ # Initially should not exist
+ assert user_service.user_exists(sample_user_data["username"]) is False
+
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ user_service.create_user(user_create)
+
+ # Now should exist
+ assert user_service.user_exists(sample_user_data["username"]) is True
+
+ def test_i_cannot_get_nonexistent_user_by_username(
+ self,
+ user_service
+ ):
+ """Test retrieving nonexistent user by username returns None."""
+ # Execute
+ result = user_service.get_user_by_username("nonexistentuser")
+
+ # Verify
+ assert result is None
+
+ def test_i_cannot_get_nonexistent_user_by_id(
+ self,
+ user_service
+ ):
+ """Test retrieving nonexistent user by ID returns None."""
+ # Execute with random ObjectId
+ result = user_service.get_user_by_id(str(ObjectId()))
+
+ # Verify
+ assert result is None
+
+
+class TestAuthenticateUser:
+ """Tests for authenticate_user method."""
+
+ def test_i_can_authenticate_user_with_valid_credentials(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test authenticating user with valid credentials."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute authentication
+ result = user_service.authenticate_user(
+ sample_user_data["username"],
+ sample_user_data["password"]
+ )
+
+ # Verify
+ assert result is not None
+ assert result.id == created_user.id
+ assert result.username == sample_user_data["username"]
+
+ def test_i_cannot_authenticate_user_with_wrong_password(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test authenticating user with wrong password returns None."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ user_service.create_user(user_create)
+
+ # Execute authentication with wrong password
+ result = user_service.authenticate_user(
+ sample_user_data["username"],
+ "WrongP@ssw0rd123"
+ )
+
+ # Verify
+ assert result is None
+
+ def test_i_cannot_authenticate_user_with_wrong_username(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test authenticating user with wrong username returns None."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ user_service.create_user(user_create)
+
+ # Execute authentication with wrong username
+ result = user_service.authenticate_user(
+ "wrongusername",
+ sample_user_data["password"]
+ )
+
+ # Verify
+ assert result is None
+
+ def test_i_cannot_authenticate_inactive_user(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test authenticating inactive user returns None."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Deactivate the user
+ user_service.update_user(str(created_user.id), UserUpdate(is_active=False))
+
+ # Execute authentication
+ result = user_service.authenticate_user(
+ sample_user_data["username"],
+ sample_user_data["password"]
+ )
+
+ # Verify
+ assert result is None
+
+
+class TestUpdateUser:
+ """Tests for update_user method."""
+
+ def test_i_can_update_user_username(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test updating user username."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute update
+ new_username = "updatedusername"
+ result = user_service.update_user(
+ str(created_user.id),
+ UserUpdate(username=new_username)
+ )
+
+ # Verify
+ assert result is not None
+ assert result.username == new_username
+
+ # Verify in database
+ updated_user = user_service.get_user_by_id(str(created_user.id))
+ assert updated_user.username == new_username
+
+ def test_i_can_update_user_email(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test updating user email."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute update
+ new_email = "newemail@example.com"
+ result = user_service.update_user(
+ str(created_user.id),
+ UserUpdate(email=new_email)
+ )
+
+ # Verify
+ assert result is not None
+ assert result.email == new_email
+
+ # Verify in database
+ updated_user = user_service.get_user_by_id(str(created_user.id))
+ assert updated_user.email == new_email
+
+ def test_i_can_update_user_role(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test updating user role."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute update
+ result = user_service.update_user(
+ str(created_user.id),
+ UserUpdate(role=UserRole.ADMIN)
+ )
+
+ # Verify
+ assert result is not None
+ assert result.role == UserRole.ADMIN
+
+ # Verify in database
+ updated_user = user_service.get_user_by_id(str(created_user.id))
+ assert updated_user.role == UserRole.ADMIN
+
+ def test_i_can_update_user_is_active(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test updating user is_active status."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute update
+ result = user_service.update_user(
+ str(created_user.id),
+ UserUpdate(is_active=False)
+ )
+
+ # Verify
+ assert result is not None
+ assert result.is_active is False
+
+ # Verify in database
+ updated_user = user_service.get_user_by_id(str(created_user.id))
+ assert updated_user.is_active is False
+
+ def test_i_cannot_update_user_with_duplicate_username(
+ self,
+ user_service,
+ sample_user_data,
+ sample_user_data_2
+ ):
+ """Test that updating to existing username raises ValueError."""
+ # Create two users
+ user_create_1 = UserCreate(**sample_user_data)
+ user_1 = user_service.create_user(user_create_1)
+
+ user_create_2 = UserCreate(**sample_user_data_2)
+ user_2 = user_service.create_user(user_create_2)
+
+ # Try to update user_2 with user_1's username
+ with pytest.raises(ValueError) as exc_info:
+ user_service.update_user(
+ str(user_2.id),
+ UserUpdate(username=sample_user_data["username"])
+ )
+
+ assert "already taken" in str(exc_info.value)
+
+ def test_i_cannot_update_user_with_duplicate_email(
+ self,
+ user_service,
+ sample_user_data,
+ sample_user_data_2
+ ):
+ """Test that updating to existing email raises ValueError."""
+ # Create two users
+ user_create_1 = UserCreate(**sample_user_data)
+ user_1 = user_service.create_user(user_create_1)
+
+ user_create_2 = UserCreate(**sample_user_data_2)
+ user_2 = user_service.create_user(user_create_2)
+
+ # Try to update user_2 with user_1's email
+ with pytest.raises(ValueError) as exc_info:
+ user_service.update_user(
+ str(user_2.id),
+ UserUpdate(email=sample_user_data["email"])
+ )
+
+ assert "already taken" in str(exc_info.value)
+
+ def test_i_cannot_update_nonexistent_user(
+ self,
+ user_service
+ ):
+ """Test updating nonexistent user returns None."""
+ # Execute update with random ObjectId
+ result = user_service.update_user(
+ str(ObjectId()),
+ UserUpdate(username="newusername")
+ )
+
+ # Verify
+ assert result is None
+
+
+class TestDeleteUser:
+ """Tests for delete_user method."""
+
+ def test_i_can_delete_existing_user(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test deleting an existing user."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Verify user exists
+ user_before_delete = user_service.get_user_by_id(str(created_user.id))
+ assert user_before_delete is not None
+
+ # Execute deletion
+ result = user_service.delete_user(str(created_user.id))
+
+ # Verify deletion
+ assert result is True
+
+ # Verify user no longer exists
+ deleted_user = user_service.get_user_by_id(str(created_user.id))
+ assert deleted_user is None
+
+ def test_i_cannot_delete_nonexistent_user(
+ self,
+ user_service
+ ):
+ """Test deleting a nonexistent user returns False."""
+ # Execute deletion with random ObjectId
+ result = user_service.delete_user(str(ObjectId()))
+
+ # Verify
+ assert result is False
+
+
+class TestListAndCountMethods:
+ """Tests for list_users and count_users methods."""
+
+ def test_i_can_list_users(
+ self,
+ user_service,
+ sample_user_data,
+ sample_user_data_2
+ ):
+ """Test listing all users."""
+ # Create multiple users
+ user_create_1 = UserCreate(**sample_user_data)
+ user_1 = user_service.create_user(user_create_1)
+
+ user_create_2 = UserCreate(**sample_user_data_2)
+ user_2 = user_service.create_user(user_create_2)
+
+ # Execute
+ result = user_service.list_users()
+
+ # Verify
+ assert len(result) == 2
+ usernames = [user.username for user in result]
+ assert sample_user_data["username"] in usernames
+ assert sample_user_data_2["username"] in usernames
+
+ def test_i_can_list_users_with_pagination(
+ self,
+ user_service
+ ):
+ """Test listing users with pagination."""
+ # Create 5 users
+ for i in range(5):
+ user_data = UserCreateNoValidation(
+ username=f"user{i}",
+ email=f"user{i}@example.com",
+ password="SecureP@ssw0rd123"
+ )
+ user_service.create_user(user_data)
+
+ # Test skip and limit
+ result_page_1 = user_service.list_users(skip=0, limit=2)
+ assert len(result_page_1) == 2
+
+ result_page_2 = user_service.list_users(skip=2, limit=2)
+ assert len(result_page_2) == 2
+
+ result_page_3 = user_service.list_users(skip=4, limit=2)
+ assert len(result_page_3) == 1
+
+ # Verify different users in each page
+ page_1_usernames = [user.username for user in result_page_1]
+ page_2_usernames = [user.username for user in result_page_2]
+ assert page_1_usernames != page_2_usernames
+
+ def test_i_can_count_users(
+ self,
+ user_service,
+ sample_user_data,
+ sample_user_data_2
+ ):
+ """Test counting users."""
+ # Initially no users
+ assert user_service.count_users() == 0
+
+ # Create first user
+ user_create_1 = UserCreate(**sample_user_data)
+ user_service.create_user(user_create_1)
+ assert user_service.count_users() == 1
+
+ # Create second user
+ user_create_2 = UserCreate(**sample_user_data_2)
+ user_service.create_user(user_create_2)
+ assert user_service.count_users() == 2
+
+ def test_list_users_returns_empty_list_when_no_users(
+ self,
+ user_service
+ ):
+ """Test listing users returns empty list when no users exist."""
+ # Execute
+ result = user_service.list_users()
+
+ # Verify
+ assert result == []
+
+
+class TestUserPreferences:
+ """Tests for user preferences methods."""
+
+ def test_i_can_get_user_preference(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test getting user preference."""
+ # Create a user with preferences
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Set a preference
+ user_service.set_preference(str(created_user.id), "theme", "dark")
+
+ # Execute
+ result = user_service.get_preference(str(created_user.id), "theme")
+
+ # Verify
+ assert result == "dark"
+
+ def test_i_can_set_user_preference(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test setting user preference."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute
+ result = user_service.set_preference(str(created_user.id), "language", "fr")
+
+ # Verify
+ assert result is not None
+ assert result.preferences.get("language") == "fr"
+
+ # Verify in database
+ updated_user = user_service.get_user_by_id(str(created_user.id))
+ assert updated_user.preferences.get("language") == "fr"
+
+ def test_i_cannot_get_preference_for_nonexistent_user(
+ self,
+ user_service
+ ):
+ """Test getting preference for nonexistent user returns None."""
+ # Execute with random ObjectId
+ result = user_service.get_preference(str(ObjectId()), "theme")
+
+ # Verify
+ assert result is None
+
+ def test_i_cannot_set_preference_for_nonexistent_user(
+ self,
+ user_service
+ ):
+ """Test setting preference for nonexistent user returns None."""
+ # Execute with random ObjectId
+ result = user_service.set_preference(str(ObjectId()), "theme", "dark")
+
+ # Verify
+ assert result is None
+
+ def test_get_preference_returns_none_for_nonexistent_key(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test getting nonexistent preference key returns None."""
+ # Create a user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+
+ # Execute
+ result = user_service.get_preference(str(created_user.id), "nonexistent_key")
+
+ # Verify
+ assert result is None
+
+
+class TestUserLifecycle:
+ """Tests for complete user lifecycle scenarios."""
+
+ def test_complete_user_lifecycle(
+ self,
+ user_service,
+ sample_user_data
+ ):
+ """Test complete user lifecycle: create → authenticate → update → preferences → delete."""
+ # Create user
+ user_create = UserCreate(**sample_user_data)
+ created_user = user_service.create_user(user_create)
+ assert created_user is not None
+ assert created_user.username == sample_user_data["username"]
+
+ # Authenticate user
+ authenticated_user = user_service.authenticate_user(
+ sample_user_data["username"],
+ sample_user_data["password"]
+ )
+ assert authenticated_user is not None
+ assert authenticated_user.id == created_user.id
+
+ # Update user
+ updated_user = user_service.update_user(
+ str(created_user.id),
+ UserUpdate(role=UserRole.ADMIN)
+ )
+ assert updated_user.role == UserRole.ADMIN
+
+ # Set preference
+ user_with_pref = user_service.set_preference(
+ str(created_user.id),
+ "theme",
+ "dark"
+ )
+ assert user_with_pref.preferences.get("theme") == "dark"
+
+ # Get preference
+ pref_value = user_service.get_preference(str(created_user.id), "theme")
+ assert pref_value == "dark"
+
+ # Delete user
+ delete_result = user_service.delete_user(str(created_user.id))
+ assert delete_result is True
+
+ # Verify user no longer exists
+ deleted_user = user_service.get_user_by_id(str(created_user.id))
+ assert deleted_user is None
+
+ def test_user_operations_with_empty_database(
+ self,
+ user_service
+ ):
+ """Test user operations when database is empty."""
+ # Try to get nonexistent user
+ result = user_service.get_user_by_id(str(ObjectId()))
+ assert result is None
+
+ # Try to get user by username
+ result = user_service.get_user_by_username("nonexistent")
+ assert result is None
+
+ # Try to list users
+ users = user_service.list_users()
+ assert users == []
+
+ # Try to count users
+ count = user_service.count_users()
+ assert count == 0
+
+ # Try to delete nonexistent user
+ delete_result = user_service.delete_user(str(ObjectId()))
+ assert delete_result is False
+
+ # Try to check user existence
+ exists = user_service.user_exists("nonexistent")
+ assert exists is False
\ No newline at end of file