Working on API

This commit is contained in:
2025-09-25 22:58:31 +02:00
parent 48f5b009ae
commit 1f7ef200e7
16 changed files with 618 additions and 63 deletions

View File

View File

@@ -0,0 +1,100 @@
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt import InvalidTokenError
from starlette import status
from app.config import settings
from app.database.connection import get_database
from app.models.auth import UserRole
from app.models.user import UserInDB
from app.services.auth_service import AuthService
from app.services.user_service import UserService
security = HTTPBearer()
def get_auth_service() -> AuthService:
"""Dependency to get AuthService instance."""
return AuthService()
def get_user_service() -> UserService:
"""Dependency to get UserService instance."""
database = get_database()
return UserService(database)
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
user_service: UserService = Depends(get_user_service)
) -> UserInDB:
"""
Dependency to get current authenticated user from JWT token.
Args:
credentials: HTTP Bearer credentials
user_service: Auth service instance
Returns:
User: Current authenticated user
Raises:
HTTPException: If token is invalid or user not found
"""
try:
payload = jwt.decode(
credentials.credentials,
settings.get_jwt_secret_key(),
algorithms=[settings.get_jwt_algorithm()]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
except InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = user_service.get_user_by_username(username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return user
def get_admin_user(current_user: UserInDB = Depends(get_current_user)) -> UserInDB:
"""
Dependency to ensure current user has admin role.
Args:
current_user: Current authenticated user
Returns:
User: Current user if admin
Raises:
HTTPException: If user is not admin
"""
if current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

View File

@@ -0,0 +1,80 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from app.api.dependencies import get_auth_service, get_current_user, get_user_service
from app.models.auth import LoginResponse, UserResponse
from app.models.user import UserInDB
from app.services.auth_service import AuthService
from app.services.user_service import UserService
router = APIRouter(tags=["authentication"])
@router.post("/login", response_model=LoginResponse)
def login(
form_data: OAuth2PasswordRequestForm = Depends(),
auth_service: AuthService = Depends(get_auth_service),
user_service: UserService = Depends(get_user_service)
):
"""
Authenticate user and return JWT token.
Args:
form_data: OAuth2 password form data
auth_service: Auth service instance
user_service: User service instance
Returns:
LoginResponse: JWT token and user info
Raises:
HTTPException: If authentication fails
"""
incorrect_username_or_pwd = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
user = user_service.get_user_by_username(form_data.username)
if (not user or
not user.is_active or
not auth_service.verify_user_password(form_data.password, user.hashed_password)):
raise incorrect_username_or_pwd
access_token = auth_service.create_access_token(data={"sub": user.username})
return LoginResponse(
access_token=access_token,
user=UserResponse(
_id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active,
created_at=user.created_at,
updated_at=user.updated_at
)
)
@router.get("/me", response_model=UserResponse)
def get_current_user_profile(current_user: UserInDB = Depends(get_current_user)):
"""
Get current user profile.
Args:
current_user: Current authenticated user
Returns:
UserResponse: Current user profile without sensitive data
"""
return UserResponse(
_id=current_user.id,
username=current_user.username,
email=current_user.email,
role=current_user.role,
is_active=current_user.is_active,
created_at=current_user.created_at,
updated_at=current_user.updated_at
)

View File

@@ -0,0 +1,172 @@
from fastapi import APIRouter, Depends, HTTPException
from starlette import status
from app.api.dependencies import get_admin_user, get_user_service
from app.models.auth import UserResponse, MessageResponse
from app.models.types import PyObjectId
from app.models.user import UserInDB, UserCreate, UserUpdate
from app.services.user_service import UserService
router = APIRouter(tags=["users"])
@router.get("", response_model=list[UserInDB])
def list_users(
admin_user: UserInDB = Depends(get_admin_user),
user_service: UserService = Depends(get_user_service)
):
"""
List all users (admin only).
Args:
admin_user: Current admin user
user_service: User service instance
Returns:
List[UserResponse]: List of all users without sensitive data
"""
return user_service.list_users()
@router.get("/{user_id}", response_model=UserResponse)
def get_user_by_id(
user_id: PyObjectId,
admin_user: UserInDB = Depends(get_admin_user),
user_service: UserService = Depends(get_user_service)
):
"""
Get specific user by ID (admin only).
Args:
user_id: User ID to retrieve
admin_user: Current admin user
user_service: User service instance
Returns:
UserResponse: User information without sensitive data
Raises:
HTTPException: If user not found
"""
user = user_service.get_user_by_id(str(user_id))
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
user_data: UserCreate,
admin_user: UserInDB = Depends(get_admin_user),
user_service: UserService = Depends(get_user_service)
):
"""
Create new user (admin only).
Args:
user_data: User creation data
admin_user: Current admin user
user_service: User service instance
Returns:
UserResponse: Created user information without sensitive data
Raises:
HTTPException: If user creation fails
"""
try:
user = user_service.create_user(user_data)
return UserResponse(
_id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active,
created_at=user.created_at,
updated_at=user.updated_at
)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.put("/{user_id}", response_model=UserResponse)
def update_user(
user_id: PyObjectId,
user_data: UserUpdate,
admin_user: UserInDB = Depends(get_admin_user),
user_service: UserService = Depends(get_user_service)
):
"""
Update existing user (admin only).
Args:
user_id: User ID to update
user_data: User update data
admin_user: Current admin user
user_service: User service instance
Returns:
UserResponse: Updated user information without sensitive data
Raises:
HTTPException: If user not found or update fails
"""
try:
user = user_service.update_user(str(user_id), user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return UserResponse(
_id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active,
created_at=user.created_at,
updated_at=user.updated_at
)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.delete("/{user_id}", response_model=MessageResponse)
def delete_user(
user_id: PyObjectId,
admin_user: UserInDB = Depends(get_admin_user),
user_service: UserService = Depends(get_user_service)
):
"""
Delete user by ID (admin only).
Args:
user_id: User ID to delete
admin_user: Current admin user
user_service: User service instance
Returns:
MessageResponse: Success message
Raises:
HTTPException: If user not found or deletion fails
"""
success = user_service.delete_user(str(user_id))
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return MessageResponse(message="User successfully deleted")