219 lines
6.5 KiB
Python
219 lines
6.5 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from datetime import datetime, timedelta
|
|
from bson import ObjectId
|
|
|
|
from src.models.api_key import ApiKeyModel
|
|
from src.db.repositories.api_key_repository import api_key_repository
|
|
from src.auth.security import generate_api_key
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_api_key(client: TestClient, admin_api_key: tuple):
    """Creating an API key with admin credentials yields a 201 and the new key record.

    Args:
        client: Test client used to issue requests against the API.
        admin_api_key: Two-item tuple whose first item is the admin's raw API key.
    """
    # Only the raw key is needed to authenticate; the second item is ignored.
    admin_raw_key, _ = admin_api_key

    payload = {
        "name": "Test API Key",
        "description": "A key for testing",
    }
    response = client.post(
        "/api/v1/auth/api-keys",
        headers={"X-API-Key": admin_raw_key},
        json=payload,
    )

    # The endpoint must report creation and echo back the submitted fields.
    assert response.status_code == 201
    created = response.json()
    for field in ("id", "key"):
        assert field in created
    assert created["name"] == "Test API Key"
    assert created["description"] == "A key for testing"
    for field in ("created_at", "expiry_date"):
        assert field in created
    assert created["is_active"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_api_keys(client: TestClient, user_api_key: tuple, admin_api_key: tuple):
    """Test that listing API keys reflects a newly created key.

    Lists the regular user's keys (expecting exactly one), creates a second
    key through the API, then lists again (expecting two).

    Args:
        client: Test client used to issue requests against the API.
        user_api_key: Two-item tuple whose first item is the user's raw API key.
        admin_api_key: Not referenced in the body; presumably requested for
            the fixture's setup side effects -- TODO confirm.
    """
    # Use the regular user's API key
    raw_key, _ = user_api_key
    headers = {"X-API-Key": raw_key}

    # List API keys
    response = client.get(
        "/api/v1/auth/api-keys",
        headers=headers,
    )

    # Check response
    assert response.status_code == 200
    data = response.json()
    assert "api_keys" in data
    assert "total" in data
    assert data["total"] == 1  # The user should have only their initial API key

    # Create another API key for the user
    response = client.post(
        "/api/v1/auth/api-keys",
        headers=headers,
        json={
            "name": "Another Test Key",
            "description": "Another key for testing",
        },
    )
    assert response.status_code == 201

    # List API keys again
    response = client.get(
        "/api/v1/auth/api-keys",
        headers=headers,
    )

    # Verify the request itself succeeded before inspecting the payload, so a
    # failed listing is reported as a status mismatch rather than a KeyError.
    assert response.status_code == 200

    # Check that there are now 2 keys
    data = response.json()
    assert data["total"] == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_revoke_api_key(client: TestClient, user_api_key: tuple):
    """Revoking a key via DELETE returns 204 and leaves the stored key inactive.

    Args:
        client: Test client used to issue requests against the API.
        user_api_key: Two-item tuple whose first item is the user's raw API key.
    """
    # The second tuple item is not needed by this test.
    user_raw_key, _existing_key = user_api_key
    auth_headers = {"X-API-Key": user_raw_key}

    # Create a throwaway key so the key used for authentication stays valid.
    create_payload = {
        "name": "Key to Revoke",
        "description": "This key will be revoked",
    }
    created = client.post(
        "/api/v1/auth/api-keys",
        headers=auth_headers,
        json=create_payload,
    )
    assert created.status_code == 201
    target_id = created.json()["id"]

    # Revoke it through the API.
    deleted = client.delete(
        f"/api/v1/auth/api-keys/{target_id}",
        headers=auth_headers,
    )
    assert deleted.status_code == 204

    # The record must still exist in the repository, flagged as inactive.
    stored_key = await api_key_repository.get_by_id(ObjectId(target_id))
    assert stored_key is not None
    assert stored_key.is_active is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_verify_auth(client: TestClient, admin_api_key: tuple, admin_user: object):
    """The verify endpoint returns the identity of the admin who owns the key.

    Args:
        client: Test client used to issue requests against the API.
        admin_api_key: Two-item tuple whose first item is the admin's raw API key.
        admin_user: User object exposing ``id``, ``name``, ``email`` and ``team_id``.
    """
    admin_raw_key, _ = admin_api_key

    response = client.get(
        "/api/v1/auth/verify",
        headers={"X-API-Key": admin_raw_key},
    )

    assert response.status_code == 200

    # Identifiers are compared as strings because that is how the JSON
    # response is matched against the user object's id fields.
    identity = response.json()
    assert identity["user_id"] == str(admin_user.id)
    assert identity["name"] == admin_user.name
    assert identity["email"] == admin_user.email
    assert identity["team_id"] == str(admin_user.team_id)
    assert identity["is_admin"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invalid_api_key(client: TestClient):
    """A request carrying an unknown API key is rejected with 401 and a detail field.

    Args:
        client: Test client used to issue requests against the API.
    """
    # Hit a protected endpoint with a key value that was never issued.
    bogus_headers = {"X-API-Key": "invalid-api-key"}
    response = client.get("/api/v1/auth/verify", headers=bogus_headers)

    # Authentication must fail and the error body must carry a "detail" entry.
    assert response.status_code == 401
    assert "detail" in response.json()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_expired_api_key(client: TestClient, regular_user: object):
    """Test authentication with an expired API key.

    Stores a key whose expiry date is one day in the past (but which is still
    flagged active) and checks that the verify endpoint rejects it with a 401
    whose detail message mentions expiry.

    Args:
        client: Test client used to issue requests against the API.
        regular_user: User object exposing ``id`` and ``team_id``.
    """
    # Create an expired API key. generate_api_key's result is unpacked as a
    # (raw key, hashed key) pair; the original duplicated chained assignment
    # was redundant and has been collapsed to a single unpacking.
    raw_key, hashed_key = generate_api_key(str(regular_user.team_id), str(regular_user.id))

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
    # kept here because the timezone handling of the key-validation code is
    # not visible from this file -- confirm before switching to aware datetimes.
    expired_date = datetime.utcnow() - timedelta(days=1)

    api_key = ApiKeyModel(
        key_hash=hashed_key,
        user_id=regular_user.id,
        team_id=regular_user.team_id,
        name="Expired API Key",
        description="This key is expired",
        expiry_date=expired_date,
        is_active=True,  # active on purpose: only the expiry should cause rejection
    )
    await api_key_repository.create(api_key)

    # Set up the headers
    headers = {"X-API-Key": raw_key}

    # Try to access a protected endpoint
    response = client.get(
        "/api/v1/auth/verify",
        headers=headers,
    )

    # Check that authentication fails due to expiry
    assert response.status_code == 401
    assert "expired" in response.json()["detail"].lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_inactive_api_key(client: TestClient, regular_user: object):
    """Test authentication with an inactive API key.

    Stores a key that is not yet expired (expiry 30 days ahead) but is flagged
    inactive, and checks that the verify endpoint rejects it with a 401 whose
    detail message mentions inactivity.

    Args:
        client: Test client used to issue requests against the API.
        regular_user: User object exposing ``id`` and ``team_id``.
    """
    # Create an inactive API key. generate_api_key's result is unpacked as a
    # (raw key, hashed key) pair; the original duplicated chained assignment
    # was redundant and has been collapsed to a single unpacking.
    raw_key, hashed_key = generate_api_key(str(regular_user.team_id), str(regular_user.id))

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
    # kept here because the timezone handling of the key-validation code is
    # not visible from this file -- confirm before switching to aware datetimes.
    api_key = ApiKeyModel(
        key_hash=hashed_key,
        user_id=regular_user.id,
        team_id=regular_user.team_id,
        name="Inactive API Key",
        description="This key is inactive",
        expiry_date=datetime.utcnow() + timedelta(days=30),  # not expired on purpose
        is_active=False,
    )
    await api_key_repository.create(api_key)

    # Set up the headers
    headers = {"X-API-Key": raw_key}

    # Try to access a protected endpoint
    response = client.get(
        "/api/v1/auth/verify",
        headers=headers,
    )

    # Check that authentication fails due to inactivity
    assert response.status_code == 401
    assert "inactive" in response.json()["detail"].lower()