"""Unit, integration, and performance tests for the image processing service."""
from io import BytesIO
from unittest.mock import patch, MagicMock

import numpy as np
import pytest
from bson import ObjectId
from PIL import Image

from src.models.image import ImageModel
from src.services.image_processor import ImageProcessor


class TestImageProcessor:
    """Unit tests for core image processing functionality."""

    @pytest.fixture
    def image_processor(self):
        """Create image processor instance"""
        return ImageProcessor()

    @pytest.fixture
    def sample_image_data(self):
        """Create sample image data"""
        # Create a simple test image using PIL
        img = Image.new('RGB', (800, 600), color='red')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)
        return img_bytes

    @pytest.fixture
    def sample_png_image(self):
        """Create sample PNG image data"""
        # RGBA with partial transparency exercises alpha handling on conversion
        img = Image.new('RGBA', (400, 300), color=(255, 0, 0, 128))
        img_bytes = BytesIO()
        img.save(img_bytes, format='PNG')
        img_bytes.seek(0)
        return img_bytes

    @pytest.fixture
    def sample_image_model(self):
        """Create a sample image model"""
        return ImageModel(
            filename="test-image.jpg",
            original_filename="test_image.jpg",
            file_size=1024,
            content_type="image/jpeg",
            storage_path="images/test-image.jpg",
            team_id=ObjectId(),
            uploader_id=ObjectId()
        )

    def test_extract_image_metadata(self, image_processor, sample_image_data):
        """Test extracting basic image metadata"""
        metadata = image_processor.extract_metadata(sample_image_data)

        # Verify metadata extraction
        assert 'width' in metadata
        assert 'height' in metadata
        assert 'format' in metadata
        assert 'mode' in metadata
        assert metadata['width'] == 800
        assert metadata['height'] == 600
        assert metadata['format'] == 'JPEG'

    def test_extract_exif_data(self, image_processor):
        """Test extracting EXIF data from images"""
        # Create image with EXIF data (simulated)
        img = Image.new('RGB', (100, 100), color='blue')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)

        exif_data = image_processor.extract_exif_data(img_bytes)

        # Verify EXIF extraction (may be empty for generated images)
        assert isinstance(exif_data, dict)

    def test_resize_image(self, image_processor, sample_image_data):
        """Test resizing images while maintaining aspect ratio"""
        resized_data = image_processor.resize_image(
            sample_image_data,
            max_width=400,
            max_height=300
        )

        # Verify resized image
        assert resized_data is not None

        # Check new dimensions
        resized_img = Image.open(resized_data)
        assert resized_img.width <= 400
        assert resized_img.height <= 300

        # Aspect ratio should be maintained
        original_ratio = 800 / 600
        new_ratio = resized_img.width / resized_img.height
        assert abs(original_ratio - new_ratio) < 0.01

    def test_generate_thumbnail(self, image_processor, sample_image_data):
        """Test generating image thumbnails"""
        thumbnail_data = image_processor.generate_thumbnail(
            sample_image_data,
            size=(150, 150)
        )

        # Verify thumbnail
        assert thumbnail_data is not None

        # Check thumbnail dimensions
        thumbnail_img = Image.open(thumbnail_data)
        assert thumbnail_img.width <= 150
        assert thumbnail_img.height <= 150

    def test_optimize_image_quality(self, image_processor, sample_image_data):
        """Test optimizing image quality and file size"""
        # Get original size (getvalue() does not move the stream position)
        original_size = len(sample_image_data.getvalue())

        optimized_data = image_processor.optimize_image(
            sample_image_data,
            quality=85,
            optimize=True
        )

        # Verify optimization
        assert optimized_data is not None
        optimized_size = len(optimized_data.getvalue())

        # Optimized image should typically be smaller or similar size
        assert optimized_size <= original_size * 1.1  # Allow 10% tolerance

    def test_convert_image_format(self, image_processor, sample_png_image):
        """Test converting between image formats"""
        # Convert PNG to JPEG
        jpeg_data = image_processor.convert_format(
            sample_png_image,
            target_format='JPEG'
        )

        # Verify conversion
        assert jpeg_data is not None

        # Check converted image
        converted_img = Image.open(jpeg_data)
        assert converted_img.format == 'JPEG'

    def test_detect_image_colors(self, image_processor, sample_image_data):
        """Test detecting dominant colors in images"""
        colors = image_processor.detect_dominant_colors(
            sample_image_data,
            num_colors=5
        )

        # Verify color detection
        assert isinstance(colors, list)
        assert len(colors) <= 5

        # Each color should have RGB values and percentage
        for color in colors:
            assert 'rgb' in color
            assert 'percentage' in color
            assert len(color['rgb']) == 3
            assert 0 <= color['percentage'] <= 100

    def test_validate_image_format(self, image_processor, sample_image_data):
        """Test validating supported image formats"""
        # Valid image should pass validation
        is_valid = image_processor.validate_image_format(sample_image_data)
        assert is_valid is True

        # Invalid data should fail validation
        invalid_data = BytesIO(b'not_an_image')
        is_valid = image_processor.validate_image_format(invalid_data)
        assert is_valid is False

    def test_calculate_image_hash(self, image_processor, sample_image_data):
        """Test calculating perceptual hash for duplicate detection"""
        image_hash = image_processor.calculate_perceptual_hash(sample_image_data)

        # Verify hash
        assert image_hash is not None
        assert isinstance(image_hash, str)
        assert len(image_hash) > 0

        # Same image should produce same hash (rewind before re-reading)
        sample_image_data.seek(0)
        hash2 = image_processor.calculate_perceptual_hash(sample_image_data)
        assert image_hash == hash2

    def test_detect_image_orientation(self, image_processor, sample_image_data):
        """Test detecting and correcting image orientation"""
        orientation = image_processor.detect_orientation(sample_image_data)

        # Verify orientation detection
        assert orientation in [0, 90, 180, 270]

        # Rewind: detect_orientation consumed the stream above
        sample_image_data.seek(0)

        # Auto-correct orientation if needed
        corrected_data = image_processor.auto_correct_orientation(sample_image_data)
        assert corrected_data is not None

    def test_extract_text_from_image(self, image_processor):
        """Test OCR text extraction from images"""
        # Create image with text (simulated)
        img = Image.new('RGB', (200, 100), color='white')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)

        with patch('src.services.image_processor.pytesseract') as mock_ocr:
            mock_ocr.image_to_string.return_value = "Sample text"

            text = image_processor.extract_text(img_bytes)

            # Verify text extraction
            assert text == "Sample text"
            mock_ocr.image_to_string.assert_called_once()

    def test_batch_process_images(self, image_processor):
        """Test batch processing multiple images"""
        # Create batch of images with distinct colors
        image_batch = []
        for i in range(3):
            img = Image.new('RGB', (100, 100), color=(i*80, 0, 0))
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)
            image_batch.append(img_bytes)

        results = image_processor.batch_process(
            image_batch,
            operations=['resize', 'thumbnail', 'metadata']
        )

        # Verify batch processing: one result per input, all operations applied
        assert len(results) == 3
        for result in results:
            assert 'metadata' in result
            assert 'resized' in result
            assert 'thumbnail' in result

    def test_image_quality_assessment(self, image_processor, sample_image_data):
        """Test assessing image quality metrics"""
        quality_metrics = image_processor.assess_quality(sample_image_data)

        # Verify quality metrics
        assert 'sharpness' in quality_metrics
        assert 'brightness' in quality_metrics
        assert 'contrast' in quality_metrics
        assert 'overall_score' in quality_metrics

        # Scores should be in valid ranges
        assert 0 <= quality_metrics['overall_score'] <= 100

    def test_watermark_addition(self, image_processor, sample_image_data):
        """Test adding watermarks to images"""
        watermarked_data = image_processor.add_watermark(
            sample_image_data,
            watermark_text="SEREACT",
            position="bottom-right",
            opacity=0.5
        )

        # Verify watermark addition
        assert watermarked_data is not None

        # Check that image is still valid
        watermarked_img = Image.open(watermarked_data)
        assert watermarked_img.format == 'JPEG'

    def test_image_compression_levels(self, image_processor, sample_image_data):
        """Test different compression levels"""
        original_size = len(sample_image_data.getvalue())

        # Test different quality levels
        for quality in [95, 85, 75, 60]:
            compressed_data = image_processor.compress_image(
                sample_image_data,
                quality=quality
            )

            compressed_size = len(compressed_data.getvalue())

            # Lower quality should generally result in smaller files
            if quality < 95:
                assert compressed_size <= original_size

            # Reset stream position for the next iteration
            sample_image_data.seek(0)

    def test_handle_corrupted_image(self, image_processor):
        """Test handling of corrupted image data"""
        # Valid PNG magic bytes followed by garbage
        corrupted_data = BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00corrupted')

        # Should surface an error rather than return bogus metadata
        with pytest.raises(Exception):
            image_processor.extract_metadata(corrupted_data)

    def test_large_image_processing(self, image_processor):
        """Test processing very large images"""
        # Create large image (simulated)
        large_img = Image.new('RGB', (4000, 3000), color='green')
        img_bytes = BytesIO()
        large_img.save(img_bytes, format='JPEG', quality=95)
        img_bytes.seek(0)

        metadata = image_processor.extract_metadata(img_bytes)

        # Verify processing
        assert metadata['width'] == 4000
        assert metadata['height'] == 3000

        # Test resizing large image (rewind after metadata extraction)
        img_bytes.seek(0)
        resized_data = image_processor.resize_image(
            img_bytes,
            max_width=1920,
            max_height=1080
        )

        resized_img = Image.open(resized_data)
        assert resized_img.width <= 1920
        assert resized_img.height <= 1080

    def test_progressive_jpeg_support(self, image_processor, sample_image_data):
        """Test support for progressive JPEG format"""
        progressive_data = image_processor.convert_to_progressive_jpeg(
            sample_image_data
        )

        # Verify progressive format
        assert progressive_data is not None

        # Check that it's still a valid JPEG
        progressive_img = Image.open(progressive_data)
        assert progressive_img.format == 'JPEG'

