t2m / src /app /utils /auth_integration_example.py
thanhkt's picture
implement core api
50a7bf0
"""
Example integration of user management utilities with FastAPI dependencies.
This module demonstrates how to integrate the user management utilities
with FastAPI dependency injection for authentication and authorization.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from .user_utils import (
create_auth_context,
validate_user_permission,
get_session_cache
)
from ..models.user import AuthenticationContext, UserPermissions
from ..core.auth import get_clerk_manager, AuthenticationError, AuthorizationError
# Security scheme for Bearer token authentication.
# Injected into get_current_user via Depends(security) to obtain the
# HTTP Bearer credentials of the incoming request.
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> AuthenticationContext:
    """
    FastAPI dependency to get current authenticated user.

    This dependency takes the Bearer token from the injected credentials,
    returns the cached authentication context when one exists and is still
    authenticated, and otherwise builds a new context with
    ``create_auth_context`` and stores it in the session cache.

    Args:
        credentials: HTTP Bearer credentials from FastAPI security

    Returns:
        AuthenticationContext resolved for the presented token

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header if
            resolving the context fails. An HTTPException raised while
            resolving the context is propagated unchanged.
    """
    try:
        token = credentials.credentials

        # Check session cache first
        cache = get_session_cache()
        cached_context = cache.get(token)
        if cached_context and cached_context.is_authenticated:
            return cached_context

        # Create new authentication context
        auth_context = await create_auth_context(token)

        # Cache the context so later requests with the same token skip
        # the context creation step
        cache.set(token, auth_context)
        return auth_context
    except HTTPException:
        # Already a well-formed HTTP error (status, detail, headers);
        # re-wrapping it as a generic 401 would hide the real status.
        raise
    except Exception as e:
        # Boundary handler: any other failure is reported as 401.
        # NOTE(review): the detail echoes the underlying error text to the
        # client - confirm that is acceptable for production use.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Authentication failed: {e}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e  # keep the original cause in tracebacks/logs
async def get_verified_user(
    current_user: AuthenticationContext = Depends(get_current_user)
) -> AuthenticationContext:
    """
    FastAPI dependency that only lets verified users through.

    Builds on ``get_current_user`` and checks the ``is_verified`` flag of
    the resolved authentication context.

    Args:
        current_user: Authentication context supplied by get_current_user

    Returns:
        The same AuthenticationContext when ``is_verified`` is truthy

    Raises:
        HTTPException: 403 when the user is not verified
    """
    # Happy path first: verified users pass straight through.
    if current_user.is_verified:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Email verification required",
    )
async def get_admin_user(
    current_user: AuthenticationContext = Depends(get_current_user)
) -> AuthenticationContext:
    """
    FastAPI dependency that only lets admin users through.

    Builds on ``get_current_user`` and compares the value of the user's
    role against the string ``"admin"``.

    Args:
        current_user: Authentication context supplied by get_current_user

    Returns:
        The same AuthenticationContext when the role value equals "admin"

    Raises:
        HTTPException: 403 when the user does not have the admin role
    """
    role_name = current_user.permissions.role.value
    if role_name == "admin":
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required",
    )
def require_permission(permission: str):
    """
    Build a FastAPI dependency that enforces a single permission.

    Each call returns a new dependency function bound to ``permission``.

    Args:
        permission: Permission string passed to ``can_perform_action``

    Returns:
        Async dependency function suitable for ``Depends(...)``
    """

    async def permission_dependency(
        current_user: AuthenticationContext = Depends(get_current_user)
    ) -> AuthenticationContext:
        """Return the user context if it may perform ``permission``, else raise 403."""
        if current_user.can_perform_action(permission):
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permission required: {permission}",
        )

    return permission_dependency
async def check_rate_limits(
    current_user: AuthenticationContext = Depends(get_current_user)
) -> AuthenticationContext:
    """
    FastAPI dependency that enforces per-user job limits.

    Compares the user's current job counts against the
    ``max_concurrent_jobs`` and ``max_daily_jobs`` values on the user's
    permissions. The counts are hard-coded to 0 in this placeholder.

    Args:
        current_user: Authentication context supplied by get_current_user

    Returns:
        The same AuthenticationContext when neither limit is reached

    Raises:
        HTTPException: 429 with a ``Retry-After`` header when a limit is
            reached
    """
    # Placeholder: a real implementation would read these counters from
    # Redis (concurrent jobs) and the database (daily jobs).
    limits = current_user.permissions
    active_jobs = 0  # Would fetch from Redis
    jobs_today = 0  # Would fetch from database

    # NOTE(review): with the counters fixed at 0, any limit <= 0 always
    # triggers a 429 - confirm how "unlimited" is represented.
    concurrent_exhausted = active_jobs >= limits.max_concurrent_jobs
    if concurrent_exhausted:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Maximum concurrent jobs exceeded",
            headers={"Retry-After": "300"},
        )

    daily_exhausted = jobs_today >= limits.max_daily_jobs
    if daily_exhausted:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Daily job limit exceeded",
            headers={"Retry-After": "86400"},  # 24 hours
        )

    return current_user
# Example usage in FastAPI endpoints:
"""
from fastapi import APIRouter, Depends
from .auth_integration_example import (
get_current_user,
get_verified_user,
get_admin_user,
require_permission,
check_rate_limits
)
router = APIRouter()
@router.get("/profile")
async def get_user_profile(
current_user: AuthenticationContext = Depends(get_current_user)
):
'''Get current user profile.'''
return {
"user": current_user.user,
"permissions": current_user.permissions
}
@router.post("/videos/generate")
async def generate_video(
request: VideoGenerationRequest,
current_user: AuthenticationContext = Depends(get_verified_user),
_rate_check: AuthenticationContext = Depends(check_rate_limits)
):
'''Generate video - requires verified user and rate limit check.'''
# Implementation here
pass
@router.get("/admin/system/health")
async def get_system_health(
current_user: AuthenticationContext = Depends(get_admin_user)
):
'''Get system health - admin only.'''
# Implementation here
pass
@router.post("/jobs/{job_id}/cancel")
async def cancel_job(
job_id: str,
current_user: AuthenticationContext = Depends(require_permission("cancel_jobs"))
):
'''Cancel job - requires specific permission.'''
# Implementation here
pass
"""