347 lines
12 KiB
Python
347 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script to seed the Firestore database with initial data.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
import string
|
|
from datetime import datetime, timedelta
|
|
import secrets
|
|
import hashlib
|
|
import json
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables from a .env file into the process environment.
# NOTE(review): presumably this has to run before the `src` imports below so
# that those modules see the variables at import time -- confirm.
load_dotenv()

# Add the parent directory of this script's directory to sys.path so the
# `src` package imported below can be resolved when the script is run directly.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
from src.db.models.team import TeamModel
|
|
from src.db.models.user import UserModel
|
|
from src.db.models.api_key import ApiKeyModel
|
|
from src.db.models.image import ImageModel
|
|
from src.db.providers.firestore_provider import firestore_db
|
|
from src.db.repositories.firestore_team_repository import firestore_team_repository
|
|
from src.db.repositories.firestore_user_repository import firestore_user_repository
|
|
from src.db.repositories.firestore_api_key_repository import firestore_api_key_repository
|
|
from src.db.repositories.firestore_image_repository import firestore_image_repository
|
|
from src.core.security import hash_api_key as app_hash_api_key
|
|
|
|
# Configure logging: emit INFO and above, with each record formatted as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level logger used by the seeding functions in this script.
logger = logging.getLogger(__name__)
|
|
|
|
# Custom JSON encoder that stringifies values json cannot serialize natively
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that falls back to ``str()`` for unsupported values.

    ``json`` only calls :meth:`default` for objects it cannot serialize on
    its own (for example ``datetime`` instances), so natively supported
    types such as ``dict``, ``list``, ``str``, numbers, booleans and ``None``
    are encoded exactly as the base encoder would encode them.
    """

    def default(self, obj):
        """Return ``str(obj)`` as the JSON representation of *obj*.

        Every Python object has ``__str__``, so the former
        ``hasattr(obj, '__str__')`` guard was always true and the fallback
        to ``super().default(obj)`` could never run. The dead branch is
        removed; the produced output is unchanged.
        """
        return str(obj)
|
|
|
|
def generate_api_key(team_id=None, user_id=None):
    """Build a fresh random API key of the form ``<prefix>.<token>``.

    The prefix is 8 characters drawn from ASCII letters and digits; the
    token is 32 lowercase hexadecimal characters (16 random bytes). Both
    parts come from the ``secrets`` module.

    Args:
        team_id: Accepted for interface compatibility; not used.
        user_id: Accepted for interface compatibility; not used.

    Returns:
        The raw (unhashed) API key string.
    """
    alphabet = string.ascii_letters + string.digits

    # Visible part of the key.
    visible_chars = [secrets.choice(alphabet) for _ in range(8)]
    visible_part = ''.join(visible_chars)

    # Secret part of the key.
    secret_part = secrets.token_hex(16)

    return visible_part + "." + secret_part
|
|
|
|
def hash_api_key(api_key):
    """Return the storage hash of *api_key*.

    Delegates to ``hash_api_key`` from ``src.core.security`` (imported in
    this file as ``app_hash_api_key``).
    """
    hashed_key = app_hash_api_key(api_key)
    return hashed_key
|
|
|
|
async def clear_database():
    """Delete every document in the teams, users, api_keys and images collections.

    Collections are processed in that order. Any error while clearing a
    collection is logged and re-raised, which stops the remaining
    collections from being processed.
    """
    logger.info("Clearing database collections...")

    for collection_name in ("teams", "users", "api_keys", "images"):
        try:
            # Stream the collection's documents and delete them one by one.
            collection = firestore_db.get_collection(collection_name)
            for snapshot in collection.stream():
                snapshot.reference.delete()
            logger.info(f"Cleared collection: {collection_name}")
        except Exception as e:
            logger.error(f"Error clearing collection {collection_name}: {e}")
            raise

    logger.info("Database cleared successfully!")
|
|
|
|
async def seed_teams():
    """Create the three sample teams and return their IDs.

    Returns:
        A list with the ``id`` of each created team, in creation order
        (development, marketing, customer support).
    """
    logger.info("Seeding teams...")

    # (name, description) pairs for the teams to create.
    team_specs = (
        ("Sereact Development", "Internal development team"),
        ("Marketing Team", "Marketing and design team"),
        ("Customer Support", "Customer support and success team"),
    )

    team_ids = []
    for team_name, team_description in team_specs:
        new_team = TeamModel(name=team_name, description=team_description)
        stored_team = await firestore_team_repository.create(new_team)
        team_ids.append(stored_team.id)
        logger.info(f"Created team: {stored_team.name} (ID: {stored_team.id})")

    return team_ids
|
|
|
|
async def seed_users(team_ids):
    """Create the four sample users and return their IDs.

    Args:
        team_ids: Team IDs as returned by ``seed_teams``; the first three
            entries are used (indices 0, 1 and 2).

    Returns:
        A list with the ``id`` of each created user, in creation order
        (admin, developer, marketing, support).
    """
    logger.info("Seeding users...")

    dev_team, marketing_team, support_team = team_ids[0], team_ids[1], team_ids[2]

    # Only the admin record sets is_admin; the others leave it unspecified.
    users_data = [
        dict(email="admin@sereact.com", name="Admin User", team_id=dev_team, is_admin=True),
        dict(email="developer@sereact.com", name="Developer User", team_id=dev_team),
        dict(email="marketing@sereact.com", name="Marketing User", team_id=marketing_team),
        dict(email="support@sereact.com", name="Support User", team_id=support_team),
    ]

    user_ids = []
    for fields in users_data:
        stored_user = await firestore_user_repository.create(UserModel(**fields))
        user_ids.append(stored_user.id)
        logger.info(f"Created user: {stored_user.name} (ID: {stored_user.id})")

    return user_ids
|
|
|
|
def _write_api_keys_json(generated_keys, path):
    """Write the generated key records to *path* as indented JSON."""
    # Explicit encoding so the result does not depend on the platform default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(generated_keys, f, indent=2, cls=CustomJSONEncoder)


def _write_api_keys_text(generated_keys, path):
    """Write a human-readable listing of the generated keys to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        f.write("API KEYS\n")
        f.write("="*80 + "\n\n")
        for key in generated_keys:
            f.write(f"Name: {key['name']}\n")
            f.write(f"Key: {key['key']}\n")
            f.write(f"ID: {key['id']}\n")
            f.write("-"*80 + "\n\n")


