"""Tests for ``ImageProcessor``: unit, integration and performance suites.

The fixtures live at module level on purpose: pytest only exposes a fixture
defined inside a class to the tests of that class, and all three test classes
in this module request ``image_processor`` / ``sample_image_data``.
"""

import concurrent.futures
import time
from io import BytesIO
from unittest.mock import MagicMock, patch

import numpy as np
import pytest
from bson import ObjectId
from PIL import Image

from src.models.image import ImageModel
from src.services.image_processor import ImageProcessor


def _image_stream(image, image_format, **save_kwargs):
    """Encode *image* into an in-memory stream rewound to position 0.

    Args:
        image: The PIL image to encode.
        image_format: Format name handed to ``Image.save`` (e.g. ``'JPEG'``).
        **save_kwargs: Extra keyword arguments forwarded to ``Image.save``.

    Returns:
        A ``BytesIO`` holding the encoded image, positioned at its start.
    """
    stream = BytesIO()
    image.save(stream, format=image_format, **save_kwargs)
    stream.seek(0)
    return stream


@pytest.fixture
def image_processor():
    """Create image processor instance"""
    return ImageProcessor()


@pytest.fixture
def sample_image_data():
    """Create sample image data: an 800x600 solid red JPEG stream"""
    return _image_stream(Image.new('RGB', (800, 600), color='red'), 'JPEG')


@pytest.fixture
def sample_png_image():
    """Create sample PNG image data: a 400x300 semi-transparent RGBA stream"""
    return _image_stream(
        Image.new('RGBA', (400, 300), color=(255, 0, 0, 128)), 'PNG'
    )


@pytest.fixture
def sample_image_model():
    """Create a sample image model"""
    return ImageModel(
        filename="test-image.jpg",
        original_filename="test_image.jpg",
        file_size=1024,
        content_type="image/jpeg",
        storage_path="images/test-image.jpg",
        team_id=ObjectId(),
        uploader_id=ObjectId(),
    )


