# 2025-05-24 13:31:31 +02:00
#
# 469 lines
# 20 KiB
# Python

"""
End-to-End Tests for SEREACT API

These tests cover the complete user workflows described in the README:

1. Bootstrap initial setup (team, admin user, API key) - or use existing setup
2. Team creation and management
3. User management within teams
4. API key authentication
5. Image upload and storage
6. Image search and retrieval
7. Multi-team isolation

These tests are idempotent and can be run multiple times against the same database.

Run with: pytest tests/test_e2e.py -v
For integration tests (they are skipped unless E2E_INTEGRATION_TEST is set):
    E2E_INTEGRATION_TEST=1 pytest tests/test_e2e.py -v -m integration
"""
import pytest
import asyncio
import os
import io
import uuid
from typing import Dict, Any, List
from fastapi.testclient import TestClient
from PIL import Image as PILImage
import tempfile
from main import app
@pytest.mark.e2e
class TestE2EWorkflows:
    """End-to-end tests covering complete user workflows.

    Every test talks to the FastAPI ``app`` through a ``TestClient`` and
    tags all data it creates with a per-test unique suffix, so the suite can
    be re-run against a database that already contains earlier test data.

    Methods whose name starts with ``_`` are workflow stages shared by the
    tests; pytest does not collect them as tests on their own.
    """

    @pytest.fixture(scope="function")
    def client(self):
        """Create test client for the FastAPI app"""
        return TestClient(app)

    @pytest.fixture(scope="function")
    def sample_image_file(self):
        """Create a sample image file for testing uploads.

        Returns an in-memory 100x100 red JPEG, rewound to position 0.
        """
        img = PILImage.new('RGB', (100, 100), color='red')
        img_bytes = io.BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)
        return img_bytes

    @pytest.fixture(scope="function")
    def unique_suffix(self):
        """Generate a unique suffix for test data to avoid conflicts"""
        return str(uuid.uuid4())[:8]

    def _bootstrap_or_reuse(self, client: TestClient, unique_suffix: str) -> tuple:
        """Obtain an API key and a team to run the workflow against.

        Tries the bootstrap endpoint first. If it answers 400 (bootstrap
        already done), falls back to the key in the ``E2E_TEST_API_KEY``
        environment variable and creates a fresh team plus admin user with
        it. The calling test is skipped when no usable key is available or
        the fallback team/user cannot be created.

        Returns:
            ``(api_key, team_id)``.
        """
        bootstrap_data = {
            "team_name": f"E2E Test Team {unique_suffix}",
            "admin_email": f"admin-{unique_suffix}@e2etest.com",
            "admin_name": f"E2E Admin User {unique_suffix}",
            "api_key_name": f"E2E Test API Key {unique_suffix}"
        }
        response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data)

        if response.status_code != 400:
            # Bootstrap succeeded
            assert response.status_code == 201
            bootstrap_result = response.json()
            assert "key" in bootstrap_result
            team_id = bootstrap_result["team_id"]
            admin_user_id = bootstrap_result["user_id"]
            print(f"✅ Bootstrap successful - Team: {team_id}, User: {admin_user_id}")
            return bootstrap_result["key"], team_id

        # Bootstrap already completed, try to use existing setup
        print("Bootstrap already completed, trying to use existing setup...")

        # Check if user provided an API key via environment variable
        test_api_key = os.getenv("E2E_TEST_API_KEY")
        if not test_api_key:
            # No API key provided, skip the test
            pytest.skip(
                "Bootstrap already completed and no API key provided. "
                "Set E2E_TEST_API_KEY environment variable with a valid API key to run this test."
            )

        print("Using API key from environment variable")
        headers = {"X-API-Key": test_api_key}

        # Verify the API key works
        response = client.get("/api/v1/auth/verify", headers=headers)
        if response.status_code != 200:
            pytest.skip(f"Provided API key is invalid: {response.status_code}")

        # Create a new team for our test
        team_data = {
            "name": f"E2E Test Team {unique_suffix}",
            "description": f"E2E test team created at {unique_suffix}"
        }
        response = client.post("/api/v1/teams", json=team_data, headers=headers)
        if response.status_code != 201:
            pytest.skip(f"Failed to create test team: {response.status_code}")
        team_id = response.json()["id"]

        # Create a test user for this team
        user_data = {
            "email": f"testuser-{unique_suffix}@e2etest.com",
            "name": f"E2E Test User {unique_suffix}",
            "is_admin": True,
            "team_id": team_id
        }
        response = client.post("/api/v1/users", json=user_data, headers=headers)
        if response.status_code != 201:
            pytest.skip(f"Failed to create test user: {response.status_code}")
        admin_user_id = response.json()["id"]

        print(f"✅ Using existing setup with new team: {team_id}, user: {admin_user_id}")
        return test_api_key, team_id

    def _check_team_management(self, client: TestClient, headers: Dict[str, str], team_id, unique_suffix: str) -> None:
        """Fetch, update and list teams, asserting ``team_id`` is visible."""
        # Get the team (either created or from bootstrap)
        response = client.get(f"/api/v1/teams/{team_id}", headers=headers)
        assert response.status_code == 200
        response.json()  # the body must parse as JSON
        print("✅ Team retrieval successful")

        # Update team with unique description
        new_description = f"Updated during E2E testing {unique_suffix}"
        response = client.put(
            f"/api/v1/teams/{team_id}", json={"description": new_description}, headers=headers
        )
        assert response.status_code == 200
        assert new_description in response.json()["description"]
        print("✅ Team update successful")

        # List teams
        response = client.get("/api/v1/teams", headers=headers)
        assert response.status_code == 200
        teams = response.json()
        assert len(teams) >= 1
        assert any(t["id"] == team_id for t in teams)
        print("✅ Team listing successful")

    def _check_user_management(self, client: TestClient, headers: Dict[str, str], team_id, unique_suffix: str) -> None:
        """Create a non-admin user in the team, then fetch and list it."""
        email = f"user-{unique_suffix}@e2etest.com"

        # Create a regular user with unique email
        user_data = {
            "email": email,
            "name": f"E2E Regular User {unique_suffix}",
            "is_admin": False,
            "team_id": team_id
        }
        response = client.post("/api/v1/users", json=user_data, headers=headers)
        assert response.status_code == 201
        regular_user = response.json()
        assert regular_user["email"] == email
        assert regular_user["is_admin"] is False
        regular_user_id = regular_user["id"]
        print("✅ User creation successful")

        # Get user details
        response = client.get(f"/api/v1/users/{regular_user_id}", headers=headers)
        assert response.status_code == 200
        assert response.json()["email"] == email
        print("✅ User retrieval successful")

        # List users
        response = client.get("/api/v1/users", headers=headers)
        assert response.status_code == 200
        users = response.json()
        assert len(users) >= 1
        assert email in [u["email"] for u in users]
        print("✅ User listing successful")

    def _check_api_key_lifecycle(self, client: TestClient, headers: Dict[str, str], unique_suffix: str) -> None:
        """Create an extra API key, use it, list keys, revoke it, and check it is rejected."""
        # Create additional API key with unique name
        api_key_data = {
            "name": f"Additional Test Key {unique_suffix}",
            "description": f"Extra key for testing {unique_suffix}"
        }
        response = client.post("/api/v1/auth/api-keys", json=api_key_data, headers=headers)
        assert response.status_code == 201
        new_api_key = response.json()
        assert new_api_key["name"] == f"Additional Test Key {unique_suffix}"
        new_headers = {"X-API-Key": new_api_key["key"]}
        new_key_id = new_api_key["id"]
        print("✅ Additional API key creation successful")

        # Test the new API key works
        response = client.get("/api/v1/auth/verify", headers=new_headers)
        assert response.status_code == 200
        print("✅ New API key authentication successful")

        # List API keys
        response = client.get("/api/v1/auth/api-keys", headers=headers)
        assert response.status_code == 200
        assert len(response.json()) >= 1
        print("✅ API key listing successful")

        # Revoke the additional API key (using the original key)
        response = client.delete(f"/api/v1/auth/api-keys/{new_key_id}", headers=headers)
        assert response.status_code == 204
        print("✅ API key revocation successful")

        # Verify revoked key doesn't work
        response = client.get("/api/v1/auth/verify", headers=new_headers)
        assert response.status_code == 401
        print("✅ Revoked API key properly rejected")

    def _upload_image(self, client: TestClient, headers: Dict[str, str], image_file, filename: str,
                      description: str, tags: str) -> Dict[str, Any]:
        """Upload ``image_file`` as a JPEG and return the created image's JSON.

        The buffer is rewound first because the same in-memory file is
        uploaded several times within one test.
        """
        image_file.seek(0)
        files = {"file": (filename, image_file, "image/jpeg")}
        data = {"description": description, "tags": tags}
        response = client.post("/api/v1/images", files=files, data=data, headers=headers)
        assert response.status_code == 201
        return response.json()

    def _check_image_management(self, client: TestClient, headers: Dict[str, str], sample_image_file,
                                unique_suffix: str) -> None:
        """Upload one image, then fetch, update, list and download it."""
        filename = f"test_image_{unique_suffix}.jpg"
        image = self._upload_image(
            client, headers, sample_image_file,
            filename=filename,
            description=f"E2E test image {unique_suffix}",
            tags=f"test,e2e,sample,{unique_suffix}",
        )
        assert image["filename"] == filename
        assert image["description"] == f"E2E test image {unique_suffix}"
        assert "test" in image["tags"]
        assert unique_suffix in image["tags"]
        image_id = image["id"]
        print("✅ Image upload successful")

        # Get image details
        response = client.get(f"/api/v1/images/{image_id}", headers=headers)
        assert response.status_code == 200
        assert response.json()["filename"] == filename
        print("✅ Image retrieval successful")

        # Update image metadata (the unique suffix stays in the tags so the
        # tag search later in the workflow still finds this image)
        update_data = {
            "description": f"Updated E2E test image {unique_suffix}",
            "tags": ["test", "e2e", "updated", unique_suffix]
        }
        response = client.put(f"/api/v1/images/{image_id}", json=update_data, headers=headers)
        assert response.status_code == 200
        updated_image = response.json()
        assert updated_image["description"] == f"Updated E2E test image {unique_suffix}"
        assert "updated" in updated_image["tags"]
        print("✅ Image metadata update successful")

        # List images
        response = client.get("/api/v1/images", headers=headers)
        assert response.status_code == 200
        images = response.json()
        assert len(images) >= 1
        # Check if our image is in the list
        our_images = [img for img in images if img["id"] == image_id]
        assert len(our_images) == 1
        print("✅ Image listing successful")

        # Download image
        response = client.get(f"/api/v1/images/{image_id}/download", headers=headers)
        assert response.status_code == 200
        assert response.headers["content-type"] == "image/jpeg"
        print("✅ Image download successful")

    def _check_search(self, client: TestClient, headers: Dict[str, str], sample_image_file,
                      unique_suffix: str) -> None:
        """Upload three more tagged images and run text, tag and combined searches."""
        # Upload multiple images for search testing
        test_images = [
            {"filename": f"cat_{unique_suffix}.jpg", "description": f"A cute cat {unique_suffix}", "tags": f"animal,pet,cat,{unique_suffix}"},
            {"filename": f"dog_{unique_suffix}.jpg", "description": f"A friendly dog {unique_suffix}", "tags": f"animal,pet,dog,{unique_suffix}"},
            {"filename": f"landscape_{unique_suffix}.jpg", "description": f"Beautiful landscape {unique_suffix}", "tags": f"nature,landscape,outdoor,{unique_suffix}"}
        ]
        for img_data in test_images:
            uploaded = self._upload_image(client, headers, sample_image_file, **img_data)
            assert "id" in uploaded
        print("✅ Multiple image uploads successful")

        # Text search with unique suffix to find our images
        response = client.get("/api/v1/search", params={"query": unique_suffix}, headers=headers)
        assert response.status_code == 200
        search_results = response.json()
        assert len(search_results) >= 1
        # Verify our images are in the results
        assert any(unique_suffix in result["description"] for result in search_results)
        print("✅ Text search successful")

        # Tag-based search with our unique tag
        response = client.get("/api/v1/search", params={"tags": unique_suffix}, headers=headers)
        assert response.status_code == 200
        # Our 4 uploaded images: one from the image-management stage + three above
        assert len(response.json()) >= 4
        print("✅ Tag-based search successful")

        # Combined search
        response = client.get(
            "/api/v1/search", params={"query": "cat", "tags": unique_suffix}, headers=headers
        )
        assert response.status_code == 200
        assert len(response.json()) >= 1
        print("✅ Combined search successful")

    def test_bootstrap_or_existing_setup_workflow(self, client: TestClient, sample_image_file, unique_suffix):
        """Test the complete workflow - either bootstrap new setup or use existing one.

        Stages run in a fixed order because later ones rely on data created
        by earlier ones (the tag search counts the image uploaded during the
        image-management stage). The test returns nothing: pytest reports a
        non-None return value from a test function as a problem.
        """
        # 1. Try bootstrap first, but handle gracefully if already done
        api_key, team_id = self._bootstrap_or_reuse(client, unique_suffix)
        headers = {"X-API-Key": api_key}

        # 2. Verify authentication works
        response = client.get("/api/v1/auth/verify", headers=headers)
        assert response.status_code == 200
        response.json()  # the body must parse as JSON
        print("✅ Authentication verified")

        # 3. Test team management
        self._check_team_management(client, headers, team_id, unique_suffix)

        # 4. Test user management
        self._check_user_management(client, headers, team_id, unique_suffix)

        # 5. Test API key management
        self._check_api_key_lifecycle(client, headers, unique_suffix)

        # 6. Test image upload and management
        self._check_image_management(client, headers, sample_image_file, unique_suffix)

        # 7. Test search functionality
        self._check_search(client, headers, sample_image_file, unique_suffix)

        print("🎉 Complete E2E workflow test passed!")

    def test_error_handling(self, client: TestClient, unique_suffix):
        """Test error handling scenarios"""
        # Bootstrap either is rejected (already done) or succeeds (fresh database);
        # only the rejection message is asserted.
        bootstrap_data = {
            "team_name": f"Another Team {unique_suffix}",
            "admin_email": f"another-{unique_suffix}@admin.com",
            "admin_name": f"Another Admin {unique_suffix}",
            "api_key_name": f"Another API Key {unique_suffix}"
        }
        response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data)
        if response.status_code == 400:
            assert "Bootstrap already completed" in response.json()["detail"]
            print("✅ Bootstrap protection working")
        else:
            # If bootstrap succeeded, that's also fine for a fresh database
            print("✅ Bootstrap succeeded (fresh database)")

        # Test invalid API key
        invalid_headers = {"X-API-Key": "invalid-key"}
        response = client.get("/api/v1/auth/verify", headers=invalid_headers)
        assert response.status_code == 401
        print("✅ Invalid API key properly rejected")

        # Test missing API key
        response = client.get("/api/v1/teams")
        assert response.status_code == 401
        print("✅ Missing API key properly rejected")

        # Test file upload errors
        response = client.post("/api/v1/images")
        assert response.status_code == 401  # No API key
        print("✅ Unauthorized image upload properly rejected")

        print("🎉 Error handling test passed!")
