image_management_api/tests/services/test_image_processor.py
import pytest
import numpy as np
from unittest.mock import patch, MagicMock
from bson import ObjectId
from io import BytesIO
from PIL import Image
from src.services.image_processor import ImageProcessor
from src.models.image import ImageModel

# Fixtures live at module level so that the integration and performance test
# classes below can use them as well as TestImageProcessor.
@pytest.fixture
def image_processor():
    """Create image processor instance"""
    return ImageProcessor()


@pytest.fixture
def sample_image_data():
    """Create sample image data"""
    # Create a simple test image using PIL
    img = Image.new('RGB', (800, 600), color='red')
    img_bytes = BytesIO()
    img.save(img_bytes, format='JPEG')
    img_bytes.seek(0)
    return img_bytes


@pytest.fixture
def sample_png_image():
    """Create sample PNG image data"""
    img = Image.new('RGBA', (400, 300), color=(255, 0, 0, 128))
    img_bytes = BytesIO()
    img.save(img_bytes, format='PNG')
    img_bytes.seek(0)
    return img_bytes


@pytest.fixture
def sample_image_model():
    """Create a sample image model"""
    return ImageModel(
        filename="test-image.jpg",
        original_filename="test_image.jpg",
        file_size=1024,
        content_type="image/jpeg",
        storage_path="images/test-image.jpg",
        team_id=ObjectId(),
        uploader_id=ObjectId()
    )


class TestImageProcessor:
    """Test image processing functionality"""

    def test_extract_image_metadata(self, image_processor, sample_image_data):
        """Test extracting basic image metadata"""
        # Extract metadata
        metadata = image_processor.extract_metadata(sample_image_data)

        # Verify metadata extraction
        assert 'width' in metadata
        assert 'height' in metadata
        assert 'format' in metadata
        assert 'mode' in metadata
        assert metadata['width'] == 800
        assert metadata['height'] == 600
        assert metadata['format'] == 'JPEG'

    def test_extract_exif_data(self, image_processor):
        """Test extracting EXIF data from images"""
        # Create image with EXIF data (simulated)
        img = Image.new('RGB', (100, 100), color='blue')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)

        # Extract EXIF data
        exif_data = image_processor.extract_exif_data(img_bytes)

        # Verify EXIF extraction (may be empty for generated images)
        assert isinstance(exif_data, dict)

    def test_resize_image(self, image_processor, sample_image_data):
        """Test resizing images while maintaining aspect ratio"""
        # Resize image
        resized_data = image_processor.resize_image(
            sample_image_data,
            max_width=400,
            max_height=300
        )

        # Verify resized image
        assert resized_data is not None

        # Check new dimensions
        resized_img = Image.open(resized_data)
        assert resized_img.width <= 400
        assert resized_img.height <= 300

        # Aspect ratio should be maintained
        original_ratio = 800 / 600
        new_ratio = resized_img.width / resized_img.height
        assert abs(original_ratio - new_ratio) < 0.01

    def test_generate_thumbnail(self, image_processor, sample_image_data):
        """Test generating image thumbnails"""
        # Generate thumbnail
        thumbnail_data = image_processor.generate_thumbnail(
            sample_image_data,
            size=(150, 150)
        )

        # Verify thumbnail
        assert thumbnail_data is not None

        # Check thumbnail dimensions
        thumbnail_img = Image.open(thumbnail_data)
        assert thumbnail_img.width <= 150
        assert thumbnail_img.height <= 150

    def test_optimize_image_quality(self, image_processor, sample_image_data):
        """Test optimizing image quality and file size"""
        # Get original size
        original_size = len(sample_image_data.getvalue())

        # Optimize image
        optimized_data = image_processor.optimize_image(
            sample_image_data,
            quality=85,
            optimize=True
        )

        # Verify optimization
        assert optimized_data is not None
        optimized_size = len(optimized_data.getvalue())

        # Optimized image should typically be smaller or similar size
        assert optimized_size <= original_size * 1.1  # Allow 10% tolerance

    def test_convert_image_format(self, image_processor, sample_png_image):
        """Test converting between image formats"""
        # Convert PNG to JPEG
        jpeg_data = image_processor.convert_format(
            sample_png_image,
            target_format='JPEG'
        )

        # Verify conversion
        assert jpeg_data is not None

        # Check converted image
        converted_img = Image.open(jpeg_data)
        assert converted_img.format == 'JPEG'

    def test_detect_image_colors(self, image_processor, sample_image_data):
        """Test detecting dominant colors in images"""
        # Detect colors
        colors = image_processor.detect_dominant_colors(
            sample_image_data,
            num_colors=5
        )

        # Verify color detection
        assert isinstance(colors, list)
        assert len(colors) <= 5

        # Each color should have RGB values and percentage
        for color in colors:
            assert 'rgb' in color
            assert 'percentage' in color
            assert len(color['rgb']) == 3
            assert 0 <= color['percentage'] <= 100

    def test_validate_image_format(self, image_processor, sample_image_data):
        """Test validating supported image formats"""
        # Valid image should pass validation
        is_valid = image_processor.validate_image_format(sample_image_data)
        assert is_valid is True

        # Invalid data should fail validation
        invalid_data = BytesIO(b'not_an_image')
        is_valid = image_processor.validate_image_format(invalid_data)
        assert is_valid is False

    def test_calculate_image_hash(self, image_processor, sample_image_data):
        """Test calculating perceptual hash for duplicate detection"""
        # Calculate hash
        image_hash = image_processor.calculate_perceptual_hash(sample_image_data)

        # Verify hash
        assert image_hash is not None
        assert isinstance(image_hash, str)
        assert len(image_hash) > 0

        # Same image should produce same hash
        sample_image_data.seek(0)
        hash2 = image_processor.calculate_perceptual_hash(sample_image_data)
        assert image_hash == hash2

    def test_detect_image_orientation(self, image_processor, sample_image_data):
        """Test detecting and correcting image orientation"""
        # Detect orientation
        orientation = image_processor.detect_orientation(sample_image_data)

        # Verify orientation detection
        assert orientation in [0, 90, 180, 270]

        # Auto-correct orientation if needed
        corrected_data = image_processor.auto_correct_orientation(sample_image_data)
        assert corrected_data is not None

    def test_extract_text_from_image(self, image_processor):
        """Test OCR text extraction from images"""
        # Create image with text (simulated)
        img = Image.new('RGB', (200, 100), color='white')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)

        with patch('src.services.image_processor.pytesseract') as mock_ocr:
            mock_ocr.image_to_string.return_value = "Sample text"

            # Extract text
            text = image_processor.extract_text(img_bytes)

            # Verify text extraction
            assert text == "Sample text"
            mock_ocr.image_to_string.assert_called_once()

    def test_batch_process_images(self, image_processor):
        """Test batch processing multiple images"""
        # Create batch of images
        image_batch = []
        for i in range(3):
            img = Image.new('RGB', (100, 100), color=(i * 80, 0, 0))
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)
            image_batch.append(img_bytes)

        # Process batch
        results = image_processor.batch_process(
            image_batch,
            operations=['resize', 'thumbnail', 'metadata']
        )

        # Verify batch processing
        assert len(results) == 3
        for result in results:
            assert 'metadata' in result
            assert 'resized' in result
            assert 'thumbnail' in result

    def test_image_quality_assessment(self, image_processor, sample_image_data):
        """Test assessing image quality metrics"""
        # Assess quality
        quality_metrics = image_processor.assess_quality(sample_image_data)

        # Verify quality metrics
        assert 'sharpness' in quality_metrics
        assert 'brightness' in quality_metrics
        assert 'contrast' in quality_metrics
        assert 'overall_score' in quality_metrics

        # Scores should be in valid ranges
        assert 0 <= quality_metrics['overall_score'] <= 100

    def test_watermark_addition(self, image_processor, sample_image_data):
        """Test adding watermarks to images"""
        # Add text watermark
        watermarked_data = image_processor.add_watermark(
            sample_image_data,
            watermark_text="SEREACT",
            position="bottom-right",
            opacity=0.5
        )

        # Verify watermark addition
        assert watermarked_data is not None

        # Check that image is still valid
        watermarked_img = Image.open(watermarked_data)
        assert watermarked_img.format == 'JPEG'

    def test_image_compression_levels(self, image_processor, sample_image_data):
        """Test different compression levels"""
        original_size = len(sample_image_data.getvalue())

        # Test different quality levels
        for quality in [95, 85, 75, 60]:
            compressed_data = image_processor.compress_image(
                sample_image_data,
                quality=quality
            )
            compressed_size = len(compressed_data.getvalue())

            # Lower quality should generally result in smaller files
            if quality < 95:
                assert compressed_size <= original_size

            # Reset stream position
            sample_image_data.seek(0)
    def test_handle_corrupted_image(self, image_processor):
        """Test handling of corrupted image data"""
        # Create corrupted image data (valid PNG signature, truncated body)
        corrupted_data = BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00corrupted')

        # Undecodable data should raise rather than return bogus metadata
        with pytest.raises(Exception):
            image_processor.extract_metadata(corrupted_data)
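
    # A small companion check, added as a sketch: it assumes (not confirmed by
    # the implementation shown in this file) that validate_image_format catches
    # decode errors and reports corrupted bytes as invalid instead of raising.
    def test_validate_corrupted_image(self, image_processor):
        """Corrupted bytes should fail format validation (assumed behaviour)"""
        corrupted_data = BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00corrupted')
        assert image_processor.validate_image_format(corrupted_data) is False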
    def test_large_image_processing(self, image_processor):
        """Test processing very large images"""
        # Create large image (simulated)
        large_img = Image.new('RGB', (4000, 3000), color='green')
        img_bytes = BytesIO()
        large_img.save(img_bytes, format='JPEG', quality=95)
        img_bytes.seek(0)

        # Process large image
        metadata = image_processor.extract_metadata(img_bytes)

        # Verify processing
        assert metadata['width'] == 4000
        assert metadata['height'] == 3000

        # Test resizing large image
        img_bytes.seek(0)
        resized_data = image_processor.resize_image(
            img_bytes,
            max_width=1920,
            max_height=1080
        )
        resized_img = Image.open(resized_data)
        assert resized_img.width <= 1920
        assert resized_img.height <= 1080

    def test_progressive_jpeg_support(self, image_processor, sample_image_data):
        """Test support for progressive JPEG format"""
        # Convert to progressive JPEG
        progressive_data = image_processor.convert_to_progressive_jpeg(
            sample_image_data
        )

        # Verify progressive format
        assert progressive_data is not None

        # Check that it's still a valid JPEG
        progressive_img = Image.open(progressive_data)
        assert progressive_img.format == 'JPEG'
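
# For reference only: a minimal average-hash sketch illustrating the kind of
# perceptual hash exercised by test_calculate_image_hash above.  This shows
# the general technique, not the actual ImageProcessor implementation.
def _average_hash_sketch(data: BytesIO, hash_size: int = 8) -> str:
    """Illustrative average hash: shrink, grayscale, threshold on the mean."""
    img = Image.open(data).convert('L').resize((hash_size, hash_size))
    pixels = list(img.getdata())
    mean = sum(pixels) / len(pixels)
    bits = ''.join('1' if p > mean else '0' for p in pixels)
    # Pack the bit string into a fixed-width hex digest
    return f'{int(bits, 2):0{hash_size * hash_size // 4}x}'
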
class TestImageProcessorIntegration:
    """Integration tests for image processor with other services"""

    def test_integration_with_storage_service(self, image_processor, sample_image_data):
        """Test integration with storage service"""
        with patch('src.services.storage.StorageService') as mock_storage:
            mock_storage_instance = mock_storage.return_value
            mock_storage_instance.upload_file.return_value = (
                'images/processed.jpg', 'image/jpeg', 1024, {}
            )

            # Process and upload image
            result = image_processor.process_and_upload(
                sample_image_data,
                operations=['resize', 'optimize'],
                team_id=str(ObjectId())
            )

            # Verify integration
            assert 'storage_path' in result
            mock_storage_instance.upload_file.assert_called_once()

    def test_integration_with_embedding_service(self, image_processor, sample_image_data):
        """Test integration with embedding service"""
        with patch('src.services.embedding_service.EmbeddingService') as mock_embedding:
            mock_embedding_instance = mock_embedding.return_value
            mock_embedding_instance.generate_embedding.return_value = [0.1] * 512

            # Process image and generate embedding
            result = image_processor.process_for_embedding(sample_image_data)

            # Verify integration
            assert 'processed_image' in result
            assert 'embedding' in result
            mock_embedding_instance.generate_embedding.assert_called_once()

    def test_pubsub_message_processing(self, image_processor):
        """Test processing images from Pub/Sub messages"""
        # Mock Pub/Sub message
        message_data = {
            'image_id': str(ObjectId()),
            'storage_path': 'images/raw/test.jpg',
            'operations': ['resize', 'thumbnail', 'optimize']
        }

        with patch.object(image_processor, 'process_from_storage') as mock_process:
            mock_process.return_value = {
                'processed_path': 'images/processed/test.jpg',
                'thumbnail_path': 'images/thumbnails/test.jpg'
            }

            # Process message
            result = image_processor.handle_processing_message(message_data)

            # Verify message processing
            assert 'processed_path' in result
            mock_process.assert_called_once()

    def test_error_handling_and_retry(self, image_processor, sample_image_data):
        """Test error handling and retry mechanisms"""
        # Mock transient error followed by success
        with patch.object(image_processor, 'extract_metadata') as mock_extract:
            # First call fails, second succeeds
            mock_extract.side_effect = [
                Exception("Transient error"),
                {'width': 800, 'height': 600, 'format': 'JPEG'}
            ]

            # Should retry and succeed
            metadata = image_processor.extract_metadata_with_retry(
                sample_image_data,
                max_retries=2
            )
            assert metadata['width'] == 800
            assert mock_extract.call_count == 2
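
# Purely illustrative: one shape a retry helper such as
# extract_metadata_with_retry might take.  The signature, back-off strategy
# and error handling of the real implementation are not shown here and may
# differ; this only documents the behaviour the test above relies on.
def _retry_sketch(func, *args, max_retries=2, **kwargs):
    """Call func, making at most max_retries attempts before re-raising."""
    last_error = None
    for _ in range(max_retries):
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # broad on purpose: this is only a sketch
            last_error = exc
    raise last_error
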
class TestImageProcessorPerformance:
    """Performance tests for image processing"""

    def test_processing_speed_benchmarks(self, image_processor):
        """Test processing speed for different image sizes"""
        import time

        sizes = [(100, 100), (500, 500), (1000, 1000)]
        for width, height in sizes:
            # Create test image
            img = Image.new('RGB', (width, height), color='blue')
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)

            # Measure processing time
            start_time = time.time()
            metadata = image_processor.extract_metadata(img_bytes)
            processing_time = time.time() - start_time

            # Verify reasonable processing time (adjust thresholds as needed)
            assert processing_time < 5.0  # Should process within 5 seconds
            assert metadata['width'] == width
            assert metadata['height'] == height

    def test_memory_usage_optimization(self, image_processor):
        """Test memory usage during image processing"""
        # This would test memory usage patterns
        # Implementation depends on memory profiling tools
        pass
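
    # A minimal concrete sketch of the memory check above, using only the
    # stdlib tracemalloc module.  The 100 MB peak bound is an assumed,
    # illustrative threshold, not a documented requirement of ImageProcessor.
    def test_peak_memory_with_tracemalloc(self, image_processor):
        """Peak traced allocations while extracting metadata stay bounded"""
        import tracemalloc

        img = Image.new('RGB', (2000, 2000), color='white')
        img_bytes = BytesIO()
        img.save(img_bytes, format='JPEG')
        img_bytes.seek(0)

        tracemalloc.start()
        try:
            image_processor.extract_metadata(img_bytes)
            _, peak = tracemalloc.get_traced_memory()
        finally:
            tracemalloc.stop()

        # Assumed bound: metadata extraction should not need more than 100 MB
        # of peak Python-level allocations for a 2000x2000 JPEG
        assert peak < 100 * 1024 * 1024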
    def test_concurrent_processing(self, image_processor):
        """Test concurrent image processing"""
        import concurrent.futures

        # Create multiple test images
        images = []
        for i in range(5):
            img = Image.new('RGB', (200, 200), color=(i * 50, 0, 0))
            img_bytes = BytesIO()
            img.save(img_bytes, format='JPEG')
            img_bytes.seek(0)
            images.append(img_bytes)

        # Process concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(image_processor.extract_metadata, img)
                for img in images
            ]
            results = [future.result() for future in futures]

        # Verify all processed successfully
        assert len(results) == 5
        for result in results:
            assert 'width' in result
            assert 'height' in result