"""Firestore-backed persistence provider.

Wraps a ``google.cloud.firestore.Client`` and exposes simple document CRUD
helpers keyed by collection name and document ID.
"""
from typing import Any, Dict, List, Optional, Type
import ast
import datetime
import logging
import os

from google.cloud import firestore
from pydantic import BaseModel

from src.core.config import settings
from src.db.models.team import TeamModel
from src.db.models.user import UserModel
from src.db.models.api_key import ApiKeyModel
from src.db.models.image import ImageModel

logger = logging.getLogger(__name__)

# Scalar types that Firestore stores natively. They are matched by exact type
# (not isinstance) so that subclasses such as str-based enums or custom ID
# wrappers are still written through str(), exactly as before.
_NATIVE_SCALAR_TYPES = (bool, int, float, str, bytes)

# ast.literal_eval documents all of these as possible outcomes of malformed
# input; any of them means "this was an ordinary string, leave it alone".
_LITERAL_EVAL_ERRORS = (
    SyntaxError,
    ValueError,
    TypeError,
    MemoryError,
    RecursionError,
)


def _has_nested_array(value: Any, in_array: bool = False) -> bool:
    """Return True if *value* contains an array directly inside another array."""
    if isinstance(value, (list, tuple)):
        return in_array or any(_has_nested_array(item, True) for item in value)
    if isinstance(value, dict):
        return any(_has_nested_array(item, False) for item in value.values())
    return False


