370 lines
14 KiB
Python
370 lines
14 KiB
Python
import pytest
|
|
import json
|
|
import base64
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import numpy as np
|
|
from datetime import datetime
|
|
|
|
# Mock the cloud function modules before importing
|
|
with patch.dict('sys.modules', {
|
|
'functions_framework': Mock(),
|
|
'google.cloud.vision': Mock(),
|
|
'google.cloud.firestore': Mock(),
|
|
'google.cloud.storage': Mock(),
|
|
'pinecone': Mock()
|
|
}):
|
|
# Import the cloud function after mocking
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../../deployment/cloud-function'))
|
|
|
|
# Mock the main module
|
|
main_module = Mock()
|
|
sys.modules['main'] = main_module
|
|
|
|
|
|
class TestCloudFunctionProcessing:
|
|
"""Test cases for Cloud Function image processing logic"""
|
|
|
|
@pytest.fixture
|
|
def mock_cloud_event(self):
|
|
"""Create a mock cloud event for testing"""
|
|
message_data = {
|
|
"image_id": "test-image-123",
|
|
"storage_path": "test-bucket/team-456/image.jpg",
|
|
"team_id": "team-456",
|
|
"retry_count": 0,
|
|
"task_type": "generate_embeddings"
|
|
}
|
|
|
|
encoded_data = base64.b64encode(json.dumps(message_data).encode('utf-8')).decode('utf-8')
|
|
|
|
cloud_event = Mock()
|
|
cloud_event.data = {
|
|
"message": {
|
|
"data": encoded_data
|
|
}
|
|
}
|
|
return cloud_event
|
|
|
|
@pytest.fixture
|
|
def mock_vision_client(self):
|
|
"""Mock Google Cloud Vision client"""
|
|
with patch('main.vision_client') as mock_client:
|
|
yield mock_client
|
|
|
|
@pytest.fixture
|
|
def mock_firestore_client(self):
|
|
"""Mock Firestore client"""
|
|
with patch('main.firestore_client') as mock_client:
|
|
yield mock_client
|
|
|
|
@pytest.fixture
|
|
def mock_storage_client(self):
|
|
"""Mock Cloud Storage client"""
|
|
with patch('main.storage_client') as mock_client:
|
|
yield mock_client
|
|
|
|
@pytest.fixture
|
|
def mock_pinecone_index(self):
|
|
"""Mock Pinecone index"""
|
|
with patch('main.index') as mock_index:
|
|
yield mock_index
|
|
|
|
def test_message_decoding(self, mock_cloud_event):
|
|
"""Test that Pub/Sub message is correctly decoded"""
|
|
# This would test the message decoding logic
|
|
message_data = base64.b64decode(mock_cloud_event.data["message"]["data"]).decode('utf-8')
|
|
message = json.loads(message_data)
|
|
|
|
assert message["image_id"] == "test-image-123"
|
|
assert message["storage_path"] == "test-bucket/team-456/image.jpg"
|
|
assert message["team_id"] == "team-456"
|
|
assert message["retry_count"] == 0
|
|
assert message["task_type"] == "generate_embeddings"
|
|
|
|
def test_missing_required_fields(self):
|
|
"""Test handling of messages with missing required fields"""
|
|
# Test with missing image_id
|
|
message_data = {
|
|
"storage_path": "test-bucket/team-456/image.jpg",
|
|
"team_id": "team-456",
|
|
"retry_count": 0
|
|
}
|
|
|
|
encoded_data = base64.b64encode(json.dumps(message_data).encode('utf-8')).decode('utf-8')
|
|
|
|
cloud_event = Mock()
|
|
cloud_event.data = {
|
|
"message": {
|
|
"data": encoded_data
|
|
}
|
|
}
|
|
|
|
# The function should handle this gracefully
|
|
message = json.loads(base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8'))
|
|
|
|
assert message.get('image_id') is None
|
|
assert message.get('storage_path') is not None
|
|
assert message.get('team_id') is not None
|
|
|
|
@patch('main.process_image')
|
|
@patch('main.update_image_status')
|
|
def test_successful_processing_flow(self, mock_update_status, mock_process_image, mock_cloud_event):
|
|
"""Test successful image processing flow"""
|
|
# Mock successful processing
|
|
mock_process_image.return_value = True
|
|
|
|
# Import and call the function
|
|
from main import process_image_embedding
|
|
|
|
# This would test the main function flow
|
|
# Since we can't easily test the actual function due to the decorator,
|
|
# we test the logic components
|
|
|
|
message_data = json.loads(base64.b64decode(mock_cloud_event.data["message"]["data"]).decode('utf-8'))
|
|
|
|
# Simulate the function logic
|
|
image_id = message_data.get('image_id')
|
|
storage_path = message_data.get('storage_path')
|
|
team_id = message_data.get('team_id')
|
|
retry_count = message_data.get('retry_count', 0)
|
|
|
|
# Update status to processing
|
|
mock_update_status.assert_not_called() # Not called yet
|
|
|
|
# Process image
|
|
success = mock_process_image(image_id, storage_path, team_id, retry_count)
|
|
assert success is True
|
|
|
|
@patch('main.process_image')
|
|
@patch('main.update_image_status')
|
|
def test_failed_processing_flow(self, mock_update_status, mock_process_image, mock_cloud_event):
|
|
"""Test failed image processing flow"""
|
|
# Mock failed processing
|
|
mock_process_image.return_value = False
|
|
|
|
message_data = json.loads(base64.b64decode(mock_cloud_event.data["message"]["data"]).decode('utf-8'))
|
|
|
|
# Simulate the function logic
|
|
image_id = message_data.get('image_id')
|
|
storage_path = message_data.get('storage_path')
|
|
team_id = message_data.get('team_id')
|
|
retry_count = message_data.get('retry_count', 0)
|
|
|
|
# Process image
|
|
success = mock_process_image(image_id, storage_path, team_id, retry_count)
|
|
assert success is False
|
|
|
|
|
|
class TestImageProcessingLogic:
|
|
"""Test cases for image processing logic"""
|
|
|
|
@pytest.fixture
|
|
def mock_storage_setup(self):
|
|
"""Setup mock storage client and blob"""
|
|
with patch('main.storage_client') as mock_storage:
|
|
mock_bucket = Mock()
|
|
mock_blob = Mock()
|
|
|
|
mock_storage.bucket.return_value = mock_bucket
|
|
mock_bucket.blob.return_value = mock_blob
|
|
mock_blob.exists.return_value = True
|
|
mock_blob.download_as_bytes.return_value = b"fake_image_data"
|
|
|
|
yield mock_storage, mock_bucket, mock_blob
|
|
|
|
@pytest.fixture
|
|
def mock_vision_response(self):
|
|
"""Mock Vision API response"""
|
|
mock_response = Mock()
|
|
mock_response.error.message = ""
|
|
|
|
# Mock object annotations
|
|
mock_obj = Mock()
|
|
mock_obj.name = "person"
|
|
mock_obj.score = 0.95
|
|
|
|
# Mock bounding box
|
|
mock_vertex1 = Mock()
|
|
mock_vertex1.x = 0.1
|
|
mock_vertex1.y = 0.1
|
|
mock_vertex2 = Mock()
|
|
mock_vertex2.x = 0.9
|
|
mock_vertex2.y = 0.9
|
|
|
|
mock_obj.bounding_poly.normalized_vertices = [mock_vertex1, Mock(), mock_vertex2, Mock()]
|
|
mock_response.localized_object_annotations = [mock_obj]
|
|
|
|
return mock_response
|
|
|
|
@pytest.fixture
|
|
def mock_label_response(self):
|
|
"""Mock Vision API label response"""
|
|
mock_response = Mock()
|
|
|
|
mock_label = Mock()
|
|
mock_label.description = "person"
|
|
mock_label.score = 0.98
|
|
|
|
mock_response.label_annotations = [mock_label]
|
|
return mock_response
|
|
|
|
@patch('main.generate_image_embeddings')
|
|
@patch('main.update_image_embedding_info')
|
|
def test_process_image_success(self, mock_update_embedding, mock_generate_embeddings, mock_storage_setup):
|
|
"""Test successful image processing"""
|
|
mock_storage, mock_bucket, mock_blob = mock_storage_setup
|
|
|
|
# Mock successful embedding generation
|
|
mock_embeddings = np.random.rand(512).astype(np.float32)
|
|
mock_generate_embeddings.return_value = mock_embeddings
|
|
|
|
# Mock Pinecone index
|
|
with patch('main.index') as mock_index:
|
|
mock_index.upsert = Mock()
|
|
|
|
from main import process_image
|
|
|
|
result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)
|
|
|
|
assert result is True
|
|
mock_generate_embeddings.assert_called_once()
|
|
mock_index.upsert.assert_called_once()
|
|
mock_update_embedding.assert_called_once()
|
|
|
|
def test_process_image_storage_not_found(self, mock_storage_setup):
|
|
"""Test processing when image not found in storage"""
|
|
mock_storage, mock_bucket, mock_blob = mock_storage_setup
|
|
mock_blob.exists.return_value = False
|
|
|
|
from main import process_image
|
|
|
|
result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)
|
|
|
|
assert result is False
|
|
|
|
@patch('main.generate_image_embeddings')
|
|
def test_process_image_embedding_generation_failed(self, mock_generate_embeddings, mock_storage_setup):
|
|
"""Test processing when embedding generation fails"""
|
|
mock_storage, mock_bucket, mock_blob = mock_storage_setup
|
|
mock_generate_embeddings.return_value = None
|
|
|
|
from main import process_image
|
|
|
|
result = process_image("test-image-123", "test-bucket/path/image.jpg", "team-456", 0)
|
|
|
|
assert result is False
|
|
|
|
@patch('main.vision_client')
|
|
def test_generate_image_embeddings_success(self, mock_vision_client, mock_vision_response, mock_label_response):
|
|
"""Test successful embedding generation"""
|
|
mock_vision_client.object_localization.return_value = mock_vision_response
|
|
mock_vision_client.label_detection.return_value = mock_label_response
|
|
|
|
from main import generate_image_embeddings
|
|
|
|
embeddings = generate_image_embeddings(b"fake_image_data")
|
|
|
|
assert embeddings is not None
|
|
assert isinstance(embeddings, np.ndarray)
|
|
assert embeddings.shape == (512,)
|
|
assert embeddings.dtype == np.float32
|
|
|
|
@patch('main.vision_client')
|
|
def test_generate_image_embeddings_vision_error(self, mock_vision_client):
|
|
"""Test embedding generation with Vision API error"""
|
|
mock_response = Mock()
|
|
mock_response.error.message = "Vision API error"
|
|
mock_vision_client.object_localization.return_value = mock_response
|
|
|
|
from main import generate_image_embeddings
|
|
|
|
embeddings = generate_image_embeddings(b"fake_image_data")
|
|
|
|
assert embeddings is None
|
|
|
|
@patch('main.vision_client')
|
|
def test_generate_image_embeddings_exception(self, mock_vision_client):
|
|
"""Test embedding generation with exception"""
|
|
mock_vision_client.object_localization.side_effect = Exception("API error")
|
|
|
|
from main import generate_image_embeddings
|
|
|
|
embeddings = generate_image_embeddings(b"fake_image_data")
|
|
|
|
assert embeddings is None
|
|
|
|
|
|
class TestFirestoreUpdates:
|
|
"""Test cases for Firestore update operations"""
|
|
|
|
@pytest.fixture
|
|
def mock_firestore_doc(self):
|
|
"""Mock Firestore document reference"""
|
|
with patch('main.firestore_client') as mock_client:
|
|
mock_doc = Mock()
|
|
mock_client.collection.return_value.document.return_value = mock_doc
|
|
yield mock_doc
|
|
|
|
def test_update_image_status_processing(self, mock_firestore_doc):
|
|
"""Test updating image status to processing"""
|
|
from main import update_image_status
|
|
|
|
update_image_status("test-image-123", "processing", 1)
|
|
|
|
mock_firestore_doc.update.assert_called_once()
|
|
call_args = mock_firestore_doc.update.call_args[0][0]
|
|
|
|
assert call_args["embedding_status"] == "processing"
|
|
assert call_args["embedding_retry_count"] == 1
|
|
assert "embedding_last_attempt" in call_args
|
|
|
|
def test_update_image_status_success(self, mock_firestore_doc):
|
|
"""Test updating image status to success"""
|
|
from main import update_image_status
|
|
|
|
update_image_status("test-image-123", "success", 2)
|
|
|
|
mock_firestore_doc.update.assert_called_once()
|
|
call_args = mock_firestore_doc.update.call_args[0][0]
|
|
|
|
assert call_args["embedding_status"] == "success"
|
|
assert call_args["has_embedding"] is True
|
|
assert call_args["embedding_error"] is None
|
|
|
|
def test_update_image_status_failed(self, mock_firestore_doc):
|
|
"""Test updating image status to failed"""
|
|
from main import update_image_status
|
|
|
|
update_image_status("test-image-123", "failed", 3, "Processing error")
|
|
|
|
mock_firestore_doc.update.assert_called_once()
|
|
call_args = mock_firestore_doc.update.call_args[0][0]
|
|
|
|
assert call_args["embedding_status"] == "failed"
|
|
assert call_args["embedding_error"] == "Processing error"
|
|
|
|
def test_update_image_embedding_info(self, mock_firestore_doc):
|
|
"""Test updating image embedding information"""
|
|
from main import update_image_embedding_info
|
|
|
|
update_image_embedding_info("test-image-123", "team-456_test-image-123", "google-vision-v1")
|
|
|
|
mock_firestore_doc.update.assert_called_once()
|
|
call_args = mock_firestore_doc.update.call_args[0][0]
|
|
|
|
assert call_args["embedding_id"] == "team-456_test-image-123"
|
|
assert call_args["embedding_model"] == "google-vision-v1"
|
|
assert call_args["has_embedding"] is True
|
|
|
|
def test_update_image_status_firestore_error(self, mock_firestore_doc):
|
|
"""Test handling Firestore update errors"""
|
|
mock_firestore_doc.update.side_effect = Exception("Firestore error")
|
|
|
|
from main import update_image_status
|
|
|
|
# Should not raise exception, just log error
|
|
update_image_status("test-image-123", "failed", 1, "Test error")
|
|
|
|
mock_firestore_doc.update.assert_called_once() |