image_management_api/scripts/seed_firestore.py
2025-05-26 18:47:21 +02:00

448 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Script to seed the Firestore database with initial data.
"""
import os
import sys
import asyncio
import logging
import argparse
import string
from datetime import datetime, timedelta
import secrets
import hashlib
import json
from io import BytesIO
from PIL import Image
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Add the parent directory to the path so we can import from src
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.models.team import TeamModel
from src.models.user import UserModel
from src.models.api_key import ApiKeyModel
from src.models.image import ImageModel
from src.db.providers.firestore_provider import firestore_db
from src.db.repositories.firestore_team_repository import firestore_team_repository
from src.db.repositories.firestore_user_repository import firestore_user_repository
from src.db.repositories.firestore_api_key_repository import firestore_api_key_repository
from src.db.repositories.firestore_image_repository import firestore_image_repository
from src.auth.security import hash_api_key as app_hash_api_key
from src.services.storage import StorageService
from src.services.pubsub_service import pubsub_service
# Configure logging: INFO and above, timestamped, tagged with logger name and level
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every seeding step in this script
logger = logging.getLogger(__name__)
# Custom JSON encoder that stringifies any value the stock encoder rejects
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that falls back to ``str()`` for unsupported objects.

    ``json`` only calls :meth:`default` for values it cannot serialize on
    its own (for example ``datetime`` instances), so natively supported
    types are still encoded as usual.
    """

    def default(self, obj):
        """Return ``str(obj)`` for a value ``json`` cannot encode natively.

        Args:
            obj: The object the base encoder could not serialize.

        Returns:
            The string representation of ``obj``.
        """
        # Every Python object defines ``__str__``, so the previous
        # ``hasattr(obj, '__str__')`` guard was always true and the
        # ``super().default(obj)`` fallback behind it could never run.
        return str(obj)
