image_management_api/tests/api/test_images_pubsub.py
2025-05-24 14:26:09 +02:00

408 lines
15 KiB
Python

import pytest
from unittest.mock import Mock, patch, AsyncMock
from fastapi.testclient import TestClient
from fastapi import status
import io
from PIL import Image
from src.api.v1.images import router
from src.models.user import UserModel
from src.models.team import PyObjectId
class TestImageUploadWithPubSub:
"""Test cases for image upload with Pub/Sub integration"""
@pytest.fixture
def mock_current_user(self):
"""Mock current user"""
user = UserModel(
id=PyObjectId(),
email="test@example.com",
team_id=PyObjectId(),
is_active=True
)
return user
@pytest.fixture
def test_image_file(self):
"""Create a test image file"""
# Create a simple test image
img = Image.new('RGB', (100, 100), color='red')
img_bytes = io.BytesIO()
img.save(img_bytes, format='JPEG')
img_bytes.seek(0)
return img_bytes
@pytest.fixture
def mock_storage_service(self):
"""Mock storage service"""
with patch('src.api.v1.images.storage_service') as mock_service:
mock_service.upload_file = AsyncMock(return_value=(
"bucket/team-123/image.jpg",
"image/jpeg",
1024,
{"width": 100, "height": 100}
))
yield mock_service
@pytest.fixture
def mock_image_repository(self):
"""Mock image repository"""
with patch('src.api.v1.images.image_repository') as mock_repo:
mock_image = Mock()
mock_image.id = PyObjectId()
mock_image.filename = "test.jpg"
mock_image.original_filename = "test.jpg"
mock_image.file_size = 1024
mock_image.content_type = "image/jpeg"
mock_image.storage_path = "bucket/team-123/image.jpg"
mock_image.team_id = PyObjectId()
mock_image.uploader_id = PyObjectId()
mock_image.upload_date = "2023-01-01T00:00:00"
mock_image.description = None
mock_image.tags = []
mock_image.metadata = {}
mock_image.has_embedding = False
mock_image.collection_id = None
mock_repo.create = AsyncMock(return_value=mock_image)
yield mock_repo
@pytest.fixture
def mock_pubsub_service(self):
"""Mock Pub/Sub service"""
with patch('src.api.v1.images.pubsub_service') as mock_service:
mock_service.publish_image_processing_task = AsyncMock(return_value=True)
yield mock_service
@pytest.mark.asyncio
async def test_upload_image_publishes_to_pubsub(
self,
mock_current_user,
test_image_file,
mock_storage_service,
mock_image_repository,
mock_pubsub_service
):
"""Test that image upload publishes a task to Pub/Sub"""
with patch('src.api.v1.images.get_current_user', return_value=mock_current_user):
from src.api.v1.images import upload_image
from fastapi import UploadFile
# Create upload file
upload_file = UploadFile(
filename="test.jpg",
file=test_image_file,
content_type="image/jpeg"
)
# Mock request
request = Mock()
request.url.path = "/api/v1/images"
request.method = "POST"
# Call the upload function
response = await upload_image(
request=request,
file=upload_file,
current_user=mock_current_user
)
# Verify Pub/Sub task was published
mock_pubsub_service.publish_image_processing_task.assert_called_once()
# Check the call arguments
call_args = mock_pubsub_service.publish_image_processing_task.call_args
assert call_args[1]['image_id'] == str(mock_image_repository.create.return_value.id)
assert call_args[1]['storage_path'] == "bucket/team-123/image.jpg"
assert call_args[1]['team_id'] == str(mock_current_user.team_id)
# Verify response
assert response.filename == "test.jpg"
assert response.content_type == "image/jpeg"
@pytest.mark.asyncio
async def test_upload_image_pubsub_failure_continues(
self,
mock_current_user,
test_image_file,
mock_storage_service,
mock_image_repository,
mock_pubsub_service
):
"""Test that upload continues even if Pub/Sub publishing fails"""
# Mock Pub/Sub failure
mock_pubsub_service.publish_image_processing_task = AsyncMock(return_value=False)
with patch('src.api.v1.images.get_current_user', return_value=mock_current_user):
from src.api.v1.images import upload_image
from fastapi import UploadFile
# Create upload file
upload_file = UploadFile(
filename="test.jpg",
file=test_image_file,
content_type="image/jpeg"
)
# Mock request
request = Mock()
request.url.path = "/api/v1/images"
request.method = "POST"
# Call the upload function - should not raise exception
response = await upload_image(
request=request,
file=upload_file,
current_user=mock_current_user
)
# Verify Pub/Sub task was attempted
mock_pubsub_service.publish_image_processing_task.assert_called_once()
# Verify response is still successful
assert response.filename == "test.jpg"
assert response.content_type == "image/jpeg"
@pytest.mark.asyncio
async def test_upload_image_pubsub_exception_continues(
self,
mock_current_user,
test_image_file,
mock_storage_service,
mock_image_repository,
mock_pubsub_service
):
"""Test that upload continues even if Pub/Sub publishing raises exception"""
# Mock Pub/Sub exception
mock_pubsub_service.publish_image_processing_task = AsyncMock(
side_effect=Exception("Pub/Sub error")
)
with patch('src.api.v1.images.get_current_user', return_value=mock_current_user):
from src.api.v1.images import upload_image
from fastapi import UploadFile
# Create upload file
upload_file = UploadFile(
filename="test.jpg",
file=test_image_file,
content_type="image/jpeg"
)
# Mock request
request = Mock()
request.url.path = "/api/v1/images"
request.method = "POST"
# Call the upload function - should not raise exception
response = await upload_image(
request=request,
file=upload_file,
current_user=mock_current_user
)
# Verify Pub/Sub task was attempted
mock_pubsub_service.publish_image_processing_task.assert_called_once()
# Verify response is still successful
assert response.filename == "test.jpg"
assert response.content_type == "image/jpeg"
@pytest.mark.asyncio
async def test_upload_image_with_description_and_tags(
self,
mock_current_user,
test_image_file,
mock_storage_service,
mock_image_repository,
mock_pubsub_service
):
"""Test image upload with description and tags"""
with patch('src.api.v1.images.get_current_user', return_value=mock_current_user):
from src.api.v1.images import upload_image
from fastapi import UploadFile
# Create upload file
upload_file = UploadFile(
filename="test.jpg",
file=test_image_file,
content_type="image/jpeg"
)
# Mock request
request = Mock()
request.url.path = "/api/v1/images"
request.method = "POST"
# Call the upload function with description and tags
response = await upload_image(
request=request,
file=upload_file,
description="Test image",
tags="nature, landscape, outdoor",
current_user=mock_current_user
)
# Verify Pub/Sub task was published
mock_pubsub_service.publish_image_processing_task.assert_called_once()
# Verify image was created with correct data
mock_image_repository.create.assert_called_once()
created_image_data = mock_image_repository.create.call_args[0][0]
assert created_image_data.description == "Test image"
assert created_image_data.tags == ["nature", "landscape", "outdoor"]
@pytest.mark.asyncio
async def test_upload_image_with_collection_id(
self,
mock_current_user,
test_image_file,
mock_storage_service,
mock_image_repository,
mock_pubsub_service
):
"""Test image upload with collection ID"""
with patch('src.api.v1.images.get_current_user', return_value=mock_current_user):
from src.api.v1.images import upload_image
from fastapi import UploadFile
# Create upload file
upload_file = UploadFile(
filename="test.jpg",
file=test_image_file,
content_type="image/jpeg"
)
# Mock request
request = Mock()
request.url.path = "/api/v1/images"
request.method = "POST"
collection_id = str(PyObjectId())
# Call the upload function with collection ID
response = await upload_image(
request=request,
file=upload_file,
collection_id=collection_id,
current_user=mock_current_user
)
# Verify Pub/Sub task was published
mock_pubsub_service.publish_image_processing_task.assert_called_once()
# Verify image was created with collection ID
mock_image_repository.create.assert_called_once()
created_image_data = mock_image_repository.create.call_args[0][0]
assert str(created_image_data.collection_id) == collection_id
class TestImageModelUpdates:
"""Test cases for updated image model with embedding fields"""
def test_image_model_has_embedding_fields(self):
"""Test that ImageModel has the new embedding fields"""
from src.models.image import ImageModel
# Create an image model instance
image = ImageModel(
filename="test.jpg",
original_filename="test.jpg",
file_size=1024,
content_type="image/jpeg",
storage_path="bucket/path/image.jpg",
team_id=PyObjectId(),
uploader_id=PyObjectId()
)
# Check that embedding fields exist with default values
assert hasattr(image, 'embedding_status')
assert hasattr(image, 'embedding_error')
assert hasattr(image, 'embedding_retry_count')
assert hasattr(image, 'embedding_last_attempt')
# Check default values
assert image.embedding_status == "pending"
assert image.embedding_error is None
assert image.embedding_retry_count == 0
assert image.embedding_last_attempt is None
assert image.has_embedding is False
def test_image_model_embedding_fields_can_be_set(self):
"""Test that embedding fields can be set"""
from src.models.image import ImageModel
from datetime import datetime
now = datetime.utcnow()
# Create an image model instance with embedding fields
image = ImageModel(
filename="test.jpg",
original_filename="test.jpg",
file_size=1024,
content_type="image/jpeg",
storage_path="bucket/path/image.jpg",
team_id=PyObjectId(),
uploader_id=PyObjectId(),
embedding_status="processing",
embedding_error="Test error",
embedding_retry_count=2,
embedding_last_attempt=now,
has_embedding=True
)
# Check that values were set correctly
assert image.embedding_status == "processing"
assert image.embedding_error == "Test error"
assert image.embedding_retry_count == 2
assert image.embedding_last_attempt == now
assert image.has_embedding is True
class TestPubSubServiceIntegration:
"""Integration tests for Pub/Sub service with image API"""
@pytest.mark.asyncio
async def test_end_to_end_image_upload_flow(self):
"""Test the complete flow from image upload to Pub/Sub message"""
# This would be an integration test that verifies the entire flow
# from API call to Pub/Sub message publication
# Mock all dependencies
with patch('src.api.v1.images.storage_service') as mock_storage, \
patch('src.api.v1.images.image_repository') as mock_repo, \
patch('src.api.v1.images.pubsub_service') as mock_pubsub, \
patch('src.api.v1.images.get_current_user') as mock_auth:
# Setup mocks
mock_user = Mock()
mock_user.id = PyObjectId()
mock_user.team_id = PyObjectId()
mock_auth.return_value = mock_user
mock_storage.upload_file = AsyncMock(return_value=(
"bucket/team/image.jpg", "image/jpeg", 1024, {}
))
mock_image = Mock()
mock_image.id = PyObjectId()
mock_repo.create = AsyncMock(return_value=mock_image)
mock_pubsub.publish_image_processing_task = AsyncMock(return_value=True)
# Create test client
from fastapi import FastAPI
app = FastAPI()
app.include_router(router)
# This would test the actual HTTP endpoint
# but requires more complex setup for file uploads
# For now, verify the mocks would be called correctly
assert mock_storage.upload_file is not None
assert mock_repo.create is not None
assert mock_pubsub.publish_image_processing_task is not None