"""Unit tests for the Firestore-backed repository classes."""
import pytest
|
|
from unittest.mock import Mock, patch, AsyncMock
|
|
from src.db.repositories.firestore_repository import FirestoreRepository
|
|
from src.db.repositories.firestore_team_repository import FirestoreTeamRepository
|
|
from src.db.repositories.firestore_user_repository import FirestoreUserRepository
|
|
from src.db.repositories.firestore_api_key_repository import FirestoreApiKeyRepository
|
|
from src.db.repositories.firestore_image_repository import FirestoreImageRepository
|
|
from src.models.team import TeamModel
|
|
from src.models.user import UserModel
|
|
from src.models.api_key import ApiKeyModel
|
|
from src.models.image import ImageModel
|
|
from pydantic import BaseModel
|
|
|
|
|
|
class TestFirestoreRepository:
    """Exercise the CRUD methods of the generic FirestoreRepository.

    The tests patch ``firestore_db`` in the repository module and inspect
    the calls the repository makes on that mock.
    """

    @pytest.fixture
    def mock_firestore_db(self):
        """Yield a patched ``firestore_db`` whose document methods are AsyncMocks."""
        target = 'src.db.repositories.firestore_repository.firestore_db'
        awaitable_methods = (
            "add_document",
            "get_document",
            "list_documents",
            "update_document",
            "delete_document",
        )
        with patch(target) as mock_db:
            # Each document method is replaced so that calling it can be awaited.
            for method_name in awaitable_methods:
                setattr(mock_db, method_name, AsyncMock())
            yield mock_db

    @pytest.fixture
    def test_model_class(self):
        """Return a small pydantic model with a ``name`` and a ``value`` field."""

        class TestModel(BaseModel):
            name: str
            value: int

        return TestModel

    @pytest.fixture
    def repository(self, test_model_class):
        """Build a FirestoreRepository bound to ``test_collection``."""
        repo = FirestoreRepository("test_collection", test_model_class)
        return repo

    def test_init(self, repository, test_model_class):
        """The constructor arguments are exposed as attributes."""
        assert repository.model_class == test_model_class
        assert repository.collection_name == "test_collection"

    @pytest.mark.asyncio
    async def test_create(self, repository, test_model_class, mock_firestore_db):
        """create() adds the document, reads it back and returns a model."""
        # Arrange
        fields = {"name": "Test", "value": 123}
        new_id = "generated_id"
        mock_firestore_db.add_document.return_value = new_id
        mock_firestore_db.get_document.return_value = {**fields, "_id": new_id}

        # Act
        created = await repository.create(test_model_class(**fields))

        # Assert
        assert isinstance(created, test_model_class)
        assert (created.name, created.value) == ("Test", 123)
        mock_firestore_db.add_document.assert_called_once_with(
            "test_collection", fields
        )
        mock_firestore_db.get_document.assert_called_once_with(
            "test_collection", new_id
        )

    @pytest.mark.asyncio
    async def test_get_by_id_found(self, repository, test_model_class, mock_firestore_db):
        """get_by_id() turns an existing document into a model instance."""
        # Arrange
        doc_id = "test_id"
        mock_firestore_db.get_document.return_value = {
            "name": "Test",
            "value": 123,
            "_id": doc_id,
        }

        # Act
        found = await repository.get_by_id(doc_id)

        # Assert
        assert isinstance(found, test_model_class)
        assert (found.name, found.value) == ("Test", 123)
        mock_firestore_db.get_document.assert_called_once_with(
            "test_collection", doc_id
        )

    @pytest.mark.asyncio
    async def test_get_by_id_not_found(self, repository, mock_firestore_db):
        """get_by_id() returns None when the lookup yields None."""
        # Arrange
        missing_id = "nonexistent_id"
        mock_firestore_db.get_document.return_value = None

        # Act
        found = await repository.get_by_id(missing_id)

        # Assert
        assert found is None
        mock_firestore_db.get_document.assert_called_once_with(
            "test_collection", missing_id
        )

    @pytest.mark.asyncio
    async def test_get_all(self, repository, test_model_class, mock_firestore_db):
        """get_all() converts every listed document into a model, in order."""
        # Arrange
        stored_docs = [
            {"name": "Test1", "value": 123, "_id": "id1"},
            {"name": "Test2", "value": 456, "_id": "id2"},
        ]
        mock_firestore_db.list_documents.return_value = stored_docs

        # Act
        models = await repository.get_all()

        # Assert
        assert len(models) == 2
        for model in models:
            assert isinstance(model, test_model_class)
        assert (models[0].name, models[1].name) == ("Test1", "Test2")
        mock_firestore_db.list_documents.assert_called_once_with("test_collection")

    @pytest.mark.asyncio
    async def test_update_success(self, repository, test_model_class, mock_firestore_db):
        """update() returns the refreshed model when the update reports success."""
        # Arrange
        doc_id = "test_id"
        changes = {"name": "Updated", "value": 999}
        mock_firestore_db.update_document.return_value = True
        mock_firestore_db.get_document.return_value = {**changes, "_id": doc_id}

        # Act
        updated = await repository.update(doc_id, changes)

        # Assert
        assert isinstance(updated, test_model_class)
        assert (updated.name, updated.value) == ("Updated", 999)
        mock_firestore_db.update_document.assert_called_once_with(
            "test_collection", doc_id, changes
        )

    @pytest.mark.asyncio
    async def test_update_failure(self, repository, mock_firestore_db):
        """update() returns None when the update reports failure."""
        # Arrange
        missing_id = "nonexistent_id"
        changes = {"name": "Updated"}
        mock_firestore_db.update_document.return_value = False

        # Act
        updated = await repository.update(missing_id, changes)

        # Assert
        assert updated is None
        mock_firestore_db.update_document.assert_called_once_with(
            "test_collection", missing_id, changes
        )

    @pytest.mark.asyncio
    async def test_delete_success(self, repository, mock_firestore_db):
        """delete() returns True when the delete reports success."""
        # Arrange
        doc_id = "test_id"
        mock_firestore_db.delete_document.return_value = True

        # Act
        deleted = await repository.delete(doc_id)

        # Assert
        assert deleted is True
        mock_firestore_db.delete_document.assert_called_once_with(
            "test_collection", doc_id
        )

    @pytest.mark.asyncio
    async def test_delete_failure(self, repository, mock_firestore_db):
        """delete() returns False when the delete reports failure."""
        # Arrange
        missing_id = "nonexistent_id"
        mock_firestore_db.delete_document.return_value = False

        # Act
        deleted = await repository.delete(missing_id)

        # Assert
        assert deleted is False
        mock_firestore_db.delete_document.assert_called_once_with(
            "test_collection", missing_id
        )
