#!/usr/bin/env python3 """ Script to seed the Firestore database with initial data. """ import os import sys import asyncio import logging import argparse import string from datetime import datetime, timedelta import secrets import hashlib import json from io import BytesIO from PIL import Image from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Add the parent directory to the path so we can import from src sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from src.models.team import TeamModel from src.models.user import UserModel from src.models.api_key import ApiKeyModel from src.models.image import ImageModel from src.db.providers.firestore_provider import firestore_db from src.db.repositories.firestore_team_repository import firestore_team_repository from src.db.repositories.firestore_user_repository import firestore_user_repository from src.db.repositories.firestore_api_key_repository import firestore_api_key_repository from src.db.repositories.firestore_image_repository import firestore_image_repository from src.auth.security import hash_api_key as app_hash_api_key from src.services.storage import StorageService from src.services.pubsub_service import pubsub_service # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) logger = logging.getLogger(__name__) # Custom JSON encoder for handling ObjectId and datetime types class CustomJSONEncoder(json.JSONEncoder): def default(self, obj): if hasattr(obj, '__str__'): return str(obj) return super().default(obj) # Initialize services storage_service = StorageService() def create_sample_image(width=800, height=600, color=(100, 150, 200), filename="sample.jpg"): """Create a sample image for testing""" # Create a new image with the specified color img = Image.new('RGB', (width, height), color) # Add some simple graphics to make it more interesting from PIL import ImageDraw, ImageFont draw = ImageDraw.Draw(img) # Draw some rectangles draw.rectangle([50, 50, width-50, height-50], outline=(255, 255, 255), width=3) draw.rectangle([100, 100, width-100, height-100], outline=(255, 255, 0), width=2) # Add text try: # Try to use a default font font = ImageFont.load_default() text = f"Sample Image\n{width}x{height}" draw.text((width//2 - 50, height//2 - 20), text, fill=(255, 255, 255), font=font) except: # If font loading fails, just draw without text pass # Save to BytesIO img_bytes = BytesIO() img.save(img_bytes, format='JPEG', quality=85) img_bytes.seek(0) return img_bytes class MockUploadFile: """Mock UploadFile class to simulate FastAPI's UploadFile""" def __init__(self, content: BytesIO, filename: str, content_type: str): self.file = content self.filename = filename self.content_type = content_type self._position = 0 async def read(self, size: int = -1) -> bytes: return self.file.read(size) async def seek(self, position: int) -> None: self.file.seek(position) self._position = position def generate_api_key(team_id=None, user_id=None): """Generate a random API key using the same format as the application""" # Generate a random key prefix (visible part) prefix = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(8)) # Generate a secure random token for the key random_part = secrets.token_hex(16) # Format: prefix.random_part raw_api_key = f"{prefix}.{random_part}" return raw_api_key def hash_api_key(api_key): """Hash an API key for storage using the application's hashing method""" return app_hash_api_key(api_key) async def clear_database(): """Clear all collections from the database""" logger.info("Clearing database collections...") collections = ["teams", "users", "api_keys", "images"] for collection_name in collections: try: # Get all documents in collection docs = firestore_db.get_collection(collection_name).stream() for doc in docs: doc.reference.delete() logger.info(f"Cleared collection: {collection_name}") except Exception as e: logger.error(f"Error clearing collection {collection_name}: {e}") raise logger.info("Database cleared successfully!") async def seed_teams(): """Seed the database with team data""" logger.info("Seeding teams...") teams_data = [ { "name": "Sereact Development", "description": "Internal development team" }, { "name": "Marketing Team", "description": "Marketing and design team" }, { "name": "Customer Support", "description": "Customer support and success team" } ] team_ids = [] for team_data in teams_data: team = TeamModel(**team_data) created_team = await firestore_team_repository.create(team) team_ids.append(created_team.id) logger.info(f"Created team: {created_team.name} (ID: {created_team.id})") return team_ids async def seed_users(team_ids): """Seed the database with user data""" logger.info("Seeding users...") users_data = [ { "email": "admin@sereact.com", "name": "Admin User", "team_id": team_ids[0], "is_admin": True }, { "email": "developer@sereact.com", "name": "Developer User", "team_id": team_ids[0] }, { "email": "marketing@sereact.com", "name": "Marketing User", "team_id": team_ids[1] }, { "email": "support@sereact.com", "name": "Support User", "team_id": team_ids[2] } ] user_ids = [] for user_data in users_data: user = UserModel(**user_data) created_user = await firestore_user_repository.create(user) user_ids.append(created_user.id) logger.info(f"Created user: {created_user.name} (ID: {created_user.id})") return user_ids async def seed_api_keys(user_ids, team_ids): """Seed the database with API key data""" logger.info("Seeding API keys...") api_keys_data = [ { "user_id": user_ids[0], "team_id": team_ids[0], "name": "Admin Key", "description": "API key for admin user" }, { "user_id": user_ids[1], "team_id": team_ids[0], "name": "Development Key", "description": "API key for development user" }, { "user_id": user_ids[2], "team_id": team_ids[1], "name": "Marketing Key", "description": "API key for marketing user" }, { "user_id": user_ids[3], "team_id": team_ids[2], "name": "Support Key", "description": "API key for support user" } ] generated_keys = [] for api_key_data in api_keys_data: # Generate a unique API key api_key = generate_api_key() key_hash = hash_api_key(api_key) # Create API key object api_key_data["key_hash"] = key_hash api_key_data["expiry_date"] = datetime.utcnow() + timedelta(days=365) api_key_obj = ApiKeyModel(**api_key_data) created_api_key = await firestore_api_key_repository.create(api_key_obj) # Convert ObjectId to string for JSON serialization key_data = { "id": str(created_api_key.id), "key": api_key, "name": created_api_key.name, "user_id": str(created_api_key.user_id), "team_id": str(created_api_key.team_id) } generated_keys.append(key_data) logger.info(f"Created API key: {created_api_key.name} (ID: {created_api_key.id})") # Save API keys to a file # api_keys_file = "api_keys.json" # with open(api_keys_file, "w") as f: # json.dump(generated_keys, f, indent=2, cls=CustomJSONEncoder) # Save as plain text too for easy access # with open("api_keys.txt", "w") as f: # f.write("API KEYS\n") # f.write("="*80 + "\n\n") # for key in generated_keys: # f.write(f"Name: {key['name']}\n") # f.write(f"Key: {key['key']}\n") # f.write(f"ID: {key['id']}\n") # f.write("-"*80 + "\n\n") # Print the generated keys prominently print("\n") print("="*80) print(" GENERATED API KEYS") print("="*80) print("") for key in generated_keys: print(f"Name: {key['name']}") print(f"Key: {key['key']}") print(f"ID: {key['id']}") print("-"*80) print("\n") # logger.info(f"API keys saved to {api_keys_file} and api_keys.txt") return generated_keys async def seed_images(team_ids, user_ids): """Seed the database with real image uploads using the application's business logic""" logger.info("Seeding images with real uploads...") # Define sample images to create and upload sample_images = [ { "filename": "product_photo.jpg", "description": "Product photo for marketing", "tags": ["product", "marketing", "high-resolution"], "team_idx": 0, "user_idx": 0, "width": 1920, "height": 1080, "color": (70, 130, 180) # Steel blue }, { "filename": "company_logo.png", "description": "Company logo", "tags": ["logo", "branding"], "team_idx": 1, "user_idx": 2, "width": 800, "height": 600, "color": (255, 165, 0) # Orange }, { "filename": "support_screenshot.jpg", "description": "Screenshot for support ticket", "tags": ["support", "screenshot", "bug"], "team_idx": 2, "user_idx": 3, "width": 1280, "height": 720, "color": (144, 238, 144) # Light green } ] image_ids = [] for img_config in sample_images: try: logger.info(f"Creating and uploading image: {img_config['filename']}") # Create sample image img_content = create_sample_image( width=img_config['width'], height=img_config['height'], color=img_config['color'], filename=img_config['filename'] ) # Create mock upload file content_type = "image/jpeg" if img_config['filename'].endswith('.jpg') else "image/png" mock_file = MockUploadFile( content=img_content, filename=img_config['filename'], content_type=content_type ) # Get team and user IDs team_id = team_ids[img_config['team_idx']] user_id = user_ids[img_config['user_idx']] # Upload to storage using the actual StorageService logger.info(f"Uploading {img_config['filename']} to Google Cloud Storage...") storage_path, content_type, file_size, metadata = await storage_service.upload_file( mock_file, str(team_id) ) # Generate public URL public_url = storage_service.generate_public_url(storage_path) # Create image record using the actual business logic image = ImageModel( filename=img_config['filename'], original_filename=img_config['filename'], file_size=file_size, content_type=content_type, storage_path=storage_path, public_url=public_url, team_id=team_id, uploader_id=user_id, description=img_config['description'], tags=img_config['tags'], metadata=metadata ) # Save to database created_image = await firestore_image_repository.create(image) image_ids.append(created_image.id) logger.info(f"Created image record: {created_image.filename} (ID: {created_image.id})") logger.info(f"Storage path: {storage_path}") logger.info(f"Public URL: {public_url}") # Publish image processing task to Pub/Sub (this triggers Cloud Run) try: logger.info(f"Publishing image processing task to Pub/Sub for image {created_image.id}...") task_published = await pubsub_service.publish_image_processing_task( image_id=str(created_image.id), storage_path=storage_path, team_id=str(team_id) ) if task_published: logger.info(f"✅ Successfully published processing task for image {created_image.id}") else: logger.warning(f"❌ Failed to publish processing task for image {created_image.id}") except Exception as e: logger.warning(f"❌ Failed to publish image processing task: {e}") except Exception as e: logger.error(f"Error creating image {img_config['filename']}: {e}") raise logger.info(f"Successfully seeded {len(image_ids)} images with real uploads!") logger.info("Images uploaded to Google Cloud Storage and processing tasks sent to Cloud Run") return image_ids async def seed_database(clear=False): """Seed the database with initial data""" try: # Connect to Firestore firestore_db.connect() # Clear database if requested if clear: await clear_database() # Seed teams first team_ids = await seed_teams() # Seed users with team IDs user_ids = await seed_users(team_ids) # Seed API keys with user and team IDs api_keys = await seed_api_keys(user_ids, team_ids) # Seed images with team and user IDs image_ids = await seed_images(team_ids, user_ids) logger.info("Database seeding completed successfully!") except Exception as e: logger.error(f"Error seeding database: {e}") raise finally: # Disconnect from Firestore firestore_db.disconnect() def main(): """Main entry point""" parser = argparse.ArgumentParser(description="Seed the Firestore database with initial data") parser.add_argument("--clear", action="store_true", help="Clear all collections before seeding") parser.add_argument("--force", action="store_true", help="Force seeding even if data exists") args = parser.parse_args() asyncio.run(seed_database(clear=args.clear)) if __name__ == "__main__": main()