class TestImageProcessor:
    """Test image processing functionality"""

    def test_extract_image_metadata(self, image_processor, sample_image_data):
        """Test extracting basic image metadata"""
        metadata = image_processor.extract_metadata(sample_image_data)

        # All expected keys must be present
        for key in ('width', 'height', 'format', 'mode'):
            assert key in metadata

        # Values must match the fixture image
        assert metadata['width'] == 800
        assert metadata['height'] == 600
        assert metadata['format'] == 'JPEG'

    def test_extract_exif_data(self, image_processor):
        """Test extracting EXIF data from images"""
        # Generated image; it carries no real EXIF payload
        img_bytes = _image_stream(
            Image.new('RGB', (100, 100), color='blue'), 'JPEG'
        )

        exif_data = image_processor.extract_exif_data(img_bytes)

        # Verify EXIF extraction (may be empty for generated images)
        assert isinstance(exif_data, dict)

    def test_resize_image(self, image_processor, sample_image_data):
        """Test resizing images while maintaining aspect ratio"""
        resized_data = image_processor.resize_image(
            sample_image_data,
            max_width=400,
            max_height=300,
        )
        assert resized_data is not None

        # New dimensions must fit inside the requested bounding box
        resized_img = Image.open(resized_data)
        assert resized_img.width <= 400
        assert resized_img.height <= 300

        # Aspect ratio should be maintained
        original_ratio = 800 / 600
        new_ratio = resized_img.width / resized_img.height
        assert abs(original_ratio - new_ratio) < 0.01

    def test_generate_thumbnail(self, image_processor, sample_image_data):
        """Test generating image thumbnails"""
        thumbnail_data = image_processor.generate_thumbnail(
            sample_image_data,
            size=(150, 150),
        )
        assert thumbnail_data is not None

        # Thumbnail must fit inside the requested size
        thumbnail_img = Image.open(thumbnail_data)
        assert thumbnail_img.width <= 150
        assert thumbnail_img.height <= 150

    def test_optimize_image_quality(self, image_processor, sample_image_data):
        """Test optimizing image quality and file size"""
        # getvalue() does not move the stream position
        original_size = len(sample_image_data.getvalue())

        optimized_data = image_processor.optimize_image(
            sample_image_data,
            quality=85,
            optimize=True,
        )
        assert optimized_data is not None

        # Optimized image should typically be smaller or similar size
        optimized_size = len(optimized_data.getvalue())
        assert optimized_size <= original_size * 1.1  # Allow 10% tolerance

    def test_convert_image_format(self, image_processor, sample_png_image):
        """Test converting between image formats"""
        # Convert PNG to JPEG
        jpeg_data = image_processor.convert_format(
            sample_png_image,
            target_format='JPEG',
        )
        assert jpeg_data is not None

        converted_img = Image.open(jpeg_data)
        assert converted_img.format == 'JPEG'

    def test_detect_image_colors(self, image_processor, sample_image_data):
        """Test detecting dominant colors in images"""
        colors = image_processor.detect_dominant_colors(
            sample_image_data,
            num_colors=5,
        )

        assert isinstance(colors, list)
        assert len(colors) <= 5

        # Each color should have RGB values and percentage
        for color in colors:
            assert 'rgb' in color
            assert 'percentage' in color
            assert len(color['rgb']) == 3
            assert 0 <= color['percentage'] <= 100

    def test_validate_image_format(self, image_processor, sample_image_data):
        """Test validating supported image formats"""
        # Valid image should pass validation
        is_valid = image_processor.validate_image_format(sample_image_data)
        assert is_valid is True

        # Invalid data should fail validation
        invalid_data = BytesIO(b'not_an_image')
        is_valid = image_processor.validate_image_format(invalid_data)
        assert is_valid is False

    def test_calculate_image_hash(self, image_processor, sample_image_data):
        """Test calculating perceptual hash for duplicate detection"""
        image_hash = image_processor.calculate_perceptual_hash(
            sample_image_data
        )

        assert image_hash is not None
        assert isinstance(image_hash, str)
        assert len(image_hash) > 0

        # Same image should produce same hash; rewind before re-reading
        sample_image_data.seek(0)
        hash2 = image_processor.calculate_perceptual_hash(sample_image_data)
        assert image_hash == hash2

    def test_detect_image_orientation(self, image_processor, sample_image_data):
        """Test detecting and correcting image orientation"""
        orientation = image_processor.detect_orientation(sample_image_data)
        assert orientation in [0, 90, 180, 270]

        # The first call may have consumed the stream; rewind so the second
        # call sees the full image, as the other multi-call tests do.
        sample_image_data.seek(0)

        # Auto-correct orientation if needed
        corrected_data = image_processor.auto_correct_orientation(
            sample_image_data
        )
        assert corrected_data is not None

    def test_extract_text_from_image(self, image_processor):
        """Test OCR text extraction from images"""
        # Blank image; the OCR backend is mocked so no real text is needed
        img_bytes = _image_stream(
            Image.new('RGB', (200, 100), color='white'), 'JPEG'
        )

        with patch('src.services.image_processor.pytesseract') as mock_ocr:
            mock_ocr.image_to_string.return_value = "Sample text"

            text = image_processor.extract_text(img_bytes)

            assert text == "Sample text"
            mock_ocr.image_to_string.assert_called_once()

    def test_batch_process_images(self, image_processor):
        """Test batch processing multiple images"""
        # Three small JPEGs with distinct red levels
        image_batch = [
            _image_stream(
                Image.new('RGB', (100, 100), color=(i * 80, 0, 0)), 'JPEG'
            )
            for i in range(3)
        ]

        results = image_processor.batch_process(
            image_batch,
            operations=['resize', 'thumbnail', 'metadata'],
        )

        # One result per input, each carrying every requested artefact
        assert len(results) == 3
        for result in results:
            assert 'metadata' in result
            assert 'resized' in result
            assert 'thumbnail' in result

    def test_image_quality_assessment(self, image_processor, sample_image_data):
        """Test assessing image quality metrics"""
        quality_metrics = image_processor.assess_quality(sample_image_data)

        for key in ('sharpness', 'brightness', 'contrast', 'overall_score'):
            assert key in quality_metrics

        # Scores should be in valid ranges
        assert 0 <= quality_metrics['overall_score'] <= 100

    def test_watermark_addition(self, image_processor, sample_image_data):
        """Test adding watermarks to images"""
        # Add text watermark
        watermarked_data = image_processor.add_watermark(
            sample_image_data,
            watermark_text="SEREACT",
            position="bottom-right",
            opacity=0.5,
        )
        assert watermarked_data is not None

        # Check that image is still valid
        watermarked_img = Image.open(watermarked_data)
        assert watermarked_img.format == 'JPEG'

    def test_image_compression_levels(self, image_processor, sample_image_data):
        """Test different compression levels"""
        original_size = len(sample_image_data.getvalue())

        # NOTE(review): the fixture is saved without an explicit quality, so
        # re-encoding at a higher quality is not obviously guaranteed to be
        # smaller than the original - confirm this bound holds.
        for quality in [95, 85, 75, 60]:
            compressed_data = image_processor.compress_image(
                sample_image_data,
                quality=quality,
            )
            compressed_size = len(compressed_data.getvalue())

            # Lower quality should generally result in smaller files
            if quality < 95:
                assert compressed_size <= original_size

            # Reset stream position for the next iteration
            sample_image_data.seek(0)

    def test_handle_corrupted_image(self, image_processor):
        """Test handling of corrupted image data"""
        # Valid PNG signature followed by garbage
        corrupted_data = BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00corrupted')

        # Any exception is accepted; the concrete type raised by
        # extract_metadata is not visible from this module.
        with pytest.raises(Exception):
            image_processor.extract_metadata(corrupted_data)

    def test_large_image_processing(self, image_processor):
        """Test processing very large images"""
        img_bytes = _image_stream(
            Image.new('RGB', (4000, 3000), color='green'),
            'JPEG',
            quality=95,
        )

        metadata = image_processor.extract_metadata(img_bytes)
        assert metadata['width'] == 4000
        assert metadata['height'] == 3000

        # Rewind before handing the same stream to a second operation
        img_bytes.seek(0)
        resized_data = image_processor.resize_image(
            img_bytes,
            max_width=1920,
            max_height=1080,
        )

        resized_img = Image.open(resized_data)
        assert resized_img.width <= 1920
        assert resized_img.height <= 1080

    def test_progressive_jpeg_support(self, image_processor, sample_image_data):
        """Test support for progressive JPEG format"""
        progressive_data = image_processor.convert_to_progressive_jpeg(
            sample_image_data
        )
        assert progressive_data is not None

        # Check that it's still a valid JPEG
        progressive_img = Image.open(progressive_data)
        assert progressive_img.format == 'JPEG'


