"""Tests for ``src.services.pubsub_service``.

Every test that builds a ``PubSubService`` itself patches the module's
``settings`` object and, where a client would be created, the Pub/Sub
``PublisherClient``.
"""

import json
from unittest.mock import AsyncMock, Mock, patch  # noqa: F401  (AsyncMock kept from original import list)

import pytest
from google.api_core.exceptions import GoogleAPIError
from google.cloud import pubsub_v1  # noqa: F401  (kept from original import list)

from src.services.pubsub_service import PubSubService, pubsub_service

# Patch targets and shared fixture values used throughout this module.
SETTINGS_TARGET = "src.services.pubsub_service.settings"
PUBLISHER_TARGET = "src.services.pubsub_service.pubsub_v1.PublisherClient"
TEST_PROJECT_ID = "test-project"
TEST_TOPIC = "test-topic"
TEST_TOPIC_PATH = "projects/test-project/topics/test-topic"


def _configure_settings(mock_settings, project_id=TEST_PROJECT_ID, topic=TEST_TOPIC):
    """Set the two settings attributes these tests configure before building the service."""
    mock_settings.FIRESTORE_PROJECT_ID = project_id
    mock_settings.PUBSUB_TOPIC = topic


def _successful_future(message_id="message-id-123"):
    """Return a mock publish future whose ``result()`` yields ``message_id``."""
    future = Mock()
    future.result.return_value = message_id
    return future


def _published_payload(publish_mock):
    """Decode the JSON body passed as the second positional arg of the last ``publish`` call."""
    return json.loads(publish_mock.call_args.args[1].decode("utf-8"))


