image_management_api/src/services/image_service.py
2025-05-25 19:05:34 +02:00

370 lines
14 KiB
Python

import logging
from typing import Optional, List, Tuple
from fastapi import UploadFile, Request
from bson import ObjectId
import io
from src.db.repositories.image_repository import image_repository
from src.services.storage import StorageService
from src.services.image_processor import ImageProcessor
from src.services.embedding_service import EmbeddingService
from src.services.pubsub_service import pubsub_service
from src.models.image import ImageModel
from src.models.user import UserModel
from src.schemas.image import ImageResponse, ImageListResponse, ImageCreate, ImageUpdate
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ImageService:
    """Service class for handling image-related business logic.

    Coordinates the storage backend, the image repository, the embedding
    service and the Pub/Sub publisher for upload, listing, retrieval,
    download, update and deletion of images.

    Error convention used by the public methods:
        ValueError: A supplied ID or uploaded file is invalid.
        RuntimeError: The image does not exist or an operation failed.
        PermissionError: The user is neither an admin nor a member of the
            image's team.
    """

    def __init__(self):
        self.storage_service = StorageService()
        self.image_processor = ImageProcessor()
        self.embedding_service = EmbeddingService()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _generate_api_download_url(self, request: Request, image_id: str) -> str:
        """Generate API download URL for an image.

        Args:
            request: The FastAPI request object; only ``base_url`` is read.
            image_id: The image ID to embed in the URL.

        Returns:
            str: ``<base_url>/api/v1/images/<image_id>/download`` with any
            trailing slashes removed from the base URL.
        """
        base_url = str(request.base_url).rstrip('/')
        return f"{base_url}/api/v1/images/{image_id}/download"

    @staticmethod
    def _parse_object_id(value: str, label: str) -> ObjectId:
        """Convert ``value`` to an ObjectId.

        Args:
            value: The candidate ID string.
            label: Noun used in the error message (e.g. ``"image"``).

        Returns:
            ObjectId: The parsed identifier.

        Raises:
            ValueError: If ``value`` is not a valid ObjectId.
        """
        if not ObjectId.is_valid(value):
            raise ValueError(f"Invalid {label} ID")
        return ObjectId(value)

    async def _get_authorized_image(
        self,
        image_id: str,
        user: UserModel,
        action: str
    ) -> Tuple[ObjectId, ImageModel]:
        """Load an image and verify that ``user`` may act on it.

        Args:
            image_id: The image ID to look up.
            user: The requesting user.
            action: Verb used in the authorization error message
                (``"access"``, ``"update"`` or ``"delete"``).

        Returns:
            Tuple[ObjectId, ImageModel]: The parsed ID and the stored image.

        Raises:
            ValueError: If image_id is invalid
            RuntimeError: If image not found
            PermissionError: If user not authorized for the image
        """
        obj_id = self._parse_object_id(image_id, "image")

        image = await image_repository.get_by_id(obj_id)
        if not image:
            raise RuntimeError("Image not found")

        # Admins may act on any image; everyone else only on their team's.
        if not user.is_admin and image.team_id != user.team_id:
            raise PermissionError(f"Not authorized to {action} this image")

        return obj_id, image

    async def _publish_processing_task(
        self,
        image_id: str,
        storage_path: str,
        team_id: str
    ) -> None:
        """Publish an image processing task to Pub/Sub (best effort).

        Failures are logged as warnings and never propagated, so a
        publishing problem does not fail the upload that triggered it.
        """
        try:
            task_published = await pubsub_service.publish_image_processing_task(
                image_id=image_id,
                storage_path=storage_path,
                team_id=team_id
            )
            if not task_published:
                logger.warning(
                    "Failed to publish processing task for image %s", image_id
                )
        except Exception as e:
            logger.warning("Failed to publish image processing task: %s", e)

    def _convert_to_response(
        self,
        image: ImageModel,
        request: Request,
        include_last_accessed: bool = False
    ) -> ImageResponse:
        """Convert ImageModel to ImageResponse.

        Args:
            image: The stored image model.
            request: The FastAPI request object for URL generation.
            include_last_accessed: When True, also copy ``last_accessed``
                into the response.

        Returns:
            ImageResponse: The response schema. ``public_url`` is always
            regenerated from the current request rather than read from the
            stored model.
        """
        api_download_url = self._generate_api_download_url(request, str(image.id))
        response_data = {
            "id": str(image.id),
            "filename": image.filename,
            "original_filename": image.original_filename,
            "file_size": image.file_size,
            "content_type": image.content_type,
            "storage_path": image.storage_path,
            "public_url": api_download_url,
            "team_id": str(image.team_id),
            "uploader_id": str(image.uploader_id),
            "upload_date": image.upload_date,
            "description": image.description,
            "metadata": image.metadata,
            "has_embedding": image.has_embedding,
            "collection_id": str(image.collection_id) if image.collection_id else None
        }
        if include_last_accessed:
            response_data["last_accessed"] = image.last_accessed
        return ImageResponse(**response_data)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def upload_image(
        self,
        file: UploadFile,
        user: UserModel,
        request: Request,
        description: Optional[str] = None,
        collection_id: Optional[str] = None
    ) -> ImageResponse:
        """
        Upload a new image

        Args:
            file: The uploaded file
            user: The user uploading the image
            request: The FastAPI request object for URL generation
            description: Optional description for the image
            collection_id: Optional collection ID to associate with the image

        Returns:
            ImageResponse: The created image metadata

        Raises:
            ValueError: If file validation fails or collection_id is invalid
            RuntimeError: If upload fails
        """
        # Validate file type
        if not file.content_type or not file.content_type.startswith('image/'):
            raise ValueError("File must be an image")

        # Validate the collection ID before anything is written, so a
        # malformed ID is reported as a ValueError and leaves no stored file.
        collection_oid = (
            self._parse_object_id(collection_id, "collection")
            if collection_id else None
        )

        # Validate file size (10MB limit)
        max_size = 10 * 1024 * 1024  # 10MB
        content = await file.read()
        if len(content) > max_size:
            raise ValueError("File size exceeds 10MB limit")

        # Reset file pointer so the storage service reads from the start
        await file.seek(0)

        try:
            # Upload to storage
            storage_path, content_type, file_size, metadata = (
                await self.storage_service.upload_file(file, str(user.team_id))
            )

            # NOTE(review): if a later step in this block fails, the file
            # already written to storage is not removed here - confirm
            # whether cleanup is handled elsewhere.

            # Create image record
            image = ImageModel(
                filename=file.filename,
                original_filename=file.filename,
                file_size=file_size,
                content_type=content_type,
                storage_path=storage_path,
                public_url=None,  # Will be set after we have the image ID
                team_id=user.team_id,
                uploader_id=user.id,
                description=description,
                metadata=metadata,
                collection_id=collection_oid
            )

            # Save to database
            created_image = await image_repository.create(image)

            # Generate and persist the API download URL now that the ID exists
            api_download_url = self._generate_api_download_url(
                request, str(created_image.id)
            )
            await image_repository.update(
                created_image.id, {"public_url": api_download_url}
            )
            created_image.public_url = api_download_url

            # Queue asynchronous processing; failure here is non-fatal
            await self._publish_processing_task(
                image_id=str(created_image.id),
                storage_path=storage_path,
                team_id=str(user.team_id)
            )

            # Convert to response
            return self._convert_to_response(created_image, request)
        except Exception as e:
            # Log with traceback and keep the original error as the cause.
            logger.exception("Error uploading image: %s", e)
            raise RuntimeError("Failed to upload image") from e

    async def list_images(
        self,
        user: UserModel,
        request: Request,
        skip: int = 0,
        limit: int = 50,
        collection_id: Optional[str] = None
    ) -> ImageListResponse:
        """
        List images for the user's team or all images if user is admin

        Args:
            user: The requesting user
            request: The FastAPI request object for URL generation
            skip: Number of records to skip for pagination
            limit: Maximum number of records to return
            collection_id: Optional filter by collection ID

        Returns:
            ImageListResponse: List of images with pagination metadata

        Raises:
            ValueError: If collection_id is provided but invalid
        """
        # Parse the optional filter once; it is shared by the page query
        # and the count query.
        collection_oid = (
            self._parse_object_id(collection_id, "collection")
            if collection_id else None
        )

        if user.is_admin:
            # Admins see images across all teams
            images = await image_repository.get_all_with_pagination(
                skip=skip,
                limit=limit,
                collection_id=collection_oid,
            )
            total = await image_repository.count_all(
                collection_id=collection_oid,
            )
        else:
            # Regular users only see images from their team
            images = await image_repository.get_by_team(
                user.team_id,
                skip=skip,
                limit=limit,
                collection_id=collection_oid,
            )
            total = await image_repository.count_by_team(
                user.team_id,
                collection_id=collection_oid,
            )

        # Convert to response
        response_images = [
            self._convert_to_response(image, request) for image in images
        ]
        return ImageListResponse(
            images=response_images, total=total, skip=skip, limit=limit
        )

    async def get_image(
        self, image_id: str, user: UserModel, request: Request
    ) -> ImageResponse:
        """
        Get image metadata by ID

        Args:
            image_id: The image ID to retrieve
            user: The requesting user
            request: The FastAPI request object for URL generation

        Returns:
            ImageResponse: The image metadata, including last_accessed

        Raises:
            ValueError: If image_id is invalid
            RuntimeError: If image not found
            PermissionError: If user not authorized to access the image
        """
        _, image = await self._get_authorized_image(image_id, user, "access")
        return self._convert_to_response(image, request, include_last_accessed=True)

    async def download_image(
        self, image_id: str, user: UserModel
    ) -> Tuple[bytes, str, str]:
        """
        Download image file

        Args:
            image_id: The image ID to download
            user: The requesting user

        Returns:
            Tuple[bytes, str, str]: File content, content type, and filename

        Raises:
            ValueError: If image_id is invalid
            RuntimeError: If image not found or file not found in storage
            PermissionError: If user not authorized to access the image
        """
        obj_id, image = await self._get_authorized_image(image_id, user, "access")

        # Get file from storage
        file_content = self.storage_service.get_file(image.storage_path)
        if not file_content:
            raise RuntimeError("Image file not found in storage")

        # Record the access only after the file was actually retrieved
        await image_repository.update_last_accessed(obj_id)

        return file_content, image.content_type, image.original_filename

    async def update_image(
        self,
        image_id: str,
        image_data: ImageUpdate,
        user: UserModel,
        request: Request
    ) -> ImageResponse:
        """
        Update image metadata

        Args:
            image_id: The image ID to update
            image_data: The update data
            user: The requesting user
            request: The FastAPI request object for URL generation

        Returns:
            ImageResponse: The updated image metadata

        Raises:
            ValueError: If image_id is invalid
            RuntimeError: If image not found or update fails
            PermissionError: If user not authorized to update the image
        """
        obj_id, image = await self._get_authorized_image(image_id, user, "update")

        # Only fields explicitly set by the caller are written
        update_data = image_data.dict(exclude_unset=True)
        if not update_data:
            # No fields to update
            return self._convert_to_response(image, request)

        updated_image = await image_repository.update(obj_id, update_data)
        if not updated_image:
            raise RuntimeError("Failed to update image")

        return self._convert_to_response(updated_image, request)

    async def delete_image(self, image_id: str, user: UserModel) -> bool:
        """
        Delete an image

        Removal from storage and from the vector database is best effort:
        failures there are logged as warnings and the database record is
        still deleted.

        Args:
            image_id: The image ID to delete
            user: The requesting user

        Returns:
            bool: True if successfully deleted

        Raises:
            ValueError: If image_id is invalid
            RuntimeError: If image not found or deletion fails
            PermissionError: If user not authorized to delete the image
        """
        obj_id, image = await self._get_authorized_image(image_id, user, "delete")

        # Delete from storage
        try:
            self.storage_service.delete_file(image.storage_path)
        except Exception as e:
            logger.warning("Failed to delete file from storage: %s", e)

        # Delete from vector database if it has embeddings
        if image.has_embedding and image.embedding_id:
            try:
                await self.embedding_service.delete_embedding(image.embedding_id)
            except Exception as e:
                logger.warning("Failed to delete embedding: %s", e)

        # Delete from database
        result = await image_repository.delete(obj_id)
        if not result:
            raise RuntimeError("Failed to delete image")

        return True