class TestImageProcessorIntegration:
    """Integration tests for image processor with other services"""

    def test_integration_with_storage_service(self, image_processor, sample_image_data):
        """Test integration with storage service"""
        with patch('src.services.storage.StorageService') as mock_storage:
            mock_storage_instance = mock_storage.return_value
            mock_storage_instance.upload_file.return_value = (
                'images/processed.jpg',
                'image/jpeg',
                1024,
                {},
            )

            # Process and upload image
            result = image_processor.process_and_upload(
                sample_image_data,
                operations=['resize', 'optimize'],
                team_id=str(ObjectId()),
            )

            assert 'storage_path' in result
            mock_storage_instance.upload_file.assert_called_once()

    def test_integration_with_embedding_service(self, image_processor, sample_image_data):
        """Test integration with embedding service"""
        with patch('src.services.embedding_service.EmbeddingService') as mock_embedding:
            mock_embedding_instance = mock_embedding.return_value
            mock_embedding_instance.generate_embedding.return_value = [0.1] * 512

            # Process image and generate embedding
            result = image_processor.process_for_embedding(sample_image_data)

            assert 'processed_image' in result
            assert 'embedding' in result
            mock_embedding_instance.generate_embedding.assert_called_once()

    def test_pubsub_message_processing(self, image_processor):
        """Test processing images from Pub/Sub messages"""
        # Mock Pub/Sub message
        message_data = {
            'image_id': str(ObjectId()),
            'storage_path': 'images/raw/test.jpg',
            'operations': ['resize', 'thumbnail', 'optimize'],
        }

        with patch.object(image_processor, 'process_from_storage') as mock_process:
            mock_process.return_value = {
                'processed_path': 'images/processed/test.jpg',
                'thumbnail_path': 'images/thumbnails/test.jpg',
            }

            result = image_processor.handle_processing_message(message_data)

            assert 'processed_path' in result
            mock_process.assert_called_once()

    def test_error_handling_and_retry(self, image_processor, sample_image_data):
        """Test error handling and retry mechanisms"""
        # Mock transient error followed by success
        with patch.object(image_processor, 'extract_metadata') as mock_extract:
            # First call fails, second succeeds
            mock_extract.side_effect = [
                Exception("Transient error"),
                {'width': 800, 'height': 600, 'format': 'JPEG'},
            ]

            # Should retry and succeed
            metadata = image_processor.extract_metadata_with_retry(
                sample_image_data,
                max_retries=2,
            )

            assert metadata['width'] == 800
            assert mock_extract.call_count == 2


class TestImageProcessorPerformance:
    """Performance tests for image processing"""

    def test_processing_speed_benchmarks(self, image_processor):
        """Test processing speed for different image sizes"""
        sizes = [(100, 100), (500, 500), (1000, 1000)]

        for width, height in sizes:
            img_bytes = _image_stream(
                Image.new('RGB', (width, height), color='blue'), 'JPEG'
            )

            # perf_counter is monotonic, so clock adjustments cannot skew
            # the measured interval.
            start_time = time.perf_counter()
            metadata = image_processor.extract_metadata(img_bytes)
            processing_time = time.perf_counter() - start_time

            # Verify reasonable processing time (adjust thresholds as needed)
            assert processing_time < 5.0  # Should process within 5 seconds
            assert metadata['width'] == width
            assert metadata['height'] == height

    def test_memory_usage_optimization(self, image_processor):
        """Test memory usage during image processing"""
        # Not implemented yet: report it as skipped rather than letting an
        # empty body show up as a passing test.
        pytest.skip("memory profiling test not implemented")

    def test_concurrent_processing(self, image_processor):
        """Test concurrent image processing"""
        # Each worker gets its own stream, so no position is shared
        images = [
            _image_stream(
                Image.new('RGB', (200, 200), color=(i * 50, 0, 0)), 'JPEG'
            )
            for i in range(5)
        ]

        # Process concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(image_processor.extract_metadata, img)
                for img in images
            ]
            results = [future.result() for future in futures]

        # Verify all processed successfully
        assert len(results) == 5
        for result in results:
            assert 'width' in result
            assert 'height' in result