image_management_api/tests/integration/test_cloud_function.py
2025-05-24 14:26:09 +02:00

370 lines
14 KiB
Python

import pytest
import json
import base64
from unittest.mock import Mock, patch, MagicMock
import numpy as np
from datetime import datetime
# Stub the cloud function's third-party dependencies in sys.modules so that
# importing it does not require the real packages.
# NOTE(review): patch.dict restores sys.modules to its previous contents when
# the `with` block exits, so every entry added inside the block is removed
# again before any test runs. The file's indentation has been lost, so confirm
# which of the statements below belong to the `with` body -- in particular
# whether the `main` stub is registered inside it (and therefore discarded) or
# after it (and therefore kept for the tests).
with patch.dict('sys.modules', {
'functions_framework': Mock(),
'google.cloud.vision': Mock(),
'google.cloud.firestore': Mock(),
'google.cloud.storage': Mock(),
'pinecone': Mock()
}):
# Make the cloud function's source directory importable. Nothing is imported
# from it here; only sys.path is extended.
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../deployment/cloud-function'))
# Register a Mock under the module name `main`; while this entry is present,
# `from main import ...` and patch('main.<attr>') resolve against the Mock.
main_module = Mock()
sys.modules['main'] = main_module
class TestCloudFunctionProcessing:
    """Test cases for Cloud Function image processing logic.

    The tests cover the message-handling steps (base64/JSON decoding and field
    extraction) using mocks patched onto the ``main`` module. The decorated
    entry point itself is never invoked.
    """

    @staticmethod
    def _make_cloud_event(payload):
        """Return a mock cloud event carrying *payload* as a Pub/Sub message.

        The payload is JSON-serialised, base64-encoded and stored under
        ``data["message"]["data"]``.
        """
        encoded_data = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
        cloud_event = Mock()
        cloud_event.data = {
            "message": {
                "data": encoded_data
            }
        }
        return cloud_event

    @staticmethod
    def _decode_message(cloud_event):
        """Return the dict encoded in ``cloud_event.data["message"]["data"]``."""
        raw = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
        return json.loads(raw)

    @pytest.fixture
    def mock_cloud_event(self):
        """Create a mock cloud event holding a complete embedding request."""
        return self._make_cloud_event({
            "image_id": "test-image-123",
            "storage_path": "test-bucket/team-456/image.jpg",
            "team_id": "team-456",
            "retry_count": 0,
            "task_type": "generate_embeddings"
        })

    @pytest.fixture
    def mock_vision_client(self):
        """Patch ``main.vision_client`` for the test and yield the mock."""
        with patch('main.vision_client') as mock_client:
            yield mock_client

    @pytest.fixture
    def mock_firestore_client(self):
        """Patch ``main.firestore_client`` for the test and yield the mock."""
        with patch('main.firestore_client') as mock_client:
            yield mock_client

    @pytest.fixture
    def mock_storage_client(self):
        """Patch ``main.storage_client`` for the test and yield the mock."""
        with patch('main.storage_client') as mock_client:
            yield mock_client

    @pytest.fixture
    def mock_pinecone_index(self):
        """Patch ``main.index`` for the test and yield the mock."""
        with patch('main.index') as mock_index:
            yield mock_index

    def test_message_decoding(self, mock_cloud_event):
        """Test that Pub/Sub message is correctly decoded"""
        message = self._decode_message(mock_cloud_event)

        assert message["image_id"] == "test-image-123"
        assert message["storage_path"] == "test-bucket/team-456/image.jpg"
        assert message["team_id"] == "team-456"
        assert message["retry_count"] == 0
        assert message["task_type"] == "generate_embeddings"

    def test_missing_required_fields(self):
        """Test handling of messages with missing required fields"""
        # Message deliberately omits image_id.
        cloud_event = self._make_cloud_event({
            "storage_path": "test-bucket/team-456/image.jpg",
            "team_id": "team-456",
            "retry_count": 0
        })

        message = self._decode_message(cloud_event)

        # The absent field reads back as None; the others survive the round trip.
        assert message.get('image_id') is None
        assert message.get('storage_path') is not None
        assert message.get('team_id') is not None

    @patch('main.process_image')
    @patch('main.update_image_status')
    def test_successful_processing_flow(self, mock_update_status, mock_process_image, mock_cloud_event):
        """Test successful image processing flow"""
        mock_process_image.return_value = True

        # Imported only to confirm the entry point name resolves. It is not
        # called: the decorator makes the real function awkward to invoke, so
        # the test walks through the logic components instead.
        from main import process_image_embedding  # noqa: F401

        message = self._decode_message(mock_cloud_event)
        image_id = message.get('image_id')
        storage_path = message.get('storage_path')
        team_id = message.get('team_id')
        retry_count = message.get('retry_count', 0)

        # No status update has happened before processing starts.
        mock_update_status.assert_not_called()

        success = mock_process_image(image_id, storage_path, team_id, retry_count)

        assert success is True
        # Guard against a decoding slip: the mock must have received exactly
        # the values that were encoded into the event.
        mock_process_image.assert_called_once_with(
            "test-image-123", "test-bucket/team-456/image.jpg", "team-456", 0
        )

    @patch('main.process_image')
    @patch('main.update_image_status')
    def test_failed_processing_flow(self, mock_update_status, mock_process_image, mock_cloud_event):
        """Test failed image processing flow"""
        mock_process_image.return_value = False

        message = self._decode_message(mock_cloud_event)
        image_id = message.get('image_id')
        storage_path = message.get('storage_path')
        team_id = message.get('team_id')
        retry_count = message.get('retry_count', 0)

        success = mock_process_image(image_id, storage_path, team_id, retry_count)

        assert success is False
        # Same argument check as the success flow.
        mock_process_image.assert_called_once_with(
            "test-image-123", "test-bucket/team-456/image.jpg", "team-456", 0
        )