@pytest.mark.integration
@pytest.mark.e2e
class TestE2EIntegrationWorkflows:
    """End-to-end integration tests that require real services"""

    # Image uploaded by the processing test; resolved against the current
    # working directory.
    SAMPLE_IMAGE_PATH = "images/sample_image.jpg"
    # Total time to wait for the uploaded image to reach status "ready",
    # and the pause between status checks.
    PROCESSING_TIMEOUT_SECONDS = 5
    POLL_INTERVAL_SECONDS = 1

    @pytest.fixture(scope="class")
    def client(self):
        """Create test client for integration testing.

        Skips the requesting tests unless the ``E2E_INTEGRATION_TEST``
        environment variable is set to a non-empty value.
        """
        if not os.getenv("E2E_INTEGRATION_TEST"):
            pytest.skip("E2E integration tests disabled. Set E2E_INTEGRATION_TEST=1 to enable")
        return TestClient(app)

    def test_real_image_processing_workflow(self, client: TestClient):
        """Test the complete image processing workflow with real services.

        Bootstraps a team, uploads the sample image, waits for it to be
        processed, then runs a semantic search. Skipped when the sample
        image is missing or when the database has already been bootstrapped.
        """
        import time

        # This test would require:
        # - Real Google Cloud Storage
        # - Real Firestore database
        # - Real Cloud Vision API
        # - Real Pinecone vector database
        # For integration tests, we would need to clear the database first
        # or use a separate test database

        # Check the fixture file BEFORE bootstrapping: bootstrap only succeeds
        # once per database, so it must not be spent on a run that cannot finish.
        if not os.path.isfile(self.SAMPLE_IMAGE_PATH):
            pytest.skip(f"Sample image not found: {self.SAMPLE_IMAGE_PATH}")

        # Bootstrap setup
        bootstrap_data = {
            "team_name": "Real Processing Team",
            "admin_email": "real@processing.com",
            "admin_name": "Real User",
            "api_key_name": "Real API Key"
        }
        response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data)
        if response.status_code == 400:
            # Bootstrap already done, skip this test
            pytest.skip("Bootstrap already completed in real database")
        assert response.status_code == 201
        api_key = response.json()["key"]
        headers = {"X-API-Key": api_key}

        # Upload a real image
        with open(self.SAMPLE_IMAGE_PATH, "rb") as f:
            files = {"file": ("real_image.jpg", f, "image/jpeg")}
            data = {"description": "Real image for processing", "tags": "real,processing,test"}
            response = client.post("/api/v1/images", files=files, data=data, headers=headers)
        assert response.status_code == 201
        image_id = response.json()["id"]

        # Wait for processing to complete. Poll instead of sleeping a fixed
        # time so the test continues as soon as the image is ready; the last
        # status check always happens at or after the deadline.
        deadline = time.monotonic() + self.PROCESSING_TIMEOUT_SECONDS
        while True:
            response = client.get(f"/api/v1/images/{image_id}", headers=headers)
            if response.status_code == 200 and response.json().get("status") == "ready":
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(self.POLL_INTERVAL_SECONDS, remaining))

        # Check if embeddings were generated
        assert response.status_code == 200
        processed_image = response.json()
        assert processed_image["status"] == "ready"
        assert "embedding_id" in processed_image

        # Test semantic search (params= lets the client encode the space in the query)
        response = client.get(
            "/api/v1/search/semantic", params={"query": "similar image"}, headers=headers
        )
        assert response.status_code == 200
        search_results = response.json()
        assert len(search_results) >= 1
# Utility functions for E2E tests
def create_test_image(width: int = 100, height: int = 100, color: str = 'red') -> io.BytesIO:
    """Build an in-memory JPEG suitable for upload tests.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        color: Fill colour passed to ``PILImage.new``.

    Returns:
        A ``BytesIO`` holding the encoded JPEG, positioned at offset 0 so it
        can be read immediately.
    """
    buffer = io.BytesIO()
    PILImage.new('RGB', (width, height), color=color).save(buffer, format='JPEG')
    # Saving leaves the cursor at the end of the data; rewind for the reader.
    buffer.seek(0)
    return buffer
if __name__ == "__main__":
    # Run the tests marked ``e2e`` in this file and exit with pytest's exit
    # code, so a failing run is visible to the shell / CI instead of always
    # exiting 0.
    raise SystemExit(pytest.main([__file__, "-v", "-m", "e2e"]))