def _print_api_keys(generated_keys):
    """Print the generated keys to stdout inside a banner."""
    print("\n")
    print("="*80)
    print(" GENERATED API KEYS")
    print("="*80)
    print("")
    for key in generated_keys:
        print(f"Name: {key['name']}")
        print(f"Key: {key['key']}")
        print(f"ID: {key['id']}")
        print("-"*80)
    print("\n")


async def seed_api_keys(user_ids, team_ids):
    """Create one API key per sample user and export the raw keys.

    For each key a raw value is generated, hashed, and stored through the
    API key repository with an expiry 365 days from now. The raw keys are
    then written to ``api_keys.json`` and ``api_keys.txt`` in the current
    working directory and printed to stdout.

    NOTE: both files and the console output contain the raw, unhashed keys.

    Args:
        user_ids: User IDs as returned by ``seed_users``; indices 0-3 are used.
        team_ids: Team IDs as returned by ``seed_teams``; indices 0-2 are used.

    Returns:
        A list of dicts with the keys ``id``, ``key`` (raw API key),
        ``name``, ``user_id`` and ``team_id``; the ID fields are strings.
    """
    logger.info("Seeding API keys...")

    api_keys_data = [
        {
            "user_id": user_ids[0],
            "team_id": team_ids[0],
            "name": "Admin Key",
            "description": "API key for admin user",
        },
        {
            "user_id": user_ids[1],
            "team_id": team_ids[0],
            "name": "Development Key",
            "description": "API key for development user",
        },
        {
            "user_id": user_ids[2],
            "team_id": team_ids[1],
            "name": "Marketing Key",
            "description": "API key for marketing user",
        },
        {
            "user_id": user_ids[3],
            "team_id": team_ids[2],
            "name": "Support Key",
            "description": "API key for support user",
        },
    ]

    generated_keys = []
    for api_key_data in api_keys_data:
        # Generate a unique raw key; only its hash is handed to the model.
        api_key = generate_api_key()
        key_hash = hash_api_key(api_key)

        # NOTE(review): utcnow() yields a naive datetime. It is kept as-is
        # because whether ApiKeyModel expects naive or timezone-aware values
        # is not visible from this script -- confirm before changing.
        expiry_date = datetime.utcnow() + timedelta(days=365)

        # Pass the extra fields as keyword arguments instead of mutating
        # the seed dictionary in place.
        api_key_obj = ApiKeyModel(
            **api_key_data,
            key_hash=key_hash,
            expiry_date=expiry_date,
        )
        created_api_key = await firestore_api_key_repository.create(api_key_obj)

        # Convert the identifiers to strings for JSON serialization.
        generated_keys.append({
            "id": str(created_api_key.id),
            "key": api_key,
            "name": created_api_key.name,
            "user_id": str(created_api_key.user_id),
            "team_id": str(created_api_key.team_id),
        })
        logger.info(f"Created API key: {created_api_key.name} (ID: {created_api_key.id})")

    # Save the keys as JSON, and as plain text too for easy access.
    api_keys_file = "api_keys.json"
    _write_api_keys_json(generated_keys, api_keys_file)
    _write_api_keys_text(generated_keys, "api_keys.txt")

    # Print the generated keys prominently.
    _print_api_keys(generated_keys)

    logger.info(f"API keys saved to {api_keys_file} and api_keys.txt")

    return generated_keys