def _to_native(value: Any) -> Any:
    """Recursively convert *value* into types Firestore can store.

    Native scalars, ``None`` and datetimes are kept as they are, containers
    are converted element by element, and anything else (for example an
    ObjectId) is replaced by its ``str()`` form.
    """
    if value is None or type(value) in _NATIVE_SCALAR_TYPES:
        return value
    if isinstance(value, datetime.datetime):
        return value
    if isinstance(value, dict):
        return {str(key): _to_native(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_native(item) for item in value]
    return str(value)


def _encode_field(value: Any) -> Any:
    """Prepare a single top-level field value for writing to Firestore.

    The previous implementation guarded the ``str()`` conversion with
    ``hasattr(value, '__str__')``, which is true for every object, so every
    value ended up stored as a string. Values now keep their native type.

    Firestore rejects arrays that directly contain other arrays, so a value
    with that shape is still written in the legacy stringified form, which
    ``_decode_field`` turns back into a list on read.
    """
    if _has_nested_array(value):
        return str(value)
    return _to_native(value)


def _decode_field(value: Any) -> Any:
    """Undo the legacy string encoding of a single stored field value.

    Documents written before values kept their native types hold ``'None'``
    for ``None`` and ``str(list)`` / ``str(dict)`` for containers. Non-string
    values and strings that are not valid literals are returned unchanged.
    """
    if not isinstance(value, str):
        return value
    if value in ("None", "null"):
        return None
    looks_like_list = value.startswith("[") and value.endswith("]")
    looks_like_dict = value.startswith("{") and value.endswith("}")
    if looks_like_list or looks_like_dict:
        try:
            return ast.literal_eval(value)
        except _LITERAL_EVAL_ERRORS:
            return value  # Keep as string if conversion fails
    return value


def _decode_document(raw: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Decode every field of a stored document into a new dict."""
    return {key: _decode_field(value) for key, value in (raw or {}).items()}


class FirestoreProvider:
    """Provider for Firestore database operations.

    NOTE(review): the CRUD methods are declared ``async`` but call the
    synchronous Firestore client without awaiting anything, so each call
    blocks the running event loop for the duration of the request.
    """

    def __init__(self, database: str = "imagedb"):
        """
        Create an unconnected provider.

        Args:
            database: Firestore database ID to connect to
        """
        self.database = database
        self.client = None
        self._db = None
        self._collections = {
            "teams": TeamModel,
            "users": UserModel,
            "api_keys": ApiKeyModel,
            "images": ImageModel,
        }

    def connect(self):
        """
        Connect to Firestore

        Uses the service-account file named by ``settings.GCS_CREDENTIALS_FILE``
        when it is set and exists on disk, otherwise application default
        credentials.

        Returns:
            True once the client has been created

        Raises:
            Exception: Any error raised while creating the client (logged
                and re-raised)
        """
        try:
            credentials_file = settings.GCS_CREDENTIALS_FILE
            if credentials_file and os.path.exists(credentials_file):
                self.client = firestore.Client.from_service_account_json(
                    credentials_file,
                    database=self.database,  # Use the specific database ID
                )
            else:
                # Use application default credentials with specific database
                self.client = firestore.Client(database=self.database)

            self._db = self.client
            logger.info(f"Connected to Firestore database: {self.database}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Firestore: {e}")
            raise

    def disconnect(self):
        """
        Disconnect from Firestore

        Best effort: the references are always dropped, and a failure while
        closing the client is logged rather than raised.
        """
        client = self.client
        self.client = None
        self._db = None
        try:
            # Release the underlying transport when the client exposes close().
            close = getattr(client, "close", None)
            if callable(close):
                close()
            logger.info("Disconnected from Firestore")
        except Exception as e:
            logger.error(f"Error disconnecting from Firestore: {e}")

    def get_collection(self, collection_name: str):
        """
        Get a Firestore collection reference

        Args:
            collection_name: Collection name

        Returns:
            Collection reference

        Raises:
            ValueError: If connect() has not been called
        """
        if self._db is None:
            raise ValueError("Not connected to Firestore")
        return self._db.collection(collection_name)

    async def add_document(self, collection_name: str, data: Dict[str, Any]) -> str:
        """
        Add a document to a collection

        A non-empty ``_id`` entry in ``data`` is used as the document ID and
        is not stored as a field. When ``_id`` is missing, ``None`` or empty,
        Firestore generates the ID.

        Args:
            collection_name: Collection name
            data: Document data

        Returns:
            Document ID

        Raises:
            Exception: Any error from the conversion or the write (logged
                and re-raised)
        """
        try:
            collection = self.get_collection(collection_name)

            payload = {
                key: _encode_field(value)
                for key, value in data.items()
                if key != "_id"
            }

            # A None _id used to become the literal document ID "None", so
            # every such record overwrote the same document.
            raw_id = data.get("_id")
            doc_id = "" if raw_id is None else str(raw_id)

            if doc_id:
                collection.document(doc_id).set(payload)
                return doc_id

            # add() returns (update_time, document_reference)
            _, doc_ref = collection.add(payload)
            return doc_ref.id
        except Exception as e:
            logger.error(f"Error adding document to {collection_name}: {e}")
            raise

    async def get_document(self, collection_name: str, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a document by ID

        Args:
            collection_name: Collection name
            doc_id: Document ID

        Returns:
            Document data (with ``_id`` set to ``doc_id``) if found,
            None otherwise

        Raises:
            Exception: Any error from the read (logged and re-raised)
        """
        try:
            snapshot = self.get_collection(collection_name).document(doc_id).get()
            if not snapshot.exists:
                return None

            data = _decode_document(snapshot.to_dict())
            data["_id"] = doc_id
            return data
        except Exception as e:
            logger.error(f"Error getting document from {collection_name}: {e}")
            raise

    async def list_documents(self, collection_name: str) -> List[Dict[str, Any]]:
        """
        List all documents in a collection

        Fields are decoded the same way as in get_document(), so both
        methods return the same shape for the same stored document.

        Args:
            collection_name: Collection name

        Returns:
            List of documents, each with ``_id`` set to its document ID

        Raises:
            Exception: Any error from the read (logged and re-raised)
        """
        try:
            results = []
            for snapshot in self.get_collection(collection_name).stream():
                data = _decode_document(snapshot.to_dict())
                data["_id"] = snapshot.id
                results.append(data)
            return results
        except Exception as e:
            logger.error(f"Error listing documents in {collection_name}: {e}")
            raise

    async def update_document(self, collection_name: str, doc_id: str, data: Dict[str, Any]) -> bool:
        """
        Update a document

        Args:
            collection_name: Collection name
            doc_id: Document ID
            data: Update data; an ``_id`` entry is ignored

        Returns:
            True when the update call completed

        Raises:
            Exception: Any error from the conversion or the update (logged
                and re-raised); this method never returns False
        """
        try:
            payload = {
                key: _encode_field(value)
                for key, value in data.items()
                if key != "_id"
            }

            doc_ref = self.get_collection(collection_name).document(doc_id)
            doc_ref.update(payload)
            return True
        except Exception as e:
            logger.error(f"Error updating document in {collection_name}: {e}")
            raise

    async def delete_document(self, collection_name: str, doc_id: str) -> bool:
        """
        Delete a document

        Args:
            collection_name: Collection name
            doc_id: Document ID

        Returns:
            True when the delete call completed

        Raises:
            Exception: Any error from the delete (logged and re-raised);
                this method never returns False
        """
        try:
            doc_ref = self.get_collection(collection_name).document(doc_id)
            doc_ref.delete()
            return True
        except Exception as e:
            logger.error(f"Error deleting document from {collection_name}: {e}")
            raise

    def convert_to_model(self, model_class: Type[BaseModel], doc_data: Dict[str, Any]) -> BaseModel:
        """
        Convert Firestore document data to a Pydantic model

        Args:
            model_class: Pydantic model class
            doc_data: Firestore document data

        Returns:
            Model instance
        """
        return model_class(**doc_data)


# Create a singleton provider
firestore_db = FirestoreProvider()