Added frontend. Working on user management

This commit is contained in:
2025-09-16 22:58:28 +02:00
parent 10650420ef
commit 2958d5cf82
31 changed files with 5101 additions and 15 deletions

View File

@@ -0,0 +1,85 @@
"""
Application configuration settings.
This module handles environment variables and application configuration
using simple os.getenv() approach without external validation libraries.
"""
import os
from typing import Optional
def get_mongodb_url() -> str:
"""
Get MongoDB connection URL from environment variables.
Returns:
str: MongoDB connection URL
Returns default localhost URL if MONGODB_URL not set.
"""
return os.getenv("MONGODB_URL", "mongodb://localhost:27017")
def get_mongodb_database_name() -> str:
"""
Get MongoDB database name from environment variables.
Returns:
str: MongoDB database name
"""
return os.getenv("MONGODB_DATABASE", "mydocmanager")
def get_jwt_secret_key() -> str:
"""
Get JWT secret key from environment variables.
Returns:
str: JWT secret key
Raises:
ValueError: If JWT_SECRET is not set in production
"""
secret = os.getenv("JWT_SECRET")
if not secret:
# For development, provide a default key with warning
if os.getenv("ENVIRONMENT", "development") == "development":
print("WARNING: Using default JWT secret key for development")
return "dev-secret-key-change-in-production"
else:
raise ValueError("JWT_SECRET environment variable must be set in production")
return secret
def get_jwt_algorithm() -> str:
"""
Get JWT algorithm from environment variables.
Returns:
str: JWT algorithm (default: HS256)
"""
return os.getenv("JWT_ALGORITHM", "HS256")
def get_jwt_expire_hours() -> int:
"""
Get JWT token expiration time in hours from environment variables.
Returns:
int: JWT expiration time in hours (default: 24)
"""
try:
return int(os.getenv("JWT_EXPIRE_HOURS", "24"))
except ValueError:
return 24
def is_development_environment() -> bool:
"""
Check if running in development environment.
Returns:
bool: True if development environment
"""
return os.getenv("ENVIRONMENT", "development").lower() == "development"

View File

@@ -0,0 +1,125 @@
"""
MongoDB database connection management.
This module handles MongoDB connection with fail-fast approach.
The application will terminate if MongoDB is not accessible at startup.
"""
import sys
from typing import Optional
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from ..config.settings import get_mongodb_url, get_mongodb_database_name
# Global variables for singleton pattern
_client: Optional[MongoClient] = None
_database: Optional[Database] = None
def create_mongodb_client() -> MongoClient:
"""
Create MongoDB client with connection validation.
Returns:
MongoClient: Connected MongoDB client
Raises:
SystemExit: If connection fails (fail-fast approach)
"""
mongodb_url = get_mongodb_url()
try:
# Create client with short timeout for fail-fast behavior
client = MongoClient(
mongodb_url,
serverSelectionTimeoutMS=5000, # 5 seconds timeout
connectTimeoutMS=5000,
socketTimeoutMS=5000
)
# Test connection by running admin command
client.admin.command('ping')
print(f"Successfully connected to MongoDB at {mongodb_url}")
return client
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
print(f"ERROR: Failed to connect to MongoDB at {mongodb_url}")
print(f"Connection error: {str(e)}")
print("MongoDB is required for this application. Please ensure MongoDB is running and accessible.")
sys.exit(1)
except Exception as e:
print(f"ERROR: Unexpected error connecting to MongoDB: {str(e)}")
sys.exit(1)
def get_database() -> Database:
"""
Get MongoDB database instance (singleton pattern).
Returns:
Database: MongoDB database instance
This function implements singleton pattern to ensure only one
database connection is created throughout the application lifecycle.
"""
global _client, _database
if _database is None:
if _client is None:
_client = create_mongodb_client()
database_name = get_mongodb_database_name()
_database = _client[database_name]
print(f"Connected to database: {database_name}")
return _database
def close_database_connection():
"""
Close MongoDB database connection.
This function should be called during application shutdown
to properly close the database connection.
"""
global _client, _database
if _client is not None:
_client.close()
_client = None
_database = None
print("MongoDB connection closed")
def get_mongodb_client() -> Optional[MongoClient]:
"""
Get the raw MongoDB client instance.
Returns:
MongoClient or None: MongoDB client instance if connected
This is primarily for testing purposes or advanced use cases
where direct client access is needed.
"""
return _client
def test_database_connection() -> bool:
"""
Test if database connection is working.
Returns:
bool: True if connection is working, False otherwise
This function can be used for health checks.
"""
try:
db = get_database()
# Simple operation to test connection
db.command('ping')
return True
except Exception:
return False

View File