class TestImageProcessorIntegration:
    """Integration tests for image processor with other services"""

    # NOTE: pytest fixtures defined inside a test class are only visible to
    # that class, so the fixtures from TestImageProcessor are redefined here.

    @pytest.fixture
    def image_processor(self):
        """Create image processor instance"""
        return ImageProcessor()

    @pytest.fixture
    def sample_image_data(self):
        """Create sample JPEG image data"""
        img = Image.new('RGB', (800, 600), color='red')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)
        return img_bytes

    def test_integration_with_storage_service(self, image_processor, sample_image_data):
        """Test integration with storage service"""
        with patch('src.services.storage.StorageService') as mock_storage:
            mock_storage_instance = mock_storage.return_value
            mock_storage_instance.upload_file.return_value = (
                'images/processed.jpg', 'image/jpeg', 1024, {}
            )

            # Process and upload image
            result = image_processor.process_and_upload(
                sample_image_data,
                operations=['resize', 'optimize'],
                team_id=str(ObjectId())
            )

            # Verify integration
            assert 'storage_path' in result
            mock_storage_instance.upload_file.assert_called_once()

    def test_integration_with_embedding_service(self, image_processor, sample_image_data):
        """Test integration with embedding service"""
        with patch('src.services.embedding_service.EmbeddingService') as mock_embedding:
            mock_embedding_instance = mock_embedding.return_value
            mock_embedding_instance.generate_embedding.return_value = [0.1] * 512

            # Process image and generate embedding
            result = image_processor.process_for_embedding(sample_image_data)

            # Verify integration
            assert 'processed_image' in result
            assert 'embedding' in result
            mock_embedding_instance.generate_embedding.assert_called_once()

    def test_pubsub_message_processing(self, image_processor):
        """Test processing images from Pub/Sub messages"""
        # Mock Pub/Sub message
        message_data = {
            'image_id': str(ObjectId()),
            'storage_path': 'images/raw/test.jpg',
            'operations': ['resize', 'thumbnail', 'optimize']
        }

        with patch.object(image_processor, 'process_from_storage') as mock_process:
            mock_process.return_value = {
                'processed_path': 'images/processed/test.jpg',
                'thumbnail_path': 'images/thumbnails/test.jpg'
            }

            # Process message
            result = image_processor.handle_processing_message(message_data)

            # Verify message processing
            assert 'processed_path' in result
            mock_process.assert_called_once()

    def test_error_handling_and_retry(self, image_processor, sample_image_data):
        """Test error handling and retry mechanisms"""
        # Mock transient error followed by success
        with patch.object(image_processor, 'extract_metadata') as mock_extract:
            # First call fails, second succeeds
            mock_extract.side_effect = [
                Exception("Transient error"),
                {'width': 800, 'height': 600, 'format': 'JPEG'}
            ]

            # Should retry and succeed
            metadata = image_processor.extract_metadata_with_retry(
                sample_image_data,
                max_retries=2
            )

            assert metadata['width'] == 800
            assert mock_extract.call_count == 2