class TestPubSubService:
    """Test cases for PubSubService"""

    @pytest.fixture
    def mock_publisher(self):
        """Mock Pub/Sub publisher client.

        The patch stays active for the whole test because the fixture yields
        from inside the ``with`` block.
        """
        with patch(PUBLISHER_TARGET) as mock_client:
            mock_instance = Mock()
            mock_client.return_value = mock_instance
            mock_instance.topic_path.return_value = TEST_TOPIC_PATH
            yield mock_instance

    @pytest.fixture
    def pubsub_service_instance(self, mock_publisher):
        """Create a PubSubService instance with mocked dependencies"""
        with patch(SETTINGS_TARGET) as mock_settings:
            _configure_settings(mock_settings)
            service = PubSubService()
        return service

    def test_init_success(self, mock_publisher):
        """Test successful initialization of PubSubService"""
        with patch(SETTINGS_TARGET) as mock_settings:
            _configure_settings(mock_settings)
            service = PubSubService()

        assert service.project_id == TEST_PROJECT_ID
        assert service.topic_name == TEST_TOPIC
        assert service.publisher is not None
        assert service._topic_path == TEST_TOPIC_PATH

    def test_init_missing_config(self):
        """Test initialization with missing configuration"""
        with patch(SETTINGS_TARGET) as mock_settings:
            _configure_settings(mock_settings, project_id="", topic="")
            service = PubSubService()

        assert service.publisher is None
        assert service._topic_path is None

    def test_init_client_error(self):
        """Test initialization with client creation error"""
        with patch(SETTINGS_TARGET) as mock_settings, patch(
            PUBLISHER_TARGET, side_effect=Exception("Client error")
        ):
            _configure_settings(mock_settings)
            service = PubSubService()

        assert service.publisher is None

    @pytest.mark.asyncio
    async def test_publish_image_processing_task_success(self, pubsub_service_instance):
        """Test successful publishing of image processing task"""
        publish = pubsub_service_instance.publisher.publish
        publish.return_value = _successful_future()

        result = await pubsub_service_instance.publish_image_processing_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
            retry_count=0,
        )

        assert result is True

        # Verify publish was called with correct parameters
        publish.assert_called_once()

        # Check topic path (first positional argument)
        assert publish.call_args.args[0] == pubsub_service_instance._topic_path

        # Check message data (second positional argument, JSON-encoded bytes)
        message_data = _published_payload(publish)
        assert message_data["image_id"] == "test-image-id"
        assert message_data["storage_path"] == "bucket/path/to/image.jpg"
        assert message_data["team_id"] == "test-team-id"
        assert message_data["retry_count"] == 0
        assert message_data["task_type"] == "generate_embeddings"

        # Check attributes (keyword arguments of the publish call)
        attributes = publish.call_args.kwargs
        assert attributes["task_type"] == "generate_embeddings"
        assert attributes["image_id"] == "test-image-id"
        assert attributes["retry_count"] == "0"

    @pytest.mark.asyncio
    async def test_publish_image_processing_task_no_client(self, pubsub_service_instance):
        """Test publishing when client is not initialized"""
        # Build the service through the mocked fixture so this test never
        # constructs a real PublisherClient, then simulate the
        # "client not initialized" state explicitly.
        service = pubsub_service_instance
        service.publisher = None
        service._topic_path = None

        result = await service.publish_image_processing_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
        )

        assert result is False

    @pytest.mark.asyncio
    async def test_publish_image_processing_task_publish_error(self, pubsub_service_instance):
        """Test publishing with publish error"""
        pubsub_service_instance.publisher.publish.side_effect = GoogleAPIError("Publish failed")

        result = await pubsub_service_instance.publish_image_processing_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
        )

        assert result is False

    @pytest.mark.asyncio
    async def test_publish_image_processing_task_future_timeout(self, pubsub_service_instance):
        """Test publishing with future timeout"""
        mock_future = Mock()
        mock_future.result.side_effect = Exception("Timeout")
        pubsub_service_instance.publisher.publish.return_value = mock_future

        result = await pubsub_service_instance.publish_image_processing_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
        )

        assert result is False

    @pytest.mark.asyncio
    async def test_publish_retry_task_success(self, pubsub_service_instance):
        """Test successful publishing of retry task"""
        publish = pubsub_service_instance.publisher.publish
        publish.return_value = _successful_future()

        result = await pubsub_service_instance.publish_retry_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
            retry_count=1,
            error_message="Previous attempt failed",
        )

        assert result is True

        # Verify publish was called with incremented retry count
        assert _published_payload(publish)["retry_count"] == 2

    @pytest.mark.asyncio
    async def test_publish_retry_task_max_retries(self, pubsub_service_instance):
        """Test retry task when max retries reached"""
        result = await pubsub_service_instance.publish_retry_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
            retry_count=3,  # Max retries reached
            error_message="Previous attempt failed",
        )

        assert result is False

        # Verify publish was not called
        pubsub_service_instance.publisher.publish.assert_not_called()

    @pytest.mark.asyncio
    async def test_publish_retry_task_with_retry_count_2(self, pubsub_service_instance):
        """Test retry task with retry count 2 (should still work)"""
        publish = pubsub_service_instance.publisher.publish
        publish.return_value = _successful_future()

        result = await pubsub_service_instance.publish_retry_task(
            image_id="test-image-id",
            storage_path="bucket/path/to/image.jpg",
            team_id="test-team-id",
            retry_count=2,
            error_message="Previous attempt failed",
        )

        assert result is True

        # Verify publish was called with retry count 3
        assert _published_payload(publish)["retry_count"] == 3

    def test_singleton_service(self):
        """Test that pubsub_service is properly initialized"""
        # This tests the module-level singleton
        assert pubsub_service is not None
        assert isinstance(pubsub_service, PubSubService)


class TestPubSubServiceIntegration:
    """Integration tests for PubSubService"""

    @pytest.mark.asyncio
    async def test_message_format_compatibility(self):
        """Test that message format is compatible with Cloud Function expectations"""
        with patch(SETTINGS_TARGET) as mock_settings, patch(PUBLISHER_TARGET) as mock_client:
            _configure_settings(mock_settings)

            mock_instance = Mock()
            mock_client.return_value = mock_instance
            mock_instance.topic_path.return_value = TEST_TOPIC_PATH
            mock_instance.publish.return_value = _successful_future()

            service = PubSubService()

            await service.publish_image_processing_task(
                image_id="67890abcdef123456789",
                storage_path="my-bucket/team-123/image.jpg",
                team_id="team-123",
                retry_count=1,
            )

            # Verify the message structure matches what Cloud Function expects
            message_data = _published_payload(mock_instance.publish)

            # Required fields for Cloud Function
            required_fields = ["image_id", "storage_path", "team_id", "retry_count", "task_type"]
            for field in required_fields:
                assert field in message_data

            # Verify data types
            assert isinstance(message_data["image_id"], str)
            assert isinstance(message_data["storage_path"], str)
            assert isinstance(message_data["team_id"], str)
            assert isinstance(message_data["retry_count"], int)
            assert isinstance(message_data["task_type"], str)

            # Verify attributes (keyword arguments of the publish call)
            attributes = mock_instance.publish.call_args.kwargs
            assert "task_type" in attributes
            assert "image_id" in attributes
            assert "retry_count" in attributes