201 lines
6.6 KiB
Python
201 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to verify admin image access functionality.
|
|
This script tests that:
|
|
1. Regular users can only see images from their own team
|
|
2. Admin users can see all images across all teams
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
from datetime import datetime
|
|
from bson import ObjectId
|
|
|
|
# Add the src directory to the path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
|
|
|
|
from src.models.image import ImageModel
|
|
from src.models.user import UserModel
|
|
from src.db.repositories.image_repository import image_repository
|
|
from src.db.repositories.user_repository import user_repository
|
|
from src.db.providers.firestore_provider import firestore_db
|
|
|
|
|
|
async def setup_test_data():
    """Build the in-memory fixtures used by the admin access checks.

    This function only constructs model objects; it makes no repository
    calls itself, so persisting the images is left to the caller.

    Returns:
        dict: with the keys ``'regular_user'``, ``'admin_user'``,
        ``'team1_id'``, ``'team2_id'`` and ``'images'`` (two images owned
        by team 1 followed by one image owned by team 2).
    """
    print("Setting up test data...")

    # Identifiers for the two teams the images are split across.
    team1_id, team2_id = ObjectId(), ObjectId()

    # Both users belong to team 1; only the admin flag differs.
    regular_user = UserModel(
        email="regular@test.com",
        name="Regular User",
        team_id=team1_id,
        is_admin=False,
    )
    admin_user = UserModel(
        email="admin@test.com",
        name="Admin User",
        team_id=team1_id,
        is_admin=True,
    )

    images = [
        # Team 1, uploaded by the regular user.
        ImageModel(
            filename="team1-image1.jpg",
            original_filename="team1_image1.jpg",
            file_size=1024,
            content_type="image/jpeg",
            storage_path="images/team1-image1.jpg",
            team_id=team1_id,
            uploader_id=regular_user.id,
            description="Team 1 Image 1",
            tags=["team1", "test"],
        ),
        # Team 1, uploaded by the admin user.
        ImageModel(
            filename="team1-image2.jpg",
            original_filename="team1_image2.jpg",
            file_size=2048,
            content_type="image/jpeg",
            storage_path="images/team1-image2.jpg",
            team_id=team1_id,
            uploader_id=admin_user.id,
            description="Team 1 Image 2",
            tags=["team1", "admin"],
        ),
        # Team 2, uploaded by a freshly generated (otherwise unknown) user id.
        ImageModel(
            filename="team2-image1.jpg",
            original_filename="team2_image1.jpg",
            file_size=1536,
            content_type="image/jpeg",
            storage_path="images/team2-image1.jpg",
            team_id=team2_id,
            uploader_id=ObjectId(),
            description="Team 2 Image 1",
            tags=["team2", "test"],
        ),
    ]

    return dict(
        regular_user=regular_user,
        admin_user=admin_user,
        team1_id=team1_id,
        team2_id=team2_id,
        images=images,
    )
|
|
|
|
|
|
async def test_regular_user_access(regular_user, team1_id):
    """Check that a team-scoped image query returns only that team's images.

    Args:
        regular_user: The non-admin user under test (not read by this
            function; kept for a signature parallel to the admin check).
        team1_id: Identifier of the team whose images are queried.

    Returns:
        int: Number of images returned by the team-scoped query.

    Raises:
        AssertionError: If any returned image has a ``team_id`` that does
            not compare equal to ``team1_id``.
    """
    print("\n=== Testing Regular User Access ===")

    # The team-filtered repository calls stand in for what a regular user sees.
    visible = await image_repository.get_by_team(team1_id, skip=0, limit=50)
    team_total = await image_repository.count_by_team(team1_id)

    print(f"Regular user sees {len(visible)} images from their team")
    print(f"Total count for team: {team_total}")

    # List everything first so the full result is shown even if a check fails.
    for img in visible:
        print(f" - {img.filename} (Team: {img.team_id})")

    # Every listed image must belong to the queried team.
    for img in visible:
        assert img.team_id == team1_id, f"Regular user should not see image from different team: {img.filename}"

    print("✅ Regular user access test passed - only sees team images")
    return len(visible)
