Files
MyDocManager/src/file-processor/app/api/dependencies.py

101 lines
2.7 KiB
Python

import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt import InvalidTokenError
from starlette import status
from app.config import settings
from app.database.connection import get_database
from app.models.auth import UserRole
from app.models.user import UserInDB
from app.services.auth_service import AuthService
from app.services.user_service import UserService
security = HTTPBearer()
def get_auth_service() -> AuthService:
"""Dependency to get AuthService instance."""
return AuthService()
def get_user_service() -> UserService:
"""Dependency to get UserService instance."""
database = get_database()
return UserService(database)
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
user_service: UserService = Depends(get_user_service)
) -> UserInDB:
"""
Dependency to get current authenticated user from JWT token.
Args:
credentials: HTTP Bearer credentials
user_service: Auth service instance
Returns:
User: Current authenticated user
Raises:
HTTPException: If token is invalid or user not found
"""
try:
payload = jwt.decode(
credentials.credentials,
settings.get_jwt_secret_key(),
algorithms=[settings.get_jwt_algorithm()]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
except InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = user_service.get_user_by_username(username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return user
def get_admin_user(current_user: UserInDB = Depends(get_current_user)) -> UserInDB:
"""
Dependency to ensure current user has admin role.
Args:
current_user: Current authenticated user
Returns:
User: Current user if admin
Raises:
HTTPException: If user is not admin
"""
if current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user