class TestImageProcessingLogic:
    """Test cases for image processing logic.

    Covers ``main.process_image`` and ``main.generate_image_embeddings`` with
    the storage, Vision and Pinecone collaborators replaced by mocks.
    """

    @staticmethod
    def _vertex(x, y):
        """Return a mock normalized vertex with numeric ``x`` / ``y``."""
        return Mock(x=x, y=y)

    @pytest.fixture
    def mock_storage_setup(self):
        """Patch ``main.storage_client`` and yield ``(storage, bucket, blob)``.

        The blob mock reports that it exists and downloads as fixed fake bytes.
        """
        with patch('main.storage_client') as mock_storage:
            mock_bucket = Mock()
            mock_blob = Mock()
            mock_storage.bucket.return_value = mock_bucket
            mock_bucket.blob.return_value = mock_blob
            mock_blob.exists.return_value = True
            mock_blob.download_as_bytes.return_value = b"fake_image_data"
            yield mock_storage, mock_bucket, mock_blob

    @pytest.fixture
    def mock_vision_response(self):
        """Mock Vision API object-localization response with one object."""
        mock_response = Mock()
        mock_response.error.message = ""

        # `name` must be assigned after construction: Mock(name=...) would
        # name the mock itself rather than set an attribute.
        mock_obj = Mock()
        mock_obj.name = "person"
        mock_obj.score = 0.95

        # All four corners carry numeric coordinates, so code that walks every
        # vertex (not just indices 0 and 2) never meets a Mock where it expects
        # a number.
        mock_obj.bounding_poly.normalized_vertices = [
            self._vertex(0.1, 0.1),
            self._vertex(0.9, 0.1),
            self._vertex(0.9, 0.9),
            self._vertex(0.1, 0.9),
        ]

        mock_response.localized_object_annotations = [mock_obj]
        return mock_response

    @pytest.fixture
    def mock_label_response(self):
        """Mock Vision API label response with one label."""
        mock_response = Mock()
        # Mirror the object-localization mock: an auto-created Mock attribute
        # is truthy, so an `if response.error.message:` check would misread it
        # as a failure.
        # NOTE(review): confirm whether main inspects this field on label responses.
        mock_response.error.message = ""

        mock_label = Mock()
        mock_label.description = "person"
        mock_label.score = 0.98
        mock_response.label_annotations = [mock_label]
        return mock_response

    @patch('main.generate_image_embeddings')
    @patch('main.update_image_embedding_info')
    def test_process_image_success(self, mock_update_embedding, mock_generate_embeddings, mock_storage_setup):
        """Test successful image processing"""
        # Seeded generator keeps the fake embedding identical across runs.
        mock_embeddings = np.random.default_rng(0).random(512, dtype=np.float32)
        mock_generate_embeddings.return_value = mock_embeddings

        # The patched index auto-creates `upsert`; no explicit Mock is needed.
        with patch('main.index') as mock_index:
            from main import process_image
            result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)

        assert result is True
        mock_generate_embeddings.assert_called_once()
        mock_index.upsert.assert_called_once()
        mock_update_embedding.assert_called_once()

    def test_process_image_storage_not_found(self, mock_storage_setup):
        """Test processing when image not found in storage"""
        _, _, mock_blob = mock_storage_setup
        mock_blob.exists.return_value = False

        from main import process_image
        result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)

        assert result is False

    @patch('main.generate_image_embeddings')
    def test_process_image_embedding_generation_failed(self, mock_generate_embeddings, mock_storage_setup):
        """Test processing when embedding generation fails"""
        mock_generate_embeddings.return_value = None

        from main import process_image
        result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)

        assert result is False

    @patch('main.vision_client')
    def test_generate_image_embeddings_success(self, mock_vision_client, mock_vision_response, mock_label_response):
        """Test successful embedding generation"""
        mock_vision_client.object_localization.return_value = mock_vision_response
        mock_vision_client.label_detection.return_value = mock_label_response

        from main import generate_image_embeddings
        embeddings = generate_image_embeddings(b"fake_image_data")

        assert embeddings is not None
        assert isinstance(embeddings, np.ndarray)
        assert embeddings.shape == (512,)
        assert embeddings.dtype == np.float32

    @patch('main.vision_client')
    def test_generate_image_embeddings_vision_error(self, mock_vision_client):
        """Test embedding generation with Vision API error"""
        mock_response = Mock()
        mock_response.error.message = "Vision API error"
        mock_vision_client.object_localization.return_value = mock_response

        from main import generate_image_embeddings
        embeddings = generate_image_embeddings(b"fake_image_data")

        assert embeddings is None

    @patch('main.vision_client')
    def test_generate_image_embeddings_exception(self, mock_vision_client):
        """Test embedding generation with exception"""
        mock_vision_client.object_localization.side_effect = Exception("API error")

        from main import generate_image_embeddings
        embeddings = generate_image_embeddings(b"fake_image_data")

        assert embeddings is None
