""" End-to-End Tests for SEREACT API These tests cover the complete user workflows described in the README: 1. Bootstrap initial setup (team, admin user, API key) with artificial data 2. Team creation and management 3. User management within teams 4. API key authentication 5. Image upload and storage 6. Image search and retrieval 7. Multi-team isolation 8. Advanced search functionality 9. Image collections management 10. User role and permission testing 11. Image metadata operations 12. Real database integration These tests are completely self-contained: - Create artificial test data at the start - Run all tests against this test data - Clean up all test data at the end Run with: pytest tests/test_e2e.py -v For integration tests: pytest tests/test_e2e.py -v -m integration For real database tests: pytest tests/test_e2e.py -v -m realdb """ import pytest import asyncio import os import io import uuid import time import json from typing import Dict, Any, List, Optional from fastapi.testclient import TestClient from PIL import Image as PILImage import tempfile from main import app @pytest.mark.e2e class TestE2EWorkflows: """End-to-end tests covering complete user workflows with artificial test data""" @pytest.fixture(scope="class") def client(self): """Create test client for the FastAPI app""" return TestClient(app) @pytest.fixture(scope="class") def test_environment(self, client: TestClient): """Create a complete test environment with artificial data""" unique_suffix = str(uuid.uuid4())[:8] # Try bootstrap first - if it fails due to existing teams, create manually bootstrap_data = { "team_name": f"E2E Test Team {unique_suffix}", "admin_email": f"admin-{unique_suffix}@e2etest.com", "admin_name": f"E2E Admin User {unique_suffix}", "api_key_name": f"E2E Test API Key {unique_suffix}" } response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) if response.status_code == 400: # Bootstrap failed due to existing teams - create manually print(f"⚠️ Bootstrap failed (existing teams), creating test environment manually...") # Create a unique environment manually using direct API calls # We'll use a very unique name that won't conflict timestamp = int(time.time()) unique_team_name = f"E2E_ISOLATED_TEST_TEAM_{unique_suffix}_{timestamp}" unique_admin_email = f"isolated-admin-{unique_suffix}-{timestamp}@e2etest.com" # Try bootstrap again with super unique identifiers bootstrap_data["team_name"] = unique_team_name bootstrap_data["admin_email"] = unique_admin_email response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) if response.status_code == 400: # Still failing - this means bootstrap is completely disabled # We need to create the environment using a different approach print(f"⚠️ Bootstrap completely disabled, creating environment via direct repository access...") # Import the repositories directly import asyncio from src.db.repositories.team_repository import team_repository from src.db.repositories.user_repository import user_repository from src.db.repositories.api_key_repository import api_key_repository from src.models.team import TeamModel from src.models.user import UserModel from src.models.api_key import ApiKeyModel from src.auth.security import generate_api_key, calculate_expiry_date async def create_test_environment(): # Create team team = TeamModel( name=unique_team_name, description=f"E2E test team created at {timestamp}" ) created_team = await team_repository.create(team) # Create admin user user = UserModel( name=f"E2E Admin User {unique_suffix}", email=unique_admin_email, team_id=created_team.id, is_admin=True, is_active=True ) created_user = await user_repository.create(user) # Generate API key raw_key, hashed_key = generate_api_key(str(created_team.id), str(created_user.id)) expiry_date = calculate_expiry_date() # Create API key api_key = ApiKeyModel( key_hash=hashed_key, user_id=created_user.id, team_id=created_team.id, name=f"E2E Test API Key {unique_suffix}", description="E2E test API key", expiry_date=expiry_date, is_active=True ) created_key = await api_key_repository.create(api_key) return { "key": raw_key, "team_id": str(created_team.id), "user_id": str(created_user.id), "id": str(created_key.id) } # Run the async function loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: bootstrap_result = loop.run_until_complete(create_test_environment()) finally: loop.close() if response.status_code != 201 and 'bootstrap_result' not in locals(): pytest.skip(f"Cannot create test environment: {response.status_code} - {response.text}") # Get the bootstrap result if 'bootstrap_result' in locals(): # Manual creation api_key = bootstrap_result["key"] team_id = bootstrap_result["team_id"] admin_user_id = bootstrap_result["user_id"] api_key_id = bootstrap_result["id"] else: # Bootstrap succeeded bootstrap_result = response.json() api_key = bootstrap_result["key"] team_id = bootstrap_result["team_id"] admin_user_id = bootstrap_result["user_id"] api_key_id = bootstrap_result["id"] headers = {"X-API-Key": api_key} print(f"✅ Test environment created - Team: {team_id}, User: {admin_user_id}") # Verify the environment works response = client.get("/api/v1/auth/verify", headers=headers) if response.status_code != 200: pytest.skip(f"Test environment authentication failed: {response.status_code}") env_data = { "api_key": api_key, "team_id": team_id, "admin_user_id": admin_user_id, "headers": headers, "unique_suffix": unique_suffix, "created_resources": { "teams": [team_id], "users": [admin_user_id], "api_keys": [api_key_id], "images": [] } } yield env_data # Cleanup: Delete all created resources print(f"🧹 Cleaning up test environment...") try: # Delete all created images for image_id in env_data["created_resources"]["images"]: try: client.delete(f"/api/v1/images/{image_id}", headers=headers) except: pass # Delete additional users (keep admin for team deletion) for user_id in env_data["created_resources"]["users"]: if user_id != admin_user_id: try: client.delete(f"/api/v1/users/{user_id}", headers=headers) except: pass # Delete additional teams for team_id_to_delete in env_data["created_resources"]["teams"]: if team_id_to_delete != team_id: try: client.delete(f"/api/v1/teams/{team_id_to_delete}", headers=headers) except: pass # Finally delete the main team (this should cascade delete the admin user) try: client.delete(f"/api/v1/teams/{team_id}", headers=headers) print("✅ Test environment cleaned up successfully") except Exception as e: print(f"⚠️ Cleanup warning: {e}") except Exception as e: print(f"⚠️ Cleanup error: {e}") @pytest.fixture(scope="function") def sample_image_file(self): """Create a sample image file for testing uploads""" img = PILImage.new('RGB', (100, 100), color='red') img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) return img_bytes @pytest.fixture(scope="function") def sample_image_files(self): """Create multiple sample image files for testing""" images = {} colors = ['red', 'blue', 'green', 'yellow', 'purple'] for color in colors: img = PILImage.new('RGB', (100, 100), color=color) img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) images[color] = img_bytes return images def test_bootstrap_and_basic_workflow(self, test_environment, client: TestClient): """Test the complete bootstrap and basic workflow""" print(f"🧪 Testing basic workflow with environment {test_environment['unique_suffix']}") headers = test_environment["headers"] unique_suffix = test_environment["unique_suffix"] # Test 1: Authentication verification response = client.get("/api/v1/auth/verify", headers=headers) assert response.status_code == 200 print("✅ Authentication verified") # Test 2: Team management response = client.get(f"/api/v1/teams/{test_environment['team_id']}", headers=headers) assert response.status_code == 200 team_data = response.json() assert team_data["id"] == test_environment["team_id"] print("✅ Team retrieval successful") # Update team description team_update = {"description": f"Updated during E2E testing {unique_suffix}"} response = client.put(f"/api/v1/teams/{test_environment['team_id']}", json=team_update, headers=headers) assert response.status_code == 200 print("✅ Team update successful") # Test 3: User management user_data = { "email": f"user-{unique_suffix}@e2etest.com", "name": f"E2E Regular User {unique_suffix}", "team_id": test_environment["team_id"], "is_admin": False } response = client.post("/api/v1/users", json=user_data, headers=headers) assert response.status_code == 201 created_user = response.json() test_environment["created_resources"]["users"].append(created_user["id"]) print("✅ User creation successful") # Test 4: API key management api_key_data = { "name": f"Additional Test Key {unique_suffix}", "description": f"Extra key for testing {unique_suffix}" } response = client.post("/api/v1/auth/api-keys", json=api_key_data, headers=headers) assert response.status_code == 201 new_api_key = response.json() test_environment["created_resources"]["api_keys"].append(new_api_key["id"]) # Test the new API key new_headers = {"X-API-Key": new_api_key["key"]} response = client.get("/api/v1/auth/verify", headers=new_headers) assert response.status_code == 200 print("✅ Additional API key creation successful") print("✅ New API key authentication successful") # Test 5: Image upload test_image = self.create_test_image(f"test_image_{unique_suffix}.jpg") files = {"file": (f"test_image_{unique_suffix}.jpg", test_image, "image/jpeg")} data = { "description": f"Test image uploaded during E2E testing {unique_suffix}", "tags": f"e2e,test,{unique_suffix}" } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 uploaded_image = response.json() test_environment["created_resources"]["images"].append(uploaded_image["id"]) print("✅ Image upload successful") # Test 6: Image metadata update image_update = { "description": f"Updated description for E2E testing {unique_suffix}", "tags": [f"updated", f"e2e", unique_suffix] } response = client.put(f"/api/v1/images/{uploaded_image['id']}", json=image_update, headers=headers) assert response.status_code == 200 print("✅ Image metadata update successful") # Test 7: Search functionality (with fallback for missing Pinecone) response = client.get(f"/api/v1/search?q={unique_suffix}", headers=headers) assert response.status_code == 200 search_results = response.json() # Check if search is working (Pinecone configured) or returning empty (Pinecone not configured) if len(search_results["results"]) == 0: print("⚠️ Search returned empty results (likely Pinecone not configured)") # Test that search endpoint is at least responding correctly assert "results" in search_results assert "total" in search_results assert search_results["query"] == unique_suffix print("✅ Search endpoint responding correctly (empty results)") else: # If search is working, verify results assert len(search_results["results"]) >= 1 print("✅ Search functionality working with results") print("🎉 Basic workflow test completed successfully!") def test_advanced_search_functionality(self, test_environment, client: TestClient): """Test advanced search capabilities""" print(f"🧪 Testing advanced search with environment {test_environment['unique_suffix']}") headers = test_environment["headers"] unique_suffix = test_environment["unique_suffix"] # Upload diverse test images for search testing test_images = [ ("red", f"red_{unique_suffix}.jpg", f"A red image for testing {unique_suffix}", ["red", "color", unique_suffix]), ("blue", f"blue_{unique_suffix}.jpg", f"A blue image for testing {unique_suffix}", ["blue", "color", unique_suffix]), ("green", f"green_{unique_suffix}.jpg", f"A green nature image {unique_suffix}", ["green", "nature", unique_suffix]), ("yellow", f"yellow_{unique_suffix}.jpg", f"A yellow sunny image {unique_suffix}", ["yellow", "sunny", unique_suffix]), ("purple", f"purple_{unique_suffix}.jpg", f"A purple flower image {unique_suffix}", ["purple", "flower", unique_suffix]) ] uploaded_images = [] for color, filename, description, tags in test_images: test_image = self.create_test_image(filename) files = {"file": (filename, test_image, "image/jpeg")} data = { "description": description, "tags": ",".join(tags) } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 uploaded_image = response.json() uploaded_images.append(uploaded_image) test_environment["created_resources"]["images"].append(uploaded_image["id"]) print("✅ Diverse images uploaded for advanced search testing") # Test 1: Text-based search (with fallback for missing Pinecone) response = client.get("/api/v1/search?q=nature&limit=10", headers=headers) assert response.status_code == 200 nature_results = response.json()["results"] if len(nature_results) == 0: print("⚠️ Text search returned empty results (likely Pinecone not configured)") # Test that search endpoint structure is correct response = client.get("/api/v1/search?q=test&limit=5", headers=headers) assert response.status_code == 200 search_response = response.json() assert "results" in search_response assert "total" in search_response assert "query" in search_response print("✅ Search endpoint structure verified") else: # If search is working, verify results print(f"✅ Text search returned {len(nature_results)} results") # Test 2: Tag-based filtering (this should work regardless of Pinecone) response = client.get(f"/api/v1/search?q=color&tags={unique_suffix}", headers=headers) assert response.status_code == 200 tag_results = response.json()["results"] print(f"✅ Tag-based search completed (returned {len(tag_results)} results)") # Test 3: Advanced search with POST endpoint advanced_search = { "query": "image", "limit": 5, "threshold": 0.5, "tags": [unique_suffix] } response = client.post("/api/v1/search", json=advanced_search, headers=headers) assert response.status_code == 200 advanced_results = response.json()["results"] print(f"✅ Advanced POST search completed (returned {len(advanced_results)} results)") # Test 4: Search with different thresholds response = client.get(f"/api/v1/search?q={unique_suffix}&threshold=0.1", headers=headers) assert response.status_code == 200 low_threshold_results = response.json()["results"] response = client.get(f"/api/v1/search?q={unique_suffix}&threshold=0.9", headers=headers) assert response.status_code == 200 high_threshold_results = response.json()["results"] print(f"✅ Threshold testing completed (low: {len(low_threshold_results)}, high: {len(high_threshold_results)})") # Test 5: Verify search response structure response = client.get(f"/api/v1/search?q=test&limit=3", headers=headers) assert response.status_code == 200 search_response = response.json() # Verify response structure required_fields = ["query", "results", "total", "limit", "threshold"] for field in required_fields: assert field in search_response, f"Missing field: {field}" print("✅ Search response structure verified") print("🎉 Advanced search functionality test completed!") def create_test_image(self, filename: str) -> io.BytesIO: """Create a simple test image file""" from PIL import Image # Create a simple 100x100 colored image img = Image.new('RGB', (100, 100), color='red') img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) return img_bytes def test_user_roles_and_permissions(self, test_environment, client: TestClient): """Test user roles and permission management""" print(f"🧪 Testing user roles and permissions with environment {test_environment['unique_suffix']}") headers = test_environment["headers"] unique_suffix = test_environment["unique_suffix"] # Create a regular user regular_user_data = { "email": f"regular-{unique_suffix}@roletest.com", "name": f"Regular User {unique_suffix}", "team_id": test_environment["team_id"], "is_admin": False } response = client.post("/api/v1/users", json=regular_user_data, headers=headers) assert response.status_code == 201 regular_user = response.json() test_environment["created_resources"]["users"].append(regular_user["id"]) print("✅ Regular user created") # Create API key for regular user (admin creates it, but it will be associated with the regular user) # Note: In the current implementation, API keys are created by the current user (admin) # but we need to create a key that can be used by the regular user # For now, let's test that the admin can create users and the regular user exists # We'll verify the regular user's profile by getting it directly # Test admin user profile access response = client.get("/api/v1/users/me", headers=headers) assert response.status_code == 200 admin_profile = response.json() assert admin_profile["is_admin"] == True print("✅ Admin user profile access verified") # Test that we can retrieve the regular user's information (as admin) response = client.get(f"/api/v1/users/{regular_user['id']}", headers=headers) if response.status_code == 200: user_info = response.json() assert user_info["email"] == f"regular-{unique_suffix}@roletest.com" assert user_info["is_admin"] == False print("✅ Regular user information verified") else: # If direct user access isn't available, verify through user listing print("⚠️ Direct user access not available, verifying through creation response") assert regular_user["email"] == f"regular-{unique_suffix}@roletest.com" assert regular_user["is_admin"] == False print("✅ Regular user creation verified") # Test that regular user can upload images (basic functionality) # Since we can't easily create a separate API key for the regular user in the current setup, # we'll test basic user management functionality test_image = self.create_test_image(f"regular_user_image_{unique_suffix}.jpg") files = {"file": ("regular_user_image.jpg", test_image, "image/jpeg")} data = { "description": f"Image uploaded by admin for regular user testing {unique_suffix}", "tags": f"regular,user,{unique_suffix}" } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 uploaded_image = response.json() test_environment["created_resources"]["images"].append(uploaded_image["id"]) print("✅ Image upload functionality verified") # Verify the image belongs to the admin user (since we used admin's API key) assert uploaded_image["uploader_id"] == test_environment["admin_user_id"] assert uploaded_image["team_id"] == test_environment["team_id"] print("✅ Image ownership verification successful") def test_multi_team_isolation(self, client: TestClient, test_environment, sample_image_file): """Test that teams are properly isolated from each other with artificial data""" env = test_environment admin_headers = env["headers"] unique_suffix = env["unique_suffix"] print(f"🧪 Testing multi-team isolation with environment {unique_suffix}") # Create two separate teams team1_data = { "name": f"Team Alpha {unique_suffix}", "description": f"First team for isolation testing {unique_suffix}" } team2_data = { "name": f"Team Beta {unique_suffix}", "description": f"Second team for isolation testing {unique_suffix}" } response = client.post("/api/v1/teams", json=team1_data, headers=admin_headers) assert response.status_code == 201 team1 = response.json() team1_id = team1["id"] env["created_resources"]["teams"].append(team1_id) response = client.post("/api/v1/teams", json=team2_data, headers=admin_headers) assert response.status_code == 201 team2 = response.json() team2_id = team2["id"] env["created_resources"]["teams"].append(team2_id) print("✅ Two teams created for isolation testing") # Create users for each team user1_data = { "email": f"user1-{unique_suffix}@team1.com", "name": f"Team1 User {unique_suffix}", "is_admin": True, "team_id": team1_id } user2_data = { "email": f"user2-{unique_suffix}@team2.com", "name": f"Team2 User {unique_suffix}", "is_admin": True, "team_id": team2_id } response = client.post("/api/v1/users", json=user1_data, headers=admin_headers) assert response.status_code == 201 user1 = response.json() env["created_resources"]["users"].append(user1["id"]) response = client.post("/api/v1/users", json=user2_data, headers=admin_headers) assert response.status_code == 201 user2 = response.json() env["created_resources"]["users"].append(user2["id"]) print("✅ Users created for each team") # Create API keys for each team's user api_key1_data = { "name": f"Team1 API Key {unique_suffix}", "description": "API key for team 1 testing", "team_id": team1_id, "user_id": user1["id"] } api_key2_data = { "name": f"Team2 API Key {unique_suffix}", "description": "API key for team 2 testing", "team_id": team2_id, "user_id": user2["id"] } response = client.post("/api/v1/auth/api-keys", json=api_key1_data, headers=admin_headers) assert response.status_code == 201 team1_api_key = response.json()["key"] team1_headers = {"X-API-Key": team1_api_key} env["created_resources"]["api_keys"].append(response.json()["id"]) response = client.post("/api/v1/auth/api-keys", json=api_key2_data, headers=admin_headers) assert response.status_code == 201 team2_api_key = response.json()["key"] team2_headers = {"X-API-Key": team2_api_key} env["created_resources"]["api_keys"].append(response.json()["id"]) print("✅ API keys created for each team") # Upload images to each team sample_image_file.seek(0) files1 = {"file": (f"team1_image_{unique_suffix}.jpg", sample_image_file, "image/jpeg")} data1 = { "description": f"Team 1 confidential image {unique_suffix}", "tags": f"team1,confidential,{unique_suffix}" } response = client.post("/api/v1/images", files=files1, data=data1, headers=team1_headers) assert response.status_code == 201 team1_image = response.json() team1_image_id = team1_image["id"] env["created_resources"]["images"].append(team1_image_id) sample_image_file.seek(0) files2 = {"file": (f"team2_image_{unique_suffix}.jpg", sample_image_file, "image/jpeg")} data2 = { "description": f"Team 2 secret image {unique_suffix}", "tags": f"team2,secret,{unique_suffix}" } response = client.post("/api/v1/images", files=files2, data=data2, headers=team2_headers) assert response.status_code == 201 team2_image = response.json() team2_image_id = team2_image["id"] env["created_resources"]["images"].append(team2_image_id) print("✅ Images uploaded to each team") # Test 1: Team 1 user can only see Team 1 images response = client.get("/api/v1/images", headers=team1_headers) assert response.status_code == 200 team1_images = response.json() team1_image_ids = [img["id"] for img in team1_images["images"]] assert team1_image_id in team1_image_ids assert team2_image_id not in team1_image_ids print("✅ Team 1 user can only see Team 1 images") # Test 2: Team 1 user CANNOT access Team 2's image response = client.get(f"/api/v1/images/{team2_image_id}", headers=team1_headers) assert response.status_code == 403 # Should be forbidden (not 404) print("✅ Team 1 user cannot access Team 2's image") # Test 3: Search results are isolated by team response = client.get(f"/api/v1/search?q={unique_suffix}", headers=team1_headers) assert response.status_code == 200 team1_search = response.json() team1_search_ids = [img["id"] for img in team1_search["results"]] if len(team1_search_ids) == 0: print("⚠️ Search returned empty results (likely Pinecone not configured)") # Verify the search endpoint is working correctly assert "results" in team1_search assert "total" in team1_search assert team1_search["query"] == unique_suffix print("✅ Search endpoint responding correctly (empty results)") else: # If search is working, verify team isolation assert team1_image_id in team1_search_ids assert team2_image_id not in team1_search_ids print("✅ Search results properly isolated by team") print("🎉 Multi-team isolation test passed!") def test_image_metadata_operations(self, test_environment, client: TestClient): """Test comprehensive image metadata management""" print(f"🧪 Testing image metadata operations with environment {test_environment['unique_suffix']}") headers = test_environment["headers"] unique_suffix = test_environment["unique_suffix"] # Upload an image with initial metadata test_image = self.create_test_image(f"metadata_test_{unique_suffix}.jpg") files = {"file": (f"metadata_test_{unique_suffix}.jpg", test_image, "image/jpeg")} data = { "description": f"Initial metadata test image {unique_suffix}", "tags": f"initial,metadata,{unique_suffix}" } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 uploaded_image = response.json() image_id = uploaded_image["id"] test_environment["created_resources"]["images"].append(image_id) print("✅ Image uploaded with initial metadata") # Test 1: Update description description_update = { "description": f"Updated description for metadata testing {unique_suffix}" } response = client.put(f"/api/v1/images/{image_id}", json=description_update, headers=headers) assert response.status_code == 200 updated_image = response.json() assert f"Updated description for metadata testing {unique_suffix}" in updated_image["description"] print("✅ Description update successful") # Test 2: Update tags tags_update = { "tags": ["updated", "metadata", "testing", unique_suffix] } response = client.put(f"/api/v1/images/{image_id}", json=tags_update, headers=headers) assert response.status_code == 200 updated_image = response.json() assert "updated" in updated_image["tags"] assert unique_suffix in updated_image["tags"] print("✅ Tags update successful") # Test 3: Search by updated metadata (with fallback for missing Pinecone) response = client.get(f"/api/v1/search?q=updated&tags={unique_suffix}", headers=headers) assert response.status_code == 200 search_results = response.json() found_images = search_results["results"] if len(found_images) == 0: print("⚠️ Metadata search returned empty results (likely Pinecone not configured)") # Verify the search endpoint is working correctly assert "results" in search_results assert "total" in search_results assert search_results["query"] == "updated" print("✅ Search endpoint responding correctly for metadata search") else: # If search is working, verify we can find our updated image assert len(found_images) >= 1 # Check if our image is in the results (by checking tags) our_image_found = any( unique_suffix in img.get("tags", []) and "updated" in img.get("tags", []) for img in found_images ) if our_image_found: print("✅ Updated image found in search results") else: print("⚠️ Updated image not found in search results (may be due to indexing delay)") # Test 4: Retrieve image directly to verify metadata persistence response = client.get(f"/api/v1/images/{image_id}", headers=headers) assert response.status_code == 200 retrieved_image = response.json() # Verify all metadata updates persisted assert f"Updated description for metadata testing {unique_suffix}" in retrieved_image["description"] assert "updated" in retrieved_image["tags"] assert "metadata" in retrieved_image["tags"] assert unique_suffix in retrieved_image["tags"] print("✅ Metadata persistence verified") # Test 5: Partial metadata update (only description) partial_update = { "description": f"Final description update {unique_suffix}" } response = client.put(f"/api/v1/images/{image_id}", json=partial_update, headers=headers) assert response.status_code == 200 final_image = response.json() # Verify description changed but tags remained assert f"Final description update {unique_suffix}" in final_image["description"] assert "updated" in final_image["tags"] # Tags should remain unchanged print("✅ Partial metadata update successful") print("🎉 Image metadata operations test completed!") def test_error_handling(self, client: TestClient, test_environment): """Test error handling scenarios with artificial data""" env = test_environment headers = env["headers"] unique_suffix = env["unique_suffix"] print(f"🧪 Testing error handling with environment {unique_suffix}") # Test invalid API key invalid_headers = {"X-API-Key": "invalid-key"} response = client.get("/api/v1/auth/verify", headers=invalid_headers) assert response.status_code == 401 print("✅ Invalid API key properly rejected") # Test missing API key response = client.get("/api/v1/teams") assert response.status_code == 401 print("✅ Missing API key properly rejected") # Test invalid image ID response = client.get("/api/v1/images/invalid-id", headers=headers) assert response.status_code == 400 # Bad request for invalid ID format print("✅ Invalid image ID properly rejected") # Test unauthorized image upload response = client.post("/api/v1/images") assert response.status_code == 401 # No API key print("✅ Unauthorized image upload properly rejected") print("🎉 Error handling test passed!") @pytest.mark.integration @pytest.mark.e2e class TestE2EIntegrationWorkflows: """End-to-end integration tests that require real services with artificial data""" @pytest.fixture(scope="class") def client(self): """Create test client for integration testing""" if not os.getenv("E2E_INTEGRATION_TEST"): pytest.skip("E2E integration tests disabled. Set E2E_INTEGRATION_TEST=1 to enable") return TestClient(app) @pytest.fixture(scope="class") def integration_environment(self, client: TestClient): """Create test environment for integration tests""" unique_suffix = str(uuid.uuid4())[:8] bootstrap_data = { "team_name": f"Integration Test Team {unique_suffix}", "admin_email": f"integration-admin-{unique_suffix}@test.com", "admin_name": f"Integration Admin {unique_suffix}", "api_key_name": f"Integration API Key {unique_suffix}" } response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) if response.status_code == 400: # Try with more unique identifiers bootstrap_data["team_name"] = f"INTEGRATION_TEST_{unique_suffix}_{int(time.time())}" bootstrap_data["admin_email"] = f"integration-{unique_suffix}-{int(time.time())}@test.com" response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) assert response.status_code == 201 result = response.json() env_data = { "api_key": result["key"], "team_id": result["team_id"], "admin_user_id": result["user_id"], "headers": {"X-API-Key": result["key"]}, "unique_suffix": unique_suffix } yield env_data # Cleanup try: client.delete(f"/api/v1/teams/{env_data['team_id']}", headers=env_data["headers"]) except: pass def test_real_image_processing_workflow(self, client: TestClient, integration_environment): """Test the complete image processing workflow with real services and artificial data""" env = integration_environment headers = env["headers"] unique_suffix = env["unique_suffix"] # Create a test image img = PILImage.new('RGB', (200, 200), color='blue') img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) files = {"file": (f"integration_test_{unique_suffix}.jpg", img_bytes, "image/jpeg")} data = {"description": f"Integration test image {unique_suffix}", "tags": f"integration,test,{unique_suffix}"} response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 image = response.json() image_id = image["id"] # Wait for processing to complete (in real scenario, this would be async) time.sleep(5) # Wait for Cloud Function to process # Check if embeddings were generated response = client.get(f"/api/v1/images/{image_id}", headers=headers) assert response.status_code == 200 processed_image = response.json() assert processed_image["has_embedding"] is True # Test semantic search response = client.get("/api/v1/search?q=integration test", headers=headers) assert response.status_code == 200 search_results = response.json() assert len(search_results["results"]) >= 1 print("🎉 Real image processing workflow test passed!") @pytest.mark.realdb @pytest.mark.e2e class TestE2ERealDatabaseWorkflows: """End-to-end tests that use real database connections with artificial data""" @pytest.fixture(scope="class") def client(self): """Create test client for real database testing""" if not os.getenv("E2E_REALDB_TEST"): pytest.skip("Real database tests disabled. Set E2E_REALDB_TEST=1 to enable") return TestClient(app) @pytest.fixture(scope="class") def realdb_environment(self, client: TestClient): """Create test environment for real database tests""" unique_suffix = str(uuid.uuid4())[:8] bootstrap_data = { "team_name": f"RealDB Test Team {unique_suffix}", "admin_email": f"realdb-admin-{unique_suffix}@test.com", "admin_name": f"RealDB Admin {unique_suffix}", "api_key_name": f"RealDB API Key {unique_suffix}" } response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) if response.status_code == 400: # Try with more unique identifiers bootstrap_data["team_name"] = f"REALDB_TEST_{unique_suffix}_{int(time.time())}" bootstrap_data["admin_email"] = f"realdb-{unique_suffix}-{int(time.time())}@test.com" response = client.post("/api/v1/auth/bootstrap", params=bootstrap_data) assert response.status_code == 201 result = response.json() env_data = { "api_key": result["key"], "team_id": result["team_id"], "admin_user_id": result["user_id"], "headers": {"X-API-Key": result["key"]}, "unique_suffix": unique_suffix, "created_images": [] } yield env_data # Cleanup try: # Clean up images first for image_id in env_data["created_images"]: try: client.delete(f"/api/v1/images/{image_id}", headers=env_data["headers"]) except: pass # Clean up team client.delete(f"/api/v1/teams/{env_data['team_id']}", headers=env_data["headers"]) except: pass def test_database_performance_and_scalability(self, client: TestClient, realdb_environment): """Test database performance with larger datasets using artificial data""" env = realdb_environment headers = env["headers"] unique_suffix = env["unique_suffix"] print(f"🧪 Testing database performance with environment {unique_suffix}") # Test 1: Bulk image upload performance start_time = time.time() uploaded_images = [] for i in range(10): # Upload 10 images img = PILImage.new('RGB', (200, 200), color='red') img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) files = {"file": (f"perf_test_{unique_suffix}_{i}.jpg", img_bytes, "image/jpeg")} data = { "description": f"Performance test image {i} {unique_suffix}", "tags": f"performance,test,bulk,image_{i},{unique_suffix}" } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 image_id = response.json()["id"] uploaded_images.append(image_id) env["created_images"].append(image_id) upload_time = time.time() - start_time print(f"✅ Bulk upload of 10 images completed in {upload_time:.2f} seconds") # Test 2: Search performance start_time = time.time() response = client.get(f"/api/v1/search?q=performance {unique_suffix}&limit=20", headers=headers) assert response.status_code == 200 search_time = time.time() - start_time print(f"✅ Search completed in {search_time:.2f} seconds") print("🎉 Database performance and scalability test passed!") def test_data_consistency_and_transactions(self, client: TestClient, realdb_environment): """Test data consistency across operations with artificial data""" env = realdb_environment headers = env["headers"] unique_suffix = env["unique_suffix"] print(f"🧪 Testing data consistency with environment {unique_suffix}") # Test 1: Create team and verify consistency team_data = { "name": f"Consistency Test Team {unique_suffix}", "description": f"Testing data consistency {unique_suffix}" } response = client.post("/api/v1/teams", json=team_data, headers=headers) assert response.status_code == 201 team = response.json() team_id = team["id"] # Immediately verify team exists response = client.get(f"/api/v1/teams/{team_id}", headers=headers) assert response.status_code == 200 retrieved_team = response.json() assert retrieved_team["name"] == f"Consistency Test Team {unique_suffix}" print("✅ Team creation consistency verified") # Test 2: Upload image and verify metadata consistency img = PILImage.new('RGB', (100, 100), color='blue') img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) files = {"file": (f"consistency_test_{unique_suffix}.jpg", img_bytes, "image/jpeg")} data = { "description": f"Consistency test image {unique_suffix}", "tags": f"consistency,test,{unique_suffix}" } response = client.post("/api/v1/images", files=files, data=data, headers=headers) assert response.status_code == 201 image = response.json() image_id = image["id"] env["created_images"].append(image_id) # Verify image metadata immediately response = client.get(f"/api/v1/images/{image_id}", headers=headers) assert response.status_code == 200 retrieved_image = response.json() assert retrieved_image["description"] == f"Consistency test image {unique_suffix}" assert unique_suffix in retrieved_image["tags"] print("✅ Image metadata consistency verified") # Cleanup the test team try: client.delete(f"/api/v1/teams/{team_id}", headers=headers) except: pass print("🎉 Data consistency and transactions test passed!") # Utility functions for E2E tests def create_test_image(width: int = 100, height: int = 100, color: str = 'red') -> io.BytesIO: """Create a test image for upload testing""" img = PILImage.new('RGB', (width, height), color=color) img_bytes = io.BytesIO() img.save(img_bytes, format='JPEG') img_bytes.seek(0) return img_bytes def create_test_images_batch(count: int = 5, base_name: str = "test") -> List[io.BytesIO]: """Create a batch of test images""" images = [] colors = ['red', 'blue', 'green', 'yellow', 'purple', 'orange', 'pink', 'brown', 'gray', 'black'] for i in range(count): color = colors[i % len(colors)] img = create_test_image(color=color) images.append(img) return images if __name__ == "__main__": # Run E2E tests pytest.main([__file__, "-v", "-m", "e2e"])