Files
MyDocManager/src/file-processor/app/services/user_service.py

202 lines
6.1 KiB
Python

"""
User service for business logic operations.
This module provides user-related business logic including user creation,
retrieval, updates, and authentication operations with proper error handling.
"""
from typing import Optional, List
from pymongo.errors import DuplicateKeyError
from app.database.repositories.user_repository import UserRepository
from app.models.user import UserCreate, UserInDB, UserUpdate, UserCreateNoValidation
from app.services.auth_service import AuthService
class UserService:
    """
    Service class for user business logic operations.

    This class handles user-related operations including creation,
    authentication, and data management with proper validation. All
    persistence is delegated to ``UserRepository`` and password checks
    to ``AuthService``.
    """

    def __init__(self, database):
        """
        Initialize user service with its repository and auth dependencies.

        Args:
            database: Database handle handed to ``UserRepository``
        """
        self.db = database
        self.user_repository = UserRepository(self.db)
        self.auth_service = AuthService()

    def initialize(self) -> "UserService":
        """
        Initialize the underlying user repository.

        Returns:
            UserService: This instance, so the call can be chained
        """
        self.user_repository.initialize()
        return self

    def create_user(self, user_data: UserCreate | UserCreateNoValidation) -> UserInDB:
        """
        Create a new user with business logic validation.

        Args:
            user_data (UserCreate | UserCreateNoValidation): User creation data

        Returns:
            UserInDB: Created user with database information

        Raises:
            ValueError: If the username or email is already in use
        """
        # Check if user already exists
        if self.user_repository.user_exists(user_data.username):
            raise ValueError(f"User with username '{user_data.username}' already exists")

        # Check if email already exists
        existing_user = self.user_repository.find_user_by_email(user_data.email)
        if existing_user:
            raise ValueError(f"User with email '{user_data.email}' already exists")

        try:
            return self.user_repository.create_user(user_data)
        except DuplicateKeyError as exc:
            # The checks above are not atomic with the insert, so a concurrent
            # creation can still trip a uniqueness constraint here. Keep the
            # original error as the cause for diagnostics.
            raise ValueError(
                f"User with username '{user_data.username}' already exists"
            ) from exc

    def get_user_by_username(self, username: str) -> Optional[UserInDB]:
        """
        Retrieve user by username.

        Args:
            username (str): Username to search for

        Returns:
            UserInDB or None: User if found, None otherwise
        """
        return self.user_repository.find_user_by_username(username)

    def get_user_by_id(self, user_id: str) -> Optional[UserInDB]:
        """
        Retrieve user by ID.

        Args:
            user_id (str): User ID to search for

        Returns:
            UserInDB or None: User if found, None otherwise
        """
        return self.user_repository.find_user_by_id(user_id)

    def authenticate_user(self, username: str, password: str) -> Optional[UserInDB]:
        """
        Authenticate user with username and password.

        Args:
            username (str): Username for authentication
            password (str): Password for authentication

        Returns:
            UserInDB or None: The user when it exists, is active and the
            password verifies; None in every other case
        """
        user = self.user_repository.find_user_by_username(username)
        # NOTE(review): unknown and inactive users return before any password
        # verification runs, so these paths do less work than a wrong password.
        if not user:
            return None
        if not user.is_active:
            return None
        if not self.auth_service.verify_user_password(password, user.hashed_password):
            return None
        return user

    def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
        """
        Update user information.

        Args:
            user_id (str): User ID to update
            user_update (UserUpdate): Updated user data

        Returns:
            UserInDB or None: Whatever the repository returns for the update

        Raises:
            ValueError: If username or email already exists for different user
        """
        # Normalise once so a non-string id is not mistaken for "another user".
        target_id = str(user_id)

        # Validate username uniqueness if being updated
        if user_update.username is not None:
            existing_user = self.user_repository.find_user_by_username(user_update.username)
            if existing_user and str(existing_user.id) != target_id:
                raise ValueError(f"Username '{user_update.username}' is already taken")

        # Validate email uniqueness if being updated
        if user_update.email is not None:
            existing_user = self.user_repository.find_user_by_email(user_update.email)
            if existing_user and str(existing_user.id) != target_id:
                raise ValueError(f"Email '{user_update.email}' is already taken")

        return self.user_repository.update_user(user_id, user_update)

    def delete_user(self, user_id: str) -> bool:
        """
        Delete user from system.

        Args:
            user_id (str): User ID to delete

        Returns:
            bool: True if user was deleted, False otherwise
        """
        return self.user_repository.delete_user(user_id)

    def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
        """
        List users with pagination.

        Args:
            skip (int): Number of users to skip (default: 0)
            limit (int): Maximum number of users to return (default: 100)

        Returns:
            List[UserInDB]: List of users
        """
        return self.user_repository.list_users(skip=skip, limit=limit)

    def count_users(self) -> int:
        """
        Count total number of users.

        Returns:
            int: Total number of users in system
        """
        return self.user_repository.count_users()

    def user_exists(self, username: str) -> bool:
        """
        Check if user exists by username.

        Args:
            username (str): Username to check

        Returns:
            bool: True if user exists, False otherwise
        """
        return self.user_repository.user_exists(username)

    def get_preference(self, user_id: str, preference):
        """
        Read a single preference value for a user.

        Args:
            user_id (str): ID of the user whose preference is read
            preference: Key to look up in the user's preferences mapping

        Returns:
            The stored value, or None when the user does not exist, has no
            preferences, or the key is not set
        """
        user = self.get_user_by_id(user_id)
        if user is None:
            return None
        # The model definition is not visible here; tolerate preferences=None.
        preferences = user.preferences or {}
        return preferences.get(preference, None)

    def set_preference(self, user_id: str, preference, value) -> Optional[UserInDB]:
        """
        Store a single preference value for a user.

        Args:
            user_id (str): ID of the user to modify
            preference: Key to set in the user's preferences mapping
            value: Value to store under that key

        Returns:
            UserInDB or None: The user as re-read after the update, or None
            when the user does not exist
        """
        user = self.get_user_by_id(user_id)
        if user is None:
            return None
        # Build a fresh mapping rather than mutating the fetched model, and
        # tolerate preferences=None on users that never stored any.
        updated_preferences = {**(user.preferences or {}), preference: value}
        self.user_repository.update_user(user_id, UserUpdate(preferences=updated_preferences))
        # Re-read so the caller gets the persisted state.
        return self.get_user_by_id(user_id)