2025-05-25 19:25:30 +02:00

241 lines
8.6 KiB
Python

import logging
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Header, Request, Query
from bson import ObjectId
from src.services.auth_service import AuthService
from src.schemas.api_key import ApiKeyCreate, ApiKeyResponse, ApiKeyWithValueResponse, ApiKeyListResponse
from src.schemas.team import TeamCreate
from src.schemas.user import UserCreate
from src.auth.security import get_current_user
from src.models.api_key import ApiKeyModel
from src.models.team import TeamModel
from src.models.user import UserModel
from src.utils.logging import log_request
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Every route declared in this module is mounted under the /auth prefix and
# tagged "Authentication".
router = APIRouter(prefix="/auth", tags=["Authentication"])

# Single service instance shared by all handlers below.
auth_service = AuthService()
@router.post("/api-keys", response_model=ApiKeyWithValueResponse, status_code=201)
async def create_api_key(
    key_data: ApiKeyCreate,
    request: Request,
    user_id: str = Query(..., description="User ID for the API key"),
    team_id: str = Query(..., description="Team ID for the API key")
):
    """
    Create a new API key for a specific user and team

    This endpoint creates an API key without requiring authentication.
    Both user_id and team_id must be provided as query parameters.

    Args:
        key_data: API key creation data including name and description
        request: The incoming request; only its path and method are logged
        user_id: The user ID to create the key for
        team_id: The team ID the user belongs to

    Returns:
        ApiKeyWithValueResponse: The created API key with the raw key value

    Raises:
        400: Invalid input data or user/team validation errors
        404: User or team not found
        500: Internal server error
    """
    # NOTE(review): this route declares no auth dependency, so any caller can
    # request a key for any user/team pair - confirm this exposure is intended.
    # NOTE(review): `.dict()` is deprecated in Pydantic v2 in favour of
    # `.model_dump()` - confirm the pinned Pydantic version before changing it.
    log_request(
        {
            "path": request.url.path,
            "method": request.method,
            "key_data": key_data.dict(),
            "user_id": user_id,
            "team_id": team_id
        }
    )

    try:
        response = await auth_service.create_api_key_for_user_and_team(
            user_id, team_id, key_data
        )
        logger.info(
            "API key created successfully for user %s in team %s", user_id, team_id
        )
        return response
    except ValueError as e:
        # The service's ValueError is reported to the client as a 400.
        logger.warning("Invalid input for API key creation: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        # The service's RuntimeError is reported to the client as a 404.
        logger.warning("Resource not found for API key creation: %s", e)
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        # logger.exception keeps the traceback; the client only ever sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected error creating API key: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.post("/admin/api-keys/{user_id}", response_model=ApiKeyWithValueResponse, status_code=201)
async def create_api_key_for_user(
    user_id: str,
    key_data: ApiKeyCreate,
    request: Request,
    current_user: UserModel = Depends(get_current_user)
):
    """
    Create a new API key for a specific user (admin only)

    This endpoint requires admin authentication and allows creating API keys
    for any user in the system.

    Args:
        user_id: The target user ID to create the key for
        key_data: API key creation data including name and description
        request: The incoming request; only its path and method are logged
        current_user: The authenticated admin user

    Returns:
        ApiKeyWithValueResponse: The created API key with the raw key value

    Raises:
        400: Invalid input data
        403: Insufficient permissions (not admin)
        404: Target user or team not found
        500: Internal server error
    """
    # NOTE(review): `.dict()` is deprecated in Pydantic v2 in favour of
    # `.model_dump()` - confirm the pinned Pydantic version before changing it.
    log_request(
        {
            "path": request.url.path,
            "method": request.method,
            "target_user_id": user_id,
            "key_data": key_data.dict()
        },
        user_id=str(current_user.id),
        team_id=str(current_user.team_id)
    )

    try:
        # No admin check happens in this handler; current_user is handed to
        # the service, whose PermissionError is mapped to 403 below.
        response = await auth_service.create_api_key_for_user_by_admin(
            user_id, key_data, current_user
        )
        logger.info("Admin %s created API key for user %s", current_user.id, user_id)
        return response
    except PermissionError as e:
        logger.warning("Permission denied for admin API key creation: %s", e)
        raise HTTPException(status_code=403, detail=str(e)) from e
    except ValueError as e:
        logger.warning("Invalid input for admin API key creation: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        logger.warning("Resource not found for admin API key creation: %s", e)
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        # logger.exception keeps the traceback; the client only ever sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected error creating API key for user: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.get("/api-keys", response_model=ApiKeyListResponse)
async def list_api_keys(
    request: Request,
    current_user: UserModel = Depends(get_current_user)
):
    """
    List API keys for the current authenticated user

    Returns all active and inactive API keys belonging to the authenticated user.

    Args:
        request: The incoming request; only its path and method are logged
        current_user: The authenticated user

    Returns:
        ApiKeyListResponse: List of API keys with metadata

    Raises:
        500: Internal server error
    """
    log_request(
        {"path": request.url.path, "method": request.method},
        user_id=str(current_user.id),
        team_id=str(current_user.team_id)
    )

    try:
        response = await auth_service.list_user_api_keys(current_user)
        # `response.total` is read inside the try on purpose, so a malformed
        # service response is also reported through the 500 path below.
        logger.info("Listed %s API keys for user %s", response.total, current_user.id)
        return response
    except Exception as e:
        # logger.exception keeps the traceback; the client only ever sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected error listing API keys: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.delete("/api-keys/{key_id}", status_code=204)
async def revoke_api_key(
    key_id: str,
    request: Request,
    current_user: UserModel = Depends(get_current_user)
):
    """
    Revoke (deactivate) an API key

    Deactivates the specified API key. Only the key owner or an admin can revoke keys.

    Args:
        key_id: The ID of the API key to revoke
        request: The incoming request; only its path and method are logged
        current_user: The authenticated user

    Returns:
        None (204 No Content)

    Raises:
        400: Invalid key ID format
        403: Insufficient permissions to revoke this key
        404: API key not found
        500: Internal server error
    """
    log_request(
        {"path": request.url.path, "method": request.method, "key_id": key_id},
        user_id=str(current_user.id),
        team_id=str(current_user.team_id)
    )

    try:
        # No ownership check happens in this handler; current_user is handed
        # to the service, whose PermissionError is mapped to 403 below.
        await auth_service.revoke_api_key(key_id, current_user)
        logger.info("API key %s revoked by user %s", key_id, current_user.id)
        return None
    except ValueError as e:
        logger.warning("Invalid input for API key revocation: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        logger.warning("API key not found for revocation: %s", e)
        raise HTTPException(status_code=404, detail=str(e)) from e
    except PermissionError as e:
        logger.warning("Permission denied for API key revocation: %s", e)
        raise HTTPException(status_code=403, detail=str(e)) from e
    except Exception as e:
        # logger.exception keeps the traceback; the client only ever sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected error revoking API key: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.get("/verify", status_code=200)
async def verify_authentication(
    request: Request,
    current_user: UserModel = Depends(get_current_user)
):
    """
    Verify the current authentication status

    Validates the current API key and returns user information.
    Useful for checking if an API key is still valid and active.

    Args:
        request: The incoming request; only its path and method are logged
        current_user: The authenticated user

    Returns:
        dict: Authentication verification response with user details

    Raises:
        500: Internal server error
    """
    log_request(
        {"path": request.url.path, "method": request.method},
        user_id=str(current_user.id),
        team_id=str(current_user.team_id)
    )

    try:
        response = await auth_service.verify_user_authentication(current_user)
        logger.info("Authentication verified for user %s", current_user.id)
        return response
    except Exception as e:
        # logger.exception keeps the traceback; the client only ever sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected error verifying authentication: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e