@@ -0,0 +1,221 @@
"""
User repository for MongoDB operations.
This module implements the repository pattern for user CRUD operations
with dependency injection of the database connection.
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from bson import ObjectId
from pymongo.database import Database
from pymongo.errors import DuplicateKeyError
from pymongo.collection import Collection
from app.models.user import UserCreate, UserInDB, UserUpdate
class UserRepository:
"""
Repository class for user CRUD operations in MongoDB.
This class handles all database operations related to users,
following the repository pattern with dependency injection.
"""
def __init__(self, database: Database):
"""
Initialize repository with database dependency.
Args:
database (Database): MongoDB database instance
"""
self.db = database
self.collection: Collection = database.users
# Create unique index on username for duplicate prevention
self._ensure_indexes()
def _ensure_indexes(self):
"""
Ensure required database indexes exist.
Creates unique index on username field to prevent duplicates.
"""
try:
self.collection.create_index("username", unique=True)
except Exception:
# Index might already exist, ignore error
pass
def create_user(self, user_data: UserCreate) -> UserInDB:
"""
Create a new user in the database.
Args:
user_data (UserCreate): User creation data
Returns:
UserInDB: Created user with database ID
Raises:
DuplicateKeyError: If username already exists
"""
user_dict = {
"username": user_data.username,
"email": user_data.email,
"hashed_password": user_data.hashed_password,
"role": user_data.role,
"is_active": user_data.is_active,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
try:
result = self.collection.insert_one(user_dict)
user_dict["_id"] = result.inserted_id
return UserInDB(**user_dict)
except DuplicateKeyError:
raise DuplicateKeyError(f"User with username '{user_data.username}' already exists")
def find_user_by_username(self, username: str) -> Optional[UserInDB]:
"""
Find user by username.
Args:
username (str): Username to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
user_doc = self.collection.find_one({"username": username})
if user_doc:
return UserInDB(**user_doc)
return None
def find_user_by_id(self, user_id: str) -> Optional[UserInDB]:
"""
Find user by ID.
Args:
user_id (str): User ID to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
try:
object_id = ObjectId(user_id)
user_doc = self.collection.find_one({"_id": object_id})
if user_doc:
return UserInDB(**user_doc)
except Exception:
# Invalid ObjectId format
pass
return None
def find_user_by_email(self, email: str) -> Optional[UserInDB]:
"""
Find user by email address.
Args:
email (str): Email address to search for
Returns:
UserInDB or None: User if found, None otherwise
"""
user_doc = self.collection.find_one({"email": email})
if user_doc:
return UserInDB(**user_doc)
return None
def update_user(self, user_id: str, user_update: UserUpdate) -> Optional[UserInDB]:
"""
Update user information.
Args:
user_id (str): User ID to update
user_update (UserUpdate): Updated user data
Returns:
UserInDB or None: Updated user if found, None otherwise
"""
try:
object_id = ObjectId(user_id)
# Build update document with only provided fields
update_data = {"updated_at": datetime.utcnow()}
if user_update.email is not None:
update_data["email"] = user_update.email
if user_update.hashed_password is not None:
update_data["hashed_password"] = user_update.hashed_password
if user_update.role is not None:
update_data["role"] = user_update.role
if user_update.is_active is not None:
update_data["is_active"] = user_update.is_active
result = self.collection.update_one(
{"_id": object_id},
{"$set": update_data}
)
if result.matched_count > 0:
return self.find_user_by_id(user_id)
except Exception:
# Invalid ObjectId format or other errors
pass
return None
def delete_user(self, user_id: str) -> bool:
"""
Delete user from database.
Args:
user_id (str): User ID to delete
Returns:
bool: True if user was deleted, False otherwise
"""
try:
object_id = ObjectId(user_id)
result = self.collection.delete_one({"_id": object_id})
return result.deleted_count > 0
except Exception:
# Invalid ObjectId format
return False
def list_users(self, skip: int = 0, limit: int = 100) -> List[UserInDB]:
"""
List users with pagination.
Args:
skip (int): Number of users to skip (default: 0)
limit (int): Maximum number of users to return (default: 100)
Returns:
List[UserInDB]: List of users
"""
cursor = self.collection.find().skip(skip).limit(limit)
return [UserInDB(**user_doc) for user_doc in cursor]
def count_users(self) -> int:
"""
Count total number of users.
Returns:
int: Total number of users in database
"""
return self.collection.count_documents({})
def user_exists(self, username: str) -> bool:
"""
Check if user exists by username.
Args:
username (str): Username to check
Returns:
bool: True if user exists, False otherwise
"""
return self.collection.count_documents({"username": username}) > 0

View File

@@ -0,0 +1,14 @@
"""
Authentication models and enums for user management.
Contains user roles enumeration and authentication-related Pydantic models.
"""
from enum import Enum
class UserRole(str, Enum):
"""User roles enumeration with string values."""
USER = "user"
ADMIN = "admin"

View File

@@ -0,0 +1,179 @@
"""
User models and validation for the MyDocManager application.
Contains Pydantic models for user creation, updates, database representation,
and API responses with proper validation and type safety.
"""
import re
from datetime import datetime
from typing import Optional, Any
from bson import ObjectId
from pydantic import BaseModel, Field, field_validator, EmailStr
from pydantic_core import core_schema
from app.models.auth import UserRole
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic v2 compatibility."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, handler
) -> core_schema.CoreSchema:
return core_schema.json_or_python_schema(
json_schema=core_schema.str_schema(),
python_schema=core_schema.union_schema([
core_schema.is_instance_schema(ObjectId),
core_schema.chain_schema([
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(cls.validate),
])
]),
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: str(x)
),
)
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
def validate_password_strength(password: str) -> str:
"""
Validate password meets security requirements.
Requirements:
- At least 8 characters long
- Contains at least one uppercase letter
- Contains at least one lowercase letter
- Contains at least one digit
- Contains at least one special character
Args:
password: The password string to validate
Returns:
str: The validated password
Raises:
ValueError: If password doesn't meet requirements
"""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if not re.search(r'[A-Z]', password):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r'\d', password):
raise ValueError("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};:"\\|,.<>\/?]', password):
raise ValueError("Password must contain at least one special character")
return password
def validate_username_not_empty(username: str) -> str:
"""
Validate username is not empty or whitespace only.
Args:
username: The username string to validate
Returns:
str: The validated username
Raises:
ValueError: If username is empty or whitespace only
"""
if not username or not username.strip():
raise ValueError("Username cannot be empty or whitespace only")
return username.strip()
class UserCreate(BaseModel):
"""Model for creating a new user."""
username: str
email: EmailStr
password: str
role: UserRole = UserRole.USER
@field_validator('username')
@classmethod
def validate_username(cls, v):
return validate_username_not_empty(v)
@field_validator('password')
@classmethod
def validate_password(cls, v):
return validate_password_strength(v)
class UserUpdate(BaseModel):
"""Model for updating an existing user."""
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
role: Optional[UserRole] = None
@field_validator('username')
@classmethod
def validate_username(cls, v):
if v is not None:
return validate_username_not_empty(v)
return v
@field_validator('password')
@classmethod
def validate_password(cls, v):
if v is not None:
return validate_password_strength(v)
return v
class UserInDB(BaseModel):
"""Model for user data stored in database."""
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
username: str
email: str
password_hash: str
role: UserRole
is_active: bool = True
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}
class UserResponse(BaseModel):
"""Model for user data in API responses (excludes password_hash)."""
id: PyObjectId = Field(alias="_id")
username: str
email: str
role: UserRole
is_active: bool
created_at: datetime
updated_at: datetime
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
"json_encoders": {ObjectId: str}
}

View File

View File

@@ -0,0 +1,74 @@
"""
Password security utilities using bcrypt for secure password hashing.
This module provides secure password hashing and verification functions
using the bcrypt algorithm with automatic salt generation.
"""
import bcrypt
from typing import Union
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt with automatic salt generation.
Args:
password: The plain text password to hash
Returns:
The hashed password as a string
Raises:
ValueError: If password is empty or None
RuntimeError: If bcrypt hashing fails
"""
if not password:
raise ValueError("Password cannot be empty or None")
try:
# Encode password to bytes and generate salt
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
# Hash the password
hashed = bcrypt.hashpw(password_bytes, salt)
# Return as string
return hashed.decode('utf-8')
except Exception as e:
raise RuntimeError(f"Failed to hash password: {str(e)}")
def verify_password(password: str, hashed_password: str) -> bool:
"""
Verify a password against its hash.
Args:
password: The plain text password to verify
hashed_password: The hashed password to verify against
Returns:
True if password matches the hash, False otherwise
Raises:
ValueError: If password or hashed_password is empty or None
RuntimeError: If password verification fails due to malformed hash
"""
if not password or not hashed_password:
raise ValueError("Password and hashed_password cannot be empty or None")
try:
# Encode inputs to bytes
password_bytes = password.encode('utf-8')
hashed_bytes = hashed_password.encode('utf-8')
# Verify password
return bcrypt.checkpw(password_bytes, hashed_bytes)
except ValueError as e:
# bcrypt raises ValueError for malformed hashes
raise RuntimeError(f"Invalid hash format: {str(e)}")
except Exception as e:
raise RuntimeError(f"Failed to verify password: {str(e)}")