|
|
|
|
|
|
class TestFirestoreTeamRepository:
    """Check that FirestoreTeamRepository forwards calls to its first base class."""

    @pytest.fixture
    def repository(self):
        """Build a FirestoreTeamRepository with default arguments."""
        team_repo = FirestoreTeamRepository()
        return team_repo

    def test_init(self, repository):
        """The repository targets the ``teams`` collection and TeamModel."""
        assert repository.model_class == TeamModel
        assert repository.collection_name == "teams"

    @pytest.mark.asyncio
    async def test_get_by_id(self, repository):
        """get_by_id() calls the base-class method with the same ID."""
        # The first direct base class is the one whose method gets patched.
        parent_cls = type(repository).__bases__[0]
        with patch.object(parent_cls, 'get_by_id', return_value=Mock()) as mock_get_by_id:
            await repository.get_by_id("team_id")

            mock_get_by_id.assert_called_once_with("team_id")

    @pytest.mark.asyncio
    async def test_update(self, repository):
        """update() calls the base-class method with the same ID and data."""
        parent_cls = type(repository).__bases__[0]
        with patch.object(parent_cls, 'update', return_value=Mock()) as mock_update:
            await repository.update("team_id", {"name": "Updated Team"})

            mock_update.assert_called_once_with("team_id", {"name": "Updated Team"})

    @pytest.mark.asyncio
    async def test_delete(self, repository):
        """delete() returns the base-class result and passes the ID through."""
        parent_cls = type(repository).__bases__[0]
        with patch.object(parent_cls, 'delete', return_value=True) as mock_delete:
            outcome = await repository.delete("team_id")

            assert outcome is True
            mock_delete.assert_called_once_with("team_id")
|
|
|
|
|
|
class TestFirestoreUserRepository:
    """Check the lookup helpers of FirestoreUserRepository.

    Each lookup test patches ``get_all`` on the repository instance and
    verifies which of the returned objects the helper selects.
    """

    @pytest.fixture
    def repository(self):
        """Build a FirestoreUserRepository with default arguments."""
        user_repo = FirestoreUserRepository()
        return user_repo

    def test_init(self, repository):
        """The repository targets the ``users`` collection and UserModel."""
        assert repository.model_class == UserModel
        assert repository.collection_name == "users"

    @pytest.mark.asyncio
    async def test_get_by_email(self, repository):
        """get_by_email() returns the user whose email matches."""
        wanted = Mock(email="target@example.com")
        users = [
            Mock(email="test1@example.com"),
            Mock(email="test2@example.com"),
            wanted,
        ]

        with patch.object(repository, 'get_all', return_value=users) as mock_get_all:
            found = await repository.get_by_email("target@example.com")

            assert found == wanted
            mock_get_all.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_by_email_not_found(self, repository):
        """get_by_email() returns None when no user has the given email."""
        users = [
            Mock(email="test1@example.com"),
            Mock(email="test2@example.com"),
        ]

        with patch.object(repository, 'get_all', return_value=users) as mock_get_all:
            found = await repository.get_by_email("notfound@example.com")

            assert found is None
            mock_get_all.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_by_team_id(self, repository):
        """get_by_team_id() keeps only users of the given team, in order."""
        first_match = Mock(team_id="team1")
        other_team = Mock(team_id="team2")
        second_match = Mock(team_id="team1")
        users = [first_match, other_team, second_match]

        with patch.object(repository, 'get_all', return_value=users) as mock_get_all:
            matches = await repository.get_by_team_id("team1")

            assert len(matches) == 2
            assert (matches[0], matches[1]) == (first_match, second_match)
            mock_get_all.assert_called_once()