# Initialize services
# Shared StorageService instance; seed_images() calls its upload_file() and
# generate_public_url() methods for every sample image.
storage_service = StorageService()
def create_sample_image(width=800, height=600, color=(100, 150, 200), filename="sample.jpg"):
    """Create an in-memory sample image for testing.

    The image is a solid background with two outlined rectangles and, when
    possible, a text label showing its dimensions.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        color: RGB tuple used as the background fill.
        filename: Target file name. Its extension selects the encoding:
            ``.png`` produces PNG data, anything else produces JPEG data
            (quality 85), so the bytes match the content type callers
            derive from the same name.

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds the encoded image.
    """
    from PIL import ImageDraw, ImageFont

    # Create a new image with the specified color
    img = Image.new('RGB', (width, height), color)

    # Add some simple graphics to make it more interesting
    draw = ImageDraw.Draw(img)
    draw.rectangle([50, 50, width-50, height-50], outline=(255, 255, 255), width=3)
    draw.rectangle([100, 100, width-100, height-100], outline=(255, 255, 0), width=2)

    # The label is best-effort: a failure here must not abort image creation,
    # but it is logged instead of being silently discarded.
    try:
        font = ImageFont.load_default()
        text = f"Sample Image\n{width}x{height}"
        draw.text((width//2 - 50, height//2 - 20), text, fill=(255, 255, 255), font=font)
    except Exception as exc:
        logger.warning("Could not draw label on sample image: %s", exc)

    # Encode according to the file extension so a ".png" name really
    # contains PNG data instead of always containing JPEG data.
    extension = os.path.splitext(filename)[1].lower()
    img_bytes = BytesIO()
    if extension == '.png':
        img.save(img_bytes, format='PNG')
    else:
        img.save(img_bytes, format='JPEG', quality=85)
    img_bytes.seek(0)
    return img_bytes
class MockUploadFile:
    """Stand-in for FastAPI's UploadFile, backed by an in-memory buffer."""

    def __init__(self, content: BytesIO, filename: str, content_type: str):
        """Store the buffer together with the name and MIME type."""
        self.file, self.filename, self.content_type = content, filename, content_type
        # Only seek() updates this value; read() leaves it untouched
        self._position = 0

    async def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the wrapped buffer."""
        chunk = self.file.read(size)
        return chunk

    async def seek(self, position: int) -> None:
        """Move the wrapped buffer to ``position`` and remember that offset."""
        self.file.seek(position)
        self._position = position
def generate_api_key(team_id=None, user_id=None):
    """Build a random raw API key shaped like ``<prefix>.<token>``.

    The prefix is 8 random ASCII letters/digits and the token is 32 hex
    characters. ``team_id`` and ``user_id`` are accepted but not used.
    """
    alphabet = string.ascii_letters + string.digits
    # Visible part of the key
    prefix_chars = [secrets.choice(alphabet) for _ in range(8)]
    # Secret part of the key
    token = secrets.token_hex(16)
    return "{}.{}".format("".join(prefix_chars), token)
def hash_api_key(api_key):
    """Return the hash of ``api_key`` to store in the database.

    Delegates to ``hash_api_key`` from ``src.auth.security`` (imported here
    as ``app_hash_api_key``).
    """
    hashed_key = app_hash_api_key(api_key)
    return hashed_key
async def clear_database():
    """Delete every document from the collections this script seeds.

    Raises:
        Exception: Any error raised while clearing a collection is logged
            and re-raised, which stops the remaining collections from
            being processed.
    """
    logger.info("Clearing database collections...")

    for collection_name in ("teams", "users", "api_keys", "images"):
        try:
            collection = firestore_db.get_collection(collection_name)
            # Documents are deleted one by one as the stream yields them
            for snapshot in collection.stream():
                snapshot.reference.delete()
            logger.info(f"Cleared collection: {collection_name}")
        except Exception as e:
            logger.error(f"Error clearing collection {collection_name}: {e}")
            raise

    logger.info("Database cleared successfully!")
async def seed_teams():
    """Create the sample teams and return their IDs in creation order."""
    logger.info("Seeding teams...")

    # (name, description) for each team to create
    team_specs = (
        ("Contoso Development", "Internal development team"),
        ("Marketing Team", "Marketing and design team"),
        ("Customer Support", "Customer support and success team"),
    )

    team_ids = []
    for team_name, team_description in team_specs:
        new_team = TeamModel(name=team_name, description=team_description)
        created_team = await firestore_team_repository.create(new_team)
        team_ids.append(created_team.id)
        logger.info(f"Created team: {created_team.name} (ID: {created_team.id})")

    return team_ids
async def seed_users(team_ids):
    """Create the sample users and return their IDs in creation order.

    Args:
        team_ids: Team IDs to attach users to; the first three entries are
            used, so fewer than three raises ``IndexError`` before any user
            is created.
    """
    logger.info("Seeding users...")

    dev_team, marketing_team, support_team = team_ids[0], team_ids[1], team_ids[2]
    user_specs = [
        dict(email="admin@contoso.com", name="Admin User", team_id=dev_team, is_admin=True),
        dict(email="developer@contoso.com", name="Developer User", team_id=dev_team),
        dict(email="marketing@contoso.com", name="Marketing User", team_id=marketing_team),
        dict(email="support@contoso.com", name="Support User", team_id=support_team),
    ]

    user_ids = []
    for spec in user_specs:
        created_user = await firestore_user_repository.create(UserModel(**spec))
        user_ids.append(created_user.id)
        logger.info(f"Created user: {created_user.name} (ID: {created_user.id})")

    return user_ids
async def seed_api_keys(user_ids, team_ids):
    """Create one API key per sample user and print the raw keys.

    Only the hash of each key is passed to the repository; the raw keys
    are printed to stdout and returned. Nothing is written to disk.

    Args:
        user_ids: User IDs from ``seed_users``; the first four are used.
        team_ids: Team IDs from ``seed_teams``; the first three are used.

    Returns:
        A list of dicts with the keys ``id``, ``key``, ``name``,
        ``user_id`` and ``team_id`` (IDs converted to strings).
    """
    logger.info("Seeding API keys...")

    api_keys_data = [
        dict(user_id=user_ids[0], team_id=team_ids[0],
             name="Admin Key", description="API key for admin user"),
        dict(user_id=user_ids[1], team_id=team_ids[0],
             name="Development Key", description="API key for development user"),
        dict(user_id=user_ids[2], team_id=team_ids[1],
             name="Marketing Key", description="API key for marketing user"),
        dict(user_id=user_ids[3], team_id=team_ids[2],
             name="Support Key", description="API key for support user"),
    ]

    generated_keys = []
    for spec in api_keys_data:
        # Fresh raw key per entry; only its hash goes into the stored record
        raw_key = generate_api_key()
        spec["key_hash"] = hash_api_key(raw_key)
        spec["expiry_date"] = datetime.utcnow() + timedelta(days=365)

        created_api_key = await firestore_api_key_repository.create(ApiKeyModel(**spec))

        # IDs are stringified so the summary holds plain strings only
        generated_keys.append({
            "id": str(created_api_key.id),
            "key": raw_key,
            "name": created_api_key.name,
            "user_id": str(created_api_key.user_id),
            "team_id": str(created_api_key.team_id)
        })
        logger.info(f"Created API key: {created_api_key.name} (ID: {created_api_key.id})")

    # Print the generated keys prominently
    heavy_rule = "="*80
    light_rule = "-"*80
    print("\n")
    print(heavy_rule)
    print(" GENERATED API KEYS")
    print(heavy_rule)
    print("")
    for key in generated_keys:
        print(f"Name: {key['name']}")
        print(f"Key: {key['key']}")
        print(f"ID: {key['id']}")
        print(light_rule)
    print("\n")

    return generated_keys
async def seed_images(team_ids, user_ids):
    """Upload generated sample images and register them in Firestore.

    For each sample definition the function renders an image, uploads it
    through ``storage_service``, saves an ``ImageModel`` record and then
    tries to publish a processing task through ``pubsub_service``.

    Args:
        team_ids: Team IDs from ``seed_teams``, indexed by ``team_idx``.
        user_ids: User IDs from ``seed_users``, indexed by ``user_idx``.

    Returns:
        A list with the ID of every image record that was created.

    Raises:
        Exception: Any failure while creating, uploading or saving an
            image is logged and re-raised. Failures while publishing the
            processing task are only logged as warnings.
    """
    logger.info("Seeding images with real uploads...")

    # Sample images to render and upload
    sample_images = [
        {
            "filename": "product_photo.jpg",
            "description": "Product photo for marketing",
            "team_idx": 0,
            "user_idx": 0,
            "width": 1920,
            "height": 1080,
            "color": (70, 130, 180),  # Steel blue
        },
        {
            "filename": "company_logo.png",
            "description": "Company logo",
            "team_idx": 1,
            "user_idx": 2,
            "width": 800,
            "height": 600,
            "color": (255, 165, 0),  # Orange
        },
        {
            "filename": "support_screenshot.jpg",
            "description": "Screenshot for support ticket",
            "team_idx": 2,
            "user_idx": 3,
            "width": 1280,
            "height": 720,
            "color": (144, 238, 144),  # Light green
        },
    ]

    image_ids = []
    for img_config in sample_images:
        try:
            logger.info(f"Creating and uploading image: {img_config['filename']}")

            # Render the image bytes
            image_name = img_config['filename']
            rendered = create_sample_image(
                width=img_config['width'],
                height=img_config['height'],
                color=img_config['color'],
                filename=image_name,
            )

            # Wrap the bytes in an upload-like object with a declared MIME type
            if image_name.endswith('.jpg'):
                declared_type = "image/jpeg"
            else:
                declared_type = "image/png"
            mock_file = MockUploadFile(
                content=rendered,
                filename=image_name,
                content_type=declared_type,
            )

            # Resolve the owning team and the uploading user
            team_id = team_ids[img_config['team_idx']]
            user_id = user_ids[img_config['user_idx']]

            # Upload through the real storage service
            logger.info(f"Uploading {img_config['filename']} to Google Cloud Storage...")
            upload_result = await storage_service.upload_file(mock_file, str(team_id))
            storage_path, stored_type, file_size, metadata = upload_result
            public_url = storage_service.generate_public_url(storage_path)

            # The record uses the values reported by the upload, not the declared ones
            image = ImageModel(
                filename=image_name,
                original_filename=image_name,
                file_size=file_size,
                content_type=stored_type,
                storage_path=storage_path,
                public_url=public_url,
                team_id=team_id,
                uploader_id=user_id,
                description=img_config['description'],
                metadata=metadata,
            )
            created_image = await firestore_image_repository.create(image)
            image_ids.append(created_image.id)

            logger.info(f"Created image record: {created_image.filename} (ID: {created_image.id})")
            logger.info(f"Storage path: {storage_path}")
            logger.info(f"Public URL: {public_url}")

            # Publishing is best-effort: a failure is logged but does not abort seeding
            try:
                logger.info(f"Publishing image processing task to Pub/Sub for image {created_image.id}...")
                task_published = await pubsub_service.publish_image_processing_task(
                    image_id=str(created_image.id),
                    storage_path=storage_path,
                    team_id=str(team_id),
                )
                if not task_published:
                    logger.warning(f"❌ Failed to publish processing task for image {created_image.id}")
                else:
                    logger.info(f"✅ Successfully published processing task for image {created_image.id}")
            except Exception as e:
                logger.warning(f"❌ Failed to publish image processing task: {e}")

        except Exception as e:
            logger.error(f"Error creating image {img_config['filename']}: {e}")
            raise

    logger.info(f"Successfully seeded {len(image_ids)} images with real uploads!")
    logger.info("Images uploaded to Google Cloud Storage and processing tasks sent to Cloud Run")

    return image_ids
async def seed_database(clear=False):
    """Run every seeding step in dependency order.

    Args:
        clear: When true, ``clear_database`` runs before anything is seeded.

    Raises:
        Exception: Any error from a seeding step is logged and re-raised.
            The Firestore connection is closed in either case.
    """
    try:
        firestore_db.connect()

        if clear:
            await clear_database()

        # Teams come first because users, keys and images reference them
        team_ids = await seed_teams()
        user_ids = await seed_users(team_ids)

        # The return values of the last two steps are not needed here
        await seed_api_keys(user_ids, team_ids)
        await seed_images(team_ids, user_ids)

        logger.info("Database seeding completed successfully!")
    except Exception as e:
        logger.error(f"Error seeding database: {e}")
        raise
    finally:
        # Always release the Firestore connection
        firestore_db.disconnect()
def main():
    """Parse the command-line options and run the seeding coroutine.

    Options:
        --clear: Delete the seeded collections before inserting data.
        --force: Still accepted so existing invocations keep working, but
            it has no effect because the script never checks for existing
            data. A warning is logged when it is passed.
    """
    parser = argparse.ArgumentParser(description="Seed the Firestore database with initial data")
    parser.add_argument("--clear", action="store_true", help="Clear all collections before seeding")
    parser.add_argument(
        "--force",
        action="store_true",
        help="Accepted for backward compatibility; currently has no effect "
             "(seeding never checks for existing data)",
    )
    args = parser.parse_args()

    # The flag used to be parsed and silently ignored; tell the user instead
    if args.force:
        logger.warning("--force has no effect: this script never checks for existing data")

    asyncio.run(seed_database(clear=args.clear))
# Run the seeder only when this file is executed as a script, not on import
if __name__ == "__main__":
    main()