class TestFirestoreUpdates:
    """Checks on the Firestore writes issued by the ``main`` update helpers."""

    @staticmethod
    def _single_update_payload(doc_ref):
        """Assert ``doc_ref.update`` ran exactly once; return its first positional argument."""
        doc_ref.update.assert_called_once()
        positional, _keywords = doc_ref.update.call_args
        return positional[0]

    @pytest.fixture
    def mock_firestore_doc(self):
        """Patch ``main.firestore_client`` and yield the document mock.

        The yielded mock is what ``collection(...).document(...)`` returns on
        the patched client.
        """
        with patch('main.firestore_client') as mock_client:
            doc_ref = Mock()
            collection = mock_client.collection.return_value
            collection.document.return_value = doc_ref
            yield doc_ref

    def test_update_image_status_processing(self, mock_firestore_doc):
        """A 'processing' update records the status, retry count and attempt time."""
        from main import update_image_status

        update_image_status("test-image-123", "processing", 1)

        fields = self._single_update_payload(mock_firestore_doc)
        assert fields["embedding_status"] == "processing"
        assert fields["embedding_retry_count"] == 1
        assert "embedding_last_attempt" in fields

    def test_update_image_status_success(self, mock_firestore_doc):
        """A 'success' update flags the embedding as present and clears the error."""
        from main import update_image_status

        update_image_status("test-image-123", "success", 2)

        fields = self._single_update_payload(mock_firestore_doc)
        assert fields["embedding_status"] == "success"
        assert fields["has_embedding"] is True
        assert fields["embedding_error"] is None

    def test_update_image_status_failed(self, mock_firestore_doc):
        """A 'failed' update stores the supplied error message."""
        from main import update_image_status

        update_image_status("test-image-123", "failed", 3, "Processing error")

        fields = self._single_update_payload(mock_firestore_doc)
        assert fields["embedding_status"] == "failed"
        assert fields["embedding_error"] == "Processing error"

    def test_update_image_embedding_info(self, mock_firestore_doc):
        """Embedding info updates store the embedding id and model and set the flag."""
        from main import update_image_embedding_info

        update_image_embedding_info("test-image-123", "team-456_test-image-123", "google-vision-v1")

        fields = self._single_update_payload(mock_firestore_doc)
        assert fields["embedding_id"] == "team-456_test-image-123"
        assert fields["embedding_model"] == "google-vision-v1"
        assert fields["has_embedding"] is True

    def test_update_image_status_firestore_error(self, mock_firestore_doc):
        """A failing Firestore update must not propagate out of the helper."""
        mock_firestore_doc.update.side_effect = Exception("Firestore error")
        from main import update_image_status

        # Expected to swallow the exception (and log it) rather than raise.
        update_image_status("test-image-123", "failed", 1, "Test error")

        mock_firestore_doc.update.assert_called_once()