image_management_api/src/services/image_service.py

360 lines
13 KiB
Python

import logging
import os
from typing import Optional, List, Tuple
from datetime import datetime
from fastapi import UploadFile, Request
from bson import ObjectId
from src.models.image import ImageModel
from src.models.user import UserModel
from src.schemas.image import ImageResponse, ImageListResponse
from src.db.repositories.image_repository import image_repository
from src.services.storage import StorageService
from src.services.pubsub_service import pubsub_service
from src.utils.authorization import require_team_access, get_team_filter, AuthorizationError
logger = logging.getLogger(__name__)
class ImageService:
"""Service class for handling image-related business logic"""
def __init__(self):
self.storage_service = StorageService()
def _generate_api_download_url(self, request: Request, image_id: str) -> str:
"""Generate API download URL for an image"""
return f"{request.url.scheme}://{request.url.netloc}/api/v1/images/{image_id}/download"
async def upload_image(
self,
file: UploadFile,
user: UserModel,
request: Request,
description: Optional[str] = None,
collection_id: Optional[str] = None
) -> ImageResponse:
"""
Upload and process an image
Args:
file: The uploaded file
user: The user uploading the image
request: The FastAPI request object for URL generation
description: Optional description for the image
collection_id: Optional collection ID to associate with the image
Returns:
ImageResponse: The uploaded image metadata
Raises:
ValueError: If file is invalid
RuntimeError: If upload or processing fails
"""
# Validate file
if not file.filename:
raise ValueError("No filename provided")
if not file.content_type or not file.content_type.startswith('image/'):
raise ValueError("File must be an image")
# Read file content
file_content = await file.read()
if not file_content:
raise ValueError("Empty file")
# Generate storage path
file_extension = os.path.splitext(file.filename)[1]
storage_filename = f"{ObjectId()}{file_extension}"
storage_path = f"images/{user.team_id}/{storage_filename}"
# Store file
try:
self.storage_service.store_file(storage_path, file_content, file.content_type)
except Exception as e:
logger.error(f"Failed to store file: {e}")
raise RuntimeError("Failed to store image file")
# Create image record
image_data = {
"filename": storage_filename,
"original_filename": file.filename,
"file_size": len(file_content),
"content_type": file.content_type,
"storage_path": storage_path,
"team_id": user.team_id,
"uploader_id": user.id,
"upload_date": datetime.utcnow(),
"description": description,
"metadata": {},
"has_embedding": False,
"collection_id": ObjectId(collection_id) if collection_id else None
}
try:
# Create ImageModel instance first
image_model = ImageModel(**image_data)
image = await image_repository.create(image_model)
except Exception as e:
# Clean up stored file if database creation fails
try:
self.storage_service.delete_file(storage_path)
except:
pass
logger.error(f"Failed to create image record: {e}")
raise RuntimeError("Failed to create image record")
# Publish to pub/sub for asynchronous processing
try:
await pubsub_service.publish_image_processing_task(
image_id=str(image.id),
storage_path=storage_path,
team_id=str(user.team_id)
)
except Exception as e:
logger.warning(f"Failed to publish embedding generation task: {e}")
return self._convert_to_response(image, request)
async def list_images(
self,
user: UserModel,
request: Request,
skip: int = 0,
limit: int = 50,
collection_id: Optional[str] = None
) -> ImageListResponse:
"""
List images with team-based filtering
Args:
user: The requesting user
request: The FastAPI request object for URL generation
skip: Number of records to skip
limit: Maximum number of records to return
collection_id: Optional collection filter
Returns:
ImageListResponse: List of images with metadata
"""
# Apply team filtering based on user permissions
team_filter = get_team_filter(user)
# Convert collection_id to ObjectId if provided
collection_obj_id = ObjectId(collection_id) if collection_id else None
# Get images based on user permissions
if team_filter:
# Regular user - filter by team
team_obj_id = ObjectId(team_filter)
images = await image_repository.get_by_team(team_obj_id, skip, limit, collection_obj_id)
total = await image_repository.count_by_team(team_obj_id, collection_obj_id)
else:
# Admin user - can see all images
images = await image_repository.get_all_with_pagination(skip, limit, collection_obj_id)
total = await image_repository.count_all(collection_obj_id)
# Convert to responses
image_responses = [
self._convert_to_response(image, request)
for image in images
]
return ImageListResponse(
images=image_responses,
total=total,
skip=skip,
limit=limit
)
async def get_image(self, image_id: str, user: UserModel, request: Request) -> ImageResponse:
"""
Get image metadata by ID with authorization check
Args:
image_id: The image ID to retrieve
user: The requesting user
request: The FastAPI request object for URL generation
Returns:
ImageResponse: The image metadata
Raises:
ValueError: If image_id is invalid
RuntimeError: If image not found
AuthorizationError: If user not authorized to access the image
"""
try:
obj_id = ObjectId(image_id)
except Exception:
raise ValueError("Invalid image ID")
# Get image
image = await image_repository.get_by_id(obj_id)
if not image:
raise RuntimeError("Image not found")
# Centralized team access check
require_team_access(user, str(image.team_id), "image", "access")
return self._convert_to_response(image, request, include_last_accessed=True)
async def download_image(self, image_id: str, user: UserModel) -> Tuple[bytes, str, str]:
"""
Download image file with authorization check
Args:
image_id: The image ID to download
user: The requesting user
Returns:
Tuple[bytes, str, str]: File content, content type, and filename
Raises:
ValueError: If image_id is invalid
RuntimeError: If image not found or file not found in storage
AuthorizationError: If user not authorized to access the image
"""
try:
obj_id = ObjectId(image_id)
except Exception:
raise ValueError("Invalid image ID")
# Get image
image = await image_repository.get_by_id(obj_id)
if not image:
raise RuntimeError("Image not found")
# Centralized team access check
require_team_access(user, str(image.team_id), "image", "download")
# Get file from storage
file_content = self.storage_service.get_file(image.storage_path)
if not file_content:
raise RuntimeError("Image file not found in storage")
# Update last accessed
await image_repository.update_last_accessed(obj_id)
return file_content, image.content_type, image.original_filename
async def update_image(
self,
image_id: str,
image_data,
user: UserModel,
request: Request
) -> ImageResponse:
"""
Update image metadata with authorization check
Args:
image_id: The image ID to update
image_data: The update data
user: The requesting user
request: The FastAPI request object for URL generation
Returns:
ImageResponse: The updated image metadata
Raises:
ValueError: If image_id is invalid
RuntimeError: If image not found or update fails
AuthorizationError: If user not authorized to update the image
"""
try:
obj_id = ObjectId(image_id)
except Exception:
raise ValueError("Invalid image ID")
# Get image
image = await image_repository.get_by_id(obj_id)
if not image:
raise RuntimeError("Image not found")
# Centralized team access check
require_team_access(user, str(image.team_id), "image", "update")
# Update image
update_data = image_data.dict(exclude_unset=True)
if not update_data:
# No fields to update
return self._convert_to_response(image, request)
updated_image = await image_repository.update(obj_id, update_data)
if not updated_image:
raise RuntimeError("Failed to update image")
return self._convert_to_response(updated_image, request)
async def delete_image(self, image_id: str, user: UserModel) -> bool:
"""
Delete an image with authorization check
Args:
image_id: The image ID to delete
user: The requesting user
Returns:
bool: True if successfully deleted
Raises:
ValueError: If image_id is invalid
RuntimeError: If image not found or deletion fails
AuthorizationError: If user not authorized to delete the image
"""
try:
obj_id = ObjectId(image_id)
except Exception:
raise ValueError("Invalid image ID")
# Get image
image = await image_repository.get_by_id(obj_id)
if not image:
raise RuntimeError("Image not found")
# Centralized team access check
require_team_access(user, str(image.team_id), "image", "delete")
# Delete from storage
try:
self.storage_service.delete_file(image.storage_path)
except Exception as e:
logger.warning(f"Failed to delete file from storage: {e}")
# Delete from database
result = await image_repository.delete(obj_id)
if not result:
raise RuntimeError("Failed to delete image")
return True
def _convert_to_response(
self,
image: ImageModel,
request: Request,
include_last_accessed: bool = False
) -> ImageResponse:
"""Convert ImageModel to ImageResponse"""
api_download_url = self._generate_api_download_url(request, str(image.id))
response_data = {
"id": str(image.id),
"filename": image.filename,
"original_filename": image.original_filename,
"file_size": image.file_size,
"content_type": image.content_type,
"storage_path": image.storage_path,
"public_url": api_download_url,
"team_id": str(image.team_id),
"uploader_id": str(image.uploader_id),
"upload_date": image.upload_date,
"description": image.description,
"metadata": image.metadata,
"has_embedding": image.has_embedding,
"collection_id": str(image.collection_id) if image.collection_id else None
}
if include_last_accessed:
response_data["last_accessed"] = image.last_accessed
return ImageResponse(**response_data)