|
|
|
|
|
|
async def test_admin_user_access(admin_user):
    """Check that the unfiltered image query spans more than one team.

    Args:
        admin_user: The admin user under test (not read by this function).

    Returns:
        int: Number of images returned by the unfiltered query.

    Raises:
        AssertionError: If fewer than two images, or images from fewer
            than two distinct teams, are returned.
    """
    print("\n=== Testing Admin User Access ===")

    # The unfiltered repository calls stand in for what an admin sees.
    everything = await image_repository.get_all_with_pagination(skip=0, limit=50)
    grand_total = await image_repository.count_all()

    print(f"Admin user sees {len(everything)} images across all teams")
    print(f"Total count across all teams: {grand_total}")

    # Collect the distinct team ids (as strings) while listing each image.
    distinct_teams = set()
    for img in everything:
        distinct_teams.add(str(img.team_id))
        print(f" - {img.filename} (Team: {img.team_id})")

    print(f"Admin sees images from {len(distinct_teams)} different teams")

    # The admin view must cover at least two images from at least two teams.
    assert len(everything) >= 2, "Admin should see images from multiple teams"
    assert len(distinct_teams) >= 2, "Admin should see images from at least 2 teams"

    print("✅ Admin user access test passed - sees all images across teams")
    return len(everything)
|
|
|
|
|
|
async def _cleanup_test_images(images):
    """Delete each image in ``images`` from the repository, best-effort.

    A failure to delete one image is reported and does not prevent the
    remaining images from being deleted.
    """
    for image in images:
        try:
            await image_repository.delete(image.id)
        except Exception as exc:  # keep going so the other images are still removed
            print(f"❌ Could not delete test image {image.filename}: {exc}")
        else:
            print(f"Deleted test image: {image.filename}")


async def main():
    """Run the admin image access test end to end.

    Connects to Firestore, stores the fixture images, runs the regular-user
    and admin checks, and compares how many images each of them sees.

    Any exception raised along the way (including a failed assertion in one
    of the checks) is printed together with its traceback rather than
    propagated. Images that were stored are always deleted again and the
    database connection is always closed, whether or not the checks passed.
    """
    print("🧪 Testing Admin Image Access Functionality")
    print("=" * 50)

    # Tracked outside the try block so the finally clause can remove whatever
    # was stored, even when a later step fails.
    created_images = []
    passed = False

    try:
        # Connect to database
        firestore_db.connect()
        print("✅ Connected to Firestore")

        # Set up test data
        test_data = await setup_test_data()

        # Create test images in database
        for image in test_data['images']:
            created_image = await image_repository.create(image)
            created_images.append(created_image)
            print(f"Created test image: {created_image.filename}")

        # Test regular user access
        regular_count = await test_regular_user_access(
            test_data['regular_user'],
            test_data['team1_id'],
        )

        # Test admin user access
        admin_count = await test_admin_user_access(test_data['admin_user'])

        # Verify admin sees more images than regular user
        print("\n=== Summary ===")
        print(f"Regular user images: {regular_count}")
        print(f"Admin user images: {admin_count}")

        passed = admin_count > regular_count
        if passed:
            print("✅ SUCCESS: Admin sees more images than regular user")
        else:
            print("❌ FAILURE: Admin should see more images than regular user")

    except Exception as e:
        print(f"❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()

    finally:
        try:
            # Clean up test data, including after a failed check, so no
            # fixture images are left behind in the database.
            if created_images:
                print("\n=== Cleanup ===")
                await _cleanup_test_images(created_images)

            # Only claim success when the comparison above actually passed.
            if passed:
                print("✅ Test completed successfully!")
        finally:
            # Disconnect from database
            firestore_db.disconnect()
            print("✅ Disconnected from Firestore")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async test runner to completion.
    asyncio.run(main())