image_management_api/src/utils/authorization.py
2025-05-25 22:04:02 +02:00

212 lines
7.4 KiB
Python

"""
Centralized authorization utilities to eliminate scattered access control logic.
This module provides reusable authorization functions that can be used across
all services and API endpoints to ensure consistent access control.
"""
import functools
import logging
from typing import Optional, Any, Dict

from fastapi import HTTPException, status

from src.models.user import UserModel
# Module-level logger named after this module's import path; the authorization
# helpers in this file use it for denial warnings and audit entries.
logger = logging.getLogger(__name__)
class AuthorizationError(HTTPException):
    """HTTP exception raised when an authorization check rejects a request.

    Args:
        detail: Human-readable reason for the rejection.
        status_code: HTTP status to respond with; 403 Forbidden by default.
    """

    def __init__(self, detail: str, status_code: int = status.HTTP_403_FORBIDDEN):
        super().__init__(detail=detail, status_code=status_code)
class AuthorizationContext:
    """Bundle of who is doing what to which kind of resource.

    Any extra keyword arguments are kept verbatim in ``metadata`` and are
    merged into the dictionary produced by :meth:`to_dict`.
    """

    def __init__(self, user: UserModel, resource_type: str, action: str, **kwargs):
        self.user = user
        self.resource_type = resource_type
        self.action = action
        self.metadata = kwargs

    def to_dict(self) -> Dict[str, Any]:
        """Return a flat dictionary view of the context, suitable for logging.

        When ``user`` is ``None`` the identity fields are reported as
        ``None`` / ``None`` / ``False``. Metadata entries are merged last, so
        a metadata key that matches a built-in key replaces its value.
        """
        subject = self.user
        if subject is None:
            identity = {"user_id": None, "team_id": None, "is_admin": False}
        else:
            identity = {
                "user_id": str(subject.id),
                "team_id": str(subject.team_id),
                "is_admin": subject.is_admin,
            }
        return {
            **identity,
            "resource_type": self.resource_type,
            "action": self.action,
            **self.metadata,
        }
def require_admin(user: UserModel, action: str = "perform admin action") -> None:
    """
    Reject the request unless the user is an administrator.

    Args:
        user: The user whose ``is_admin`` flag is checked
        action: Description of the attempted action, used in the log line
            and in the error detail

    Raises:
        AuthorizationError: If ``user.is_admin`` is falsy
    """
    if user.is_admin:
        return

    logger.warning(f"Non-admin user {user.id} attempted to {action}")
    raise AuthorizationError(f"Admin privileges required to {action}")
def require_team_access(user: UserModel, resource_team_id: str, resource_type: str, action: str = "access") -> None:
    """
    Reject the request unless the user may touch a resource of the given team.

    Admins always pass. Everyone else passes only when the string form of
    their ``team_id`` equals the string form of ``resource_team_id``.

    Args:
        user: The user requesting access
        resource_team_id: Team ID the resource belongs to
        resource_type: Kind of resource, used in the log line and error detail
        action: Attempted action, used in the log line and error detail

    Raises:
        AuthorizationError: If the user is not an admin and the team IDs differ
    """
    if user.is_admin:
        return

    same_team = str(user.team_id) == str(resource_team_id)
    if same_team:
        return

    logger.warning(
        f"User {user.id} from team {user.team_id} attempted to {action} "
        f"{resource_type} from team {resource_team_id}"
    )
    raise AuthorizationError(f"Cannot {action} {resource_type} from different team")
def require_resource_owner_or_admin(user: UserModel, resource_user_id: str, resource_type: str, action: str = "access") -> None:
    """
    Reject the request unless the user owns the resource or is an admin.

    Ownership is decided by comparing the string form of ``user.id`` with
    the string form of ``resource_user_id``.

    Args:
        user: The user requesting access
        resource_user_id: ID of the user who owns the resource
        resource_type: Kind of resource, used in the log line and error detail
        action: Attempted action, used in the log line and error detail

    Raises:
        AuthorizationError: If the user is neither the owner nor an admin
    """
    if user.is_admin:
        return

    is_owner = str(user.id) == str(resource_user_id)
    if is_owner:
        return

    logger.warning(
        f"User {user.id} attempted to {action} {resource_type} "
        f"owned by user {resource_user_id}"
    )
    raise AuthorizationError(f"Cannot {action} {resource_type} owned by another user")
def can_access_team_resource(user: UserModel, resource_team_id: str) -> bool:
    """
    Tell whether the user may access a resource of the given team, without raising.

    Args:
        user: The user requesting access
        resource_team_id: Team ID the resource belongs to

    Returns:
        A truthy value when the user is an admin or the string forms of the
        two team IDs match; ``False`` otherwise
    """
    admin_flag = user.is_admin
    if admin_flag:
        return admin_flag
    return str(user.team_id) == str(resource_team_id)