|
|
|
|
|
|
class TestFirestoreApiKeyRepository:
    """Check the lookup helpers of FirestoreApiKeyRepository.

    Each lookup test patches ``get_all`` on the repository instance and
    verifies which of the returned objects the helper selects.
    """

    @pytest.fixture
    def repository(self):
        """Build a FirestoreApiKeyRepository with default arguments."""
        api_key_repo = FirestoreApiKeyRepository()
        return api_key_repo

    def test_init(self, repository):
        """The repository targets the ``api_keys`` collection and ApiKeyModel."""
        assert repository.model_class == ApiKeyModel
        assert repository.collection_name == "api_keys"

    @pytest.mark.asyncio
    async def test_get_by_key_hash(self, repository):
        """get_by_key_hash() returns the key whose hash matches."""
        wanted = Mock(key_hash="target_hash")
        api_keys = [
            Mock(key_hash="hash1"),
            Mock(key_hash="hash2"),
            wanted,
        ]

        with patch.object(repository, 'get_all', return_value=api_keys) as mock_get_all:
            found = await repository.get_by_key_hash("target_hash")

            assert found == wanted
            mock_get_all.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_by_user_id(self, repository):
        """get_by_user_id() keeps only keys of the given user, in order."""
        first_match = Mock(user_id="user1")
        other_user = Mock(user_id="user2")
        second_match = Mock(user_id="user1")
        api_keys = [first_match, other_user, second_match]

        with patch.object(repository, 'get_all', return_value=api_keys) as mock_get_all:
            matches = await repository.get_by_user_id("user1")

            assert len(matches) == 2
            assert (matches[0], matches[1]) == (first_match, second_match)
            mock_get_all.assert_called_once()
|
|
|
|
|
|
class TestFirestoreImageRepository:
    """Check the filter helpers of FirestoreImageRepository.

    Each filter test patches ``get_all`` on the repository instance and
    verifies which of the returned objects the helper selects.
    """

    @pytest.fixture
    def repository(self):
        """Build a FirestoreImageRepository with default arguments."""
        image_repo = FirestoreImageRepository()
        return image_repo

    def test_init(self, repository):
        """The repository targets the ``images`` collection and ImageModel."""
        assert repository.model_class == ImageModel
        assert repository.collection_name == "images"

    @pytest.mark.asyncio
    async def test_get_by_team_id(self, repository):
        """get_by_team_id() keeps only images of the given team, in order."""
        first_match = Mock(team_id="team1")
        other_team = Mock(team_id="team2")
        second_match = Mock(team_id="team1")
        images = [first_match, other_team, second_match]

        with patch.object(repository, 'get_all', return_value=images) as mock_get_all:
            matches = await repository.get_by_team_id("team1")

            assert len(matches) == 2
            assert (matches[0], matches[1]) == (first_match, second_match)
            mock_get_all.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_by_uploader_id(self, repository):
        """get_by_uploader_id() keeps only images of the given uploader, in order."""
        first_match = Mock(uploader_id="user1")
        other_uploader = Mock(uploader_id="user2")
        second_match = Mock(uploader_id="user1")
        images = [first_match, other_uploader, second_match]

        with patch.object(repository, 'get_all', return_value=images) as mock_get_all:
            matches = await repository.get_by_uploader_id("user1")

            assert len(matches) == 2
            assert (matches[0], matches[1]) == (first_match, second_match)
            mock_get_all.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_by_tag(self, repository):
        """get_by_tag() keeps only images whose tags include the given tag."""
        first_match = Mock(tags=["tag1", "tag2"])
        untagged = Mock(tags=["tag3"])
        second_match = Mock(tags=["tag1", "tag4"])
        images = [first_match, untagged, second_match]

        with patch.object(repository, 'get_all', return_value=images) as mock_get_all:
            matches = await repository.get_by_tag("tag1")

            assert len(matches) == 2
            assert (matches[0], matches[1]) == (first_match, second_match)
            mock_get_all.assert_called_once()