|
|
|
|
async def seed_images(team_ids, user_ids):
    """Create three sample image metadata records and return their IDs.

    Only metadata documents are created through the image repository; the
    storage paths and public URLs are placeholder strings built from the
    team IDs.

    Args:
        team_ids: Team IDs as returned by ``seed_teams``; indices 0-2 are used.
        user_ids: User IDs as returned by ``seed_users``; indices 0, 2 and 3
            are used as uploaders.

    Returns:
        A list with the ``id`` of each created image, in creation order.
    """
    logger.info("Seeding images...")

    bucket_root = "https://storage.googleapis.com/example-bucket/"

    # Storage path of each sample file, scoped to its owning team.
    product_path = f"teams/{team_ids[0]}/images/image1.jpg"
    logo_path = f"teams/{team_ids[1]}/images/image2.png"
    screenshot_path = f"teams/{team_ids[2]}/images/image3.jpg"

    images_data = [
        dict(
            filename="image1.jpg",
            original_filename="product_photo.jpg",
            file_size=1024 * 1024,  # 1MB
            content_type="image/jpeg",
            storage_path=product_path,
            public_url=bucket_root + product_path,
            team_id=team_ids[0],
            uploader_id=user_ids[0],
            description="Product photo for marketing",
            tags=["product", "marketing", "high-resolution"],
            metadata={"width": 1920, "height": 1080, "color_space": "sRGB"},
        ),
        dict(
            filename="image2.png",
            original_filename="logo.png",
            file_size=512 * 1024,  # 512KB
            content_type="image/png",
            storage_path=logo_path,
            public_url=bucket_root + logo_path,
            team_id=team_ids[1],
            uploader_id=user_ids[2],
            description="Company logo",
            tags=["logo", "branding"],
            metadata={"width": 800, "height": 600, "color_space": "sRGB"},
        ),
        dict(
            filename="image3.jpg",
            original_filename="support_screenshot.jpg",
            file_size=256 * 1024,  # 256KB
            content_type="image/jpeg",
            storage_path=screenshot_path,
            public_url=bucket_root + screenshot_path,
            team_id=team_ids[2],
            uploader_id=user_ids[3],
            description="Screenshot for support ticket",
            tags=["support", "screenshot", "bug"],
            metadata={"width": 1280, "height": 720, "color_space": "sRGB"},
        ),
    ]

    image_ids = []
    for fields in images_data:
        stored_image = await firestore_image_repository.create(ImageModel(**fields))
        image_ids.append(stored_image.id)
        logger.info(f"Created image: {stored_image.filename} (ID: {stored_image.id})")

    return image_ids
|
|
|
|
async def seed_database(clear=False):
    """Seed the database with initial data.

    Connects to Firestore, optionally clears the seeded collections, then
    seeds teams, users, API keys and images in that order, because each
    later step takes IDs returned by an earlier one. The Firestore
    connection is closed in all cases.

    Args:
        clear: When true, run ``clear_database()`` before seeding.

    Raises:
        Exception: Any error raised while connecting, clearing or seeding
            is logged and re-raised unchanged.
    """
    try:
        # Connect to Firestore
        firestore_db.connect()

        # Clear database if requested
        if clear:
            await clear_database()

        # Teams first, then users that reference those teams.
        team_ids = await seed_teams()
        user_ids = await seed_users(team_ids)

        # The return values of the last two steps are not needed here, so
        # they are no longer bound to unused local names.
        await seed_api_keys(user_ids, team_ids)
        await seed_images(team_ids, user_ids)

        logger.info("Database seeding completed successfully!")

    except Exception as e:
        logger.error(f"Error seeding database: {e}")
        raise
    finally:
        # Disconnect from Firestore
        firestore_db.disconnect()
|
|
|
|
def main():
    """Parse the command-line flags and run the seeding coroutine.

    Flags:
        --clear: Clear all collections before seeding.
        --force: Accepted by the parser; see the note below.
    """
    parser = argparse.ArgumentParser(
        description="Seed the Firestore database with initial data"
    )
    boolean_flags = (
        ("--clear", "Clear all collections before seeding"),
        ("--force", "Force seeding even if data exists"),
    )
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    options = parser.parse_args()

    # NOTE(review): options.force is parsed but never read in this function;
    # only --clear is forwarded to seed_database().
    asyncio.run(seed_database(clear=options.clear))
|
|
|
|
# Run the seeding workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()