def can_access_user_resource(user: UserModel, resource_user_id: str) -> bool:
    """
    Tell whether the user may access a resource owned by a user, without raising.

    Args:
        user: The user requesting access
        resource_user_id: ID of the user who owns the resource

    Returns:
        A truthy value when the user is an admin or the string forms of the
        two user IDs match; ``False`` otherwise
    """
    admin_flag = user.is_admin
    if admin_flag:
        return admin_flag
    return str(user.id) == str(resource_user_id)
def get_team_filter(user: UserModel) -> Optional[str]:
    """
    Work out which team a query should be restricted to for this user.

    Args:
        user: The user making the request

    Returns:
        ``None`` for admins (no team restriction), otherwise the string form
        of the user's ``team_id``
    """
    if user.is_admin:
        return None
    return str(user.team_id)
def log_authorization_context(context: AuthorizationContext, success: bool = True) -> None:
    """
    Log an authorization decision for audit purposes.

    The context's dictionary form, plus an ``authorization_success`` flag, is
    attached to the log record through ``extra``. Granted decisions are logged
    at INFO level and denied ones at WARNING level.

    Args:
        context: Authorization context describing user, resource and action
        success: Whether the authorization was successful
    """
    log_data = context.to_dict()
    log_data["authorization_success"] = success

    # The logging module raises KeyError when ``extra`` tries to overwrite a
    # built-in LogRecord attribute (e.g. "filename", "name", "module",
    # "message"). Context metadata is caller-supplied, so such a key would
    # turn an audit log call into a crash. Colliding keys are therefore
    # emitted with an "auth_" prefix; every other key is passed unchanged.
    reserved = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}
    extra = {
        (f"auth_{key}" if key in reserved else key): value
        for key, value in log_data.items()
    }

    if success:
        logger.info(f"Authorization granted for {context.action} on {context.resource_type}", extra=extra)
    else:
        logger.warning(f"Authorization denied for {context.action} on {context.resource_type}", extra=extra)
def create_auth_context(user: UserModel, resource_type: str, action: str, **kwargs) -> AuthorizationContext:
    """
    Build an AuthorizationContext from the given request details.

    Args:
        user: The user making the request
        resource_type: Type of resource being accessed
        action: Action being performed
        **kwargs: Extra metadata, stored on the context as-is

    Returns:
        A new AuthorizationContext object
    """
    context = AuthorizationContext(
        user=user,
        resource_type=resource_type,
        action=action,
        **kwargs,
    )
    return context
# Decorator for common authorization patterns
def authorize_team_resource(resource_type: str, action: str = "access"):
    """
    Decorator that runs a team-access check before an async function.

    The wrapper inspects the call's arguments to find:

    * the acting user: the first positional or keyword argument that is a
      ``UserModel`` instance;
    * the resource's team: the ``team_id`` keyword argument if one is
      passed, otherwise the ``team_id`` attribute of the first argument
      (other than the user) that has one.

    When both are found, ``require_team_access`` is called and may raise
    ``AuthorizationError``. When either cannot be determined the wrapped
    function is still called (as before), but a warning is logged so the
    skipped check is visible.

    Args:
        resource_type: Type of resource, used in log and error messages
        action: Action being performed, used in log and error messages

    Returns:
        A decorator for ``async`` functions
    """
    def decorator(func):
        # wraps() keeps the wrapped function's name, docstring and (through
        # __wrapped__) its signature visible to introspection.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Look at keyword values too: a user or resource passed by
            # keyword must not bypass the check.
            candidates = [*args, *kwargs.values()]

            user = next(
                (item for item in candidates if isinstance(item, UserModel)),
                None,
            )

            if "team_id" in kwargs:
                # An explicit team_id keyword takes precedence.
                resource_team_id = kwargs["team_id"]
            else:
                resource_team_id = None
                for item in candidates:
                    # Skip the user: it has a team_id of its own, and using
                    # it would compare the user's team with itself.
                    if item is not user and hasattr(item, "team_id"):
                        resource_team_id = item.team_id
                        break

            # Compare against None rather than truthiness so that a falsy
            # but present id (e.g. 0 or "") is still checked.
            if user is not None and resource_team_id is not None:
                require_team_access(user, resource_team_id, resource_type, action)
            else:
                logger.warning(
                    "Team access check skipped for %s: could not determine "
                    "user and/or team_id from the call arguments (%s %s)",
                    getattr(func, "__qualname__", repr(func)),
                    action,
                    resource_type,
                )

            return await func(*args, **kwargs)
        return wrapper
    return decorator