Fixed unit tests

This commit is contained in:
2025-09-17 21:24:03 +02:00
parent 2958d5cf82
commit da63f1b75b
7 changed files with 210 additions and 219 deletions

View File

@@ -11,7 +11,7 @@ from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from ..config.settings import get_mongodb_url, get_mongodb_database_name
from config.settings import get_mongodb_url, get_mongodb_database_name
# Global variables for singleton pattern
_client: Optional[MongoClient] = None

View File

@@ -13,6 +13,7 @@ from pymongo.errors import DuplicateKeyError
from pymongo.collection import Collection
from app.models.user import UserCreate, UserInDB, UserUpdate
from utils.security import hash_password
class UserRepository:
@@ -64,11 +65,11 @@ class UserRepository:
user_dict = {
"username": user_data.username,
"email": user_data.email,
"hashed_password": user_data.hashed_password,
"hashed_password": hash_password(user_data.password),
"role": user_data.role,
"is_active": user_data.is_active,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
"is_active": True,
"created_at": datetime.now(),
"updated_at": datetime.now()
}
try:
@@ -143,12 +144,14 @@ class UserRepository:
object_id = ObjectId(user_id)
# Build update document with only provided fields
update_data = {"updated_at": datetime.utcnow()}
update_data = {"updated_at": datetime.now()}
if user_update.username is not None:
update_data["username"] = user_update.username
if user_update.email is not None:
update_data["email"] = user_update.email
if user_update.hashed_password is not None:
update_data["hashed_password"] = user_update.hashed_password
if user_update.password is not None:
update_data["hashed_password"] = hash_password(user_update.password)
if user_update.role is not None:
update_data["role"] = user_update.role
if user_update.is_active is not None:
@@ -218,4 +221,4 @@ class UserRepository:
Returns:
bool: True if user exists, False otherwise
"""
return self.collection.count_documents({"username": username}) > 0
return self.collection.count_documents({"username": username}) > 0

View File

@@ -10,6 +10,8 @@ from pydantic import BaseModel
import redis
from celery import Celery
from database.connection import test_database_connection
# Initialize FastAPI app
app = FastAPI(
title="MyDocManager File Processor",
@@ -68,6 +70,12 @@ async def health_check():
health_status["dependencies"]["redis"] = "disconnected"
health_status["status"] = "degraded"
# check MongoDB connection
if test_database_connection():
health_status["dependencies"]["mongodb"] = "connected"
else:
health_status["dependencies"]["mongodb"] = "disconnected"
return health_status

View File

@@ -16,164 +16,165 @@ from app.models.auth import UserRole
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
def validate_password_strength(password: str) -> str:
"""
Validate password meets security requirements.
Requirements:
- At least 8 characters long
- Contains at least one uppercase letter
- Contains at least one lowercase letter
- Contains at least one digit
- Contains at least one special character
Args:
password: The password string to validate
Returns:
str: The validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character")
return password
"""
Validate password meets security requirements.
Requirements:
- At least 8 characters long
- Contains at least one uppercase letter
- Contains at least one lowercase letter
- Contains at least one digit
- Contains at least one special character
Args:
password: The password string to validate
Returns:
str: The validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character")
return password
def validate_username_not_empty(username: str) -> str:
"""
Validate username is not empty or whitespace only.
Args:
username: The username string to validate
Returns:
str: The validated username
Raises:
ValueError: If username is empty or whitespace only
"""
if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only")
return username.strip()
"""
Validate username is not empty or whitespace only.
Args:
username: The username string to validate
Returns:
str: The validated username
Raises:
ValueError: If username is empty or whitespace only
"""
if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only")
return username.strip()
class UserCreate(BaseModel):
"""Model for creating a new user."""
username: str
email: EmailStr
password: str
role: UserRole = UserRole.USER
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
"""Model for creating a new user."""
username: str
email: EmailStr
password: str
role: UserRole = UserRole.USER
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
class UserUpdate(BaseModel):
"""Model for updating an existing user."""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
role: Optional[UserRole] = None
@field_validator('username')
@classmethod
def validate_username(cls, v):
if v is not None:
return validate_username_not_empty(v)
return v
@field_validator('password')
@classmethod
def validate_password(cls, v):
if v is not None:
return validate_password_strength(v)
return v
"""Model for updating an existing user."""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
role: Optional[UserRole] = None
is_active: Optional[bool] = None
@field_validator('username')
@classmethod
def validate_username(cls, v):
if v is not None:
return validate_username_not_empty(v)
return v
@field_validator('password')
@classmethod
def validate_password(cls, v):
if v is not None:
return validate_password_strength(v)
return v
class UserInDB(BaseModel):
"""Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str
email: str
password_hash: str
role: UserRole
is_active: bool = True
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
"""Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str
email: str
hashed_password: str
role: UserRole
is_active: bool = True
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
class UserResponse(BaseModel):
"""Model for user data in API responses (excludes password_hash)."""
id: PyObjectId = Field(alias="_id")
username: str
email: str
role: UserRole
is_active: bool
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
"""Model for user data in API responses (excludes password_hash)."""
id: PyObjectId = Field(alias="_id")
username: str
email: str
role: UserRole
is_active: bool
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}