class TestImageProcessorPerformance:
    """Performance tests for image processing"""

    # NOTE: pytest fixtures defined inside a test class are only visible to
    # that class, so the image_processor fixture is redefined here.

    @pytest.fixture
    def image_processor(self):
        """Create image processor instance"""
        return ImageProcessor()

    def test_processing_speed_benchmarks(self, image_processor):
        """Test processing speed for different image sizes"""
        import time

        sizes = [(100, 100), (500, 500), (1000, 1000)]

        for width, height in sizes:
            # Create test image
            img = Image.new('RGB', (width, height), color='blue')
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)

            # Measure processing time
            start_time = time.time()
            metadata = image_processor.extract_metadata(img_bytes)
            processing_time = time.time() - start_time

            # Verify reasonable processing time (adjust thresholds as needed)
            assert processing_time < 5.0  # Should process within 5 seconds
            assert metadata['width'] == width
            assert metadata['height'] == height

    def test_memory_usage_optimization(self, image_processor):
        """Test memory usage during image processing"""
        # This would test memory usage patterns
        # Implementation depends on memory profiling tools
        pass

    def test_concurrent_processing(self, image_processor):
        """Test concurrent image processing"""
        import concurrent.futures

        # Create multiple test images
        images = []
        for i in range(5):
            img = Image.new('RGB', (200, 200), color=(i*50, 0, 0))
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)
            images.append(img_bytes)

        # Process concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(image_processor.extract_metadata, img)
                for img in images
            ]

            results = [future.result() for future in futures]

        # Verify all processed successfully
        assert len(results) == 5
        for result in results:
            assert 'width' in result
            assert 'height' in result