Image Processing Pipeline

Resize and compress images on upload, generating thumbnails and optimized variants. This recipe shows how to integrate Pillow with litestar-storages for a complete image processing workflow.

Prerequisites

  • Python 3.9+

  • litestar-storages installed (pip install litestar-storages)

  • Pillow for image processing (pip install Pillow)

  • For Litestar examples: pip install litestar-storages[litestar]

The Problem

Raw image uploads from users are often:

  • Too large for efficient delivery (multi-megabyte photos)

  • Wrong dimensions for your UI (need thumbnails)

  • Unoptimized format (BMP when WebP would be smaller)

  • Missing responsive variants (mobile vs desktop)

Processing images on upload creates optimized variants once, reducing bandwidth and improving user experience.

Solution

Image Processor Class

Create a reusable image processor that generates multiple variants:

"""Image processing utilities for upload pipelines."""

from __future__ import annotations

import io
from dataclasses import dataclass, field
from enum import Enum
from typing import NamedTuple

from PIL import Image


class ImageFormat(str, Enum):
    """Supported output image formats."""

    JPEG = "JPEG"
    PNG = "PNG"
    WEBP = "WEBP"
    GIF = "GIF"


class ImageVariant(NamedTuple):
    """Represents a processed image variant."""

    name: str           # e.g., "thumbnail", "medium", "original"
    data: bytes         # Processed image bytes
    width: int          # Actual width after processing
    height: int         # Actual height after processing
    format: ImageFormat # Output format
    content_type: str   # MIME type


@dataclass
class VariantConfig:
    """Configuration for an image variant.

    Attributes:
        name: Variant identifier (used in storage key)
        max_width: Maximum width in pixels (None for no limit)
        max_height: Maximum height in pixels (None for no limit)
        quality: JPEG/WebP quality (1-100)
        format: Output format (None to preserve original)
    """

    name: str
    max_width: int | None = None
    max_height: int | None = None
    quality: int = 85
    format: ImageFormat | None = None


@dataclass
class ImageProcessor:
    """Process images into multiple variants.

    Attributes:
        variants: List of variant configurations to generate
        preserve_original: Whether to keep the unmodified original
        strip_metadata: Remove EXIF and other metadata for privacy
        background_color: Color for transparency replacement (JPEG)
    """

    variants: list[VariantConfig] = field(default_factory=lambda: [
        VariantConfig(name="thumbnail", max_width=150, max_height=150, quality=80),
        VariantConfig(name="medium", max_width=800, max_height=800, quality=85),
        VariantConfig(name="large", max_width=1920, max_height=1920, quality=90),
    ])
    preserve_original: bool = True
    strip_metadata: bool = True
    background_color: tuple[int, int, int] = (255, 255, 255)  # White

    def _get_content_type(self, fmt: ImageFormat) -> str:
        """Get MIME type for format."""
        return {
            ImageFormat.JPEG: "image/jpeg",
            ImageFormat.PNG: "image/png",
            ImageFormat.WEBP: "image/webp",
            ImageFormat.GIF: "image/gif",
        }[fmt]

    def _detect_format(self, img: Image.Image) -> ImageFormat:
        """Detect the original image format."""
        fmt = img.format or "JPEG"
        try:
            return ImageFormat(fmt.upper())
        except ValueError:
            return ImageFormat.JPEG

    def _prepare_for_save(
        self,
        img: Image.Image,
        target_format: ImageFormat,
    ) -> Image.Image:
        """Prepare image for saving in target format.

        Handles mode conversion (e.g., RGBA to RGB for JPEG).
        """
        if target_format == ImageFormat.JPEG:
            # JPEG doesn't support transparency
            if img.mode in ("RGBA", "LA", "P"):
                # Create white background
                background = Image.new("RGB", img.size, self.background_color)
                if img.mode == "P":
                    img = img.convert("RGBA")
                background.paste(img, mask=img.split()[-1])  # Use alpha as mask
                return background
            elif img.mode != "RGB":
                return img.convert("RGB")

        elif target_format == ImageFormat.PNG:
            if img.mode not in ("RGB", "RGBA", "L", "LA", "P"):
                return img.convert("RGBA")

        elif target_format == ImageFormat.WEBP:
            if img.mode not in ("RGB", "RGBA"):
                return img.convert("RGBA")

        return img

    def _resize_image(
        self,
        img: Image.Image,
        max_width: int | None,
        max_height: int | None,
    ) -> Image.Image:
        """Resize image maintaining aspect ratio.

        Uses high-quality Lanczos resampling.
        """
        if max_width is None and max_height is None:
            return img

        original_width, original_height = img.size

        # Calculate new dimensions maintaining aspect ratio
        if max_width and max_height:
            # Fit within both constraints
            ratio = min(
                max_width / original_width,
                max_height / original_height,
            )
        elif max_width:
            ratio = max_width / original_width
        else:
            ratio = max_height / original_height  # type: ignore

        # Don't upscale
        if ratio >= 1:
            return img

        new_width = int(original_width * ratio)
        new_height = int(original_height * ratio)

        return img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    def _save_image(
        self,
        img: Image.Image,
        fmt: ImageFormat,
        quality: int,
    ) -> bytes:
        """Save image to bytes with specified format and quality."""
        buffer = io.BytesIO()

        # Prepare image for target format
        img = self._prepare_for_save(img, fmt)

        # Build save kwargs
        save_kwargs: dict = {"format": fmt.value}

        if fmt in (ImageFormat.JPEG, ImageFormat.WEBP):
            save_kwargs["quality"] = quality
            save_kwargs["optimize"] = True

        if fmt == ImageFormat.JPEG:
            save_kwargs["progressive"] = True

        if fmt == ImageFormat.PNG:
            save_kwargs["optimize"] = True

        if fmt == ImageFormat.WEBP:
            save_kwargs["method"] = 6  # Best compression

        img.save(buffer, **save_kwargs)
        return buffer.getvalue()

    def process(self, data: bytes) -> list[ImageVariant]:
        """Process image into configured variants.

        Args:
            data: Raw image bytes

        Returns:
            List of processed image variants

        Raises:
            ValueError: If image cannot be processed
        """
        try:
            img = Image.open(io.BytesIO(data))
        except Exception as e:
            raise ValueError(f"Invalid image data: {e}") from e

        # Detect original format
        original_format = self._detect_format(img)

        # Strip metadata if requested
        if self.strip_metadata:
            # Create a clean copy without metadata
            clean_img = Image.new(img.mode, img.size)
            clean_img.putdata(list(img.getdata()))
            if img.mode == "P" and img.getpalette():
                # Preserve the palette so paletted images keep their colors
                clean_img.putpalette(img.getpalette())
            img = clean_img

        variants: list[ImageVariant] = []

        # Optionally preserve original
        if self.preserve_original:
            variants.append(ImageVariant(
                name="original",
                data=data,  # Keep original bytes
                width=img.size[0],
                height=img.size[1],
                format=original_format,
                content_type=self._get_content_type(original_format),
            ))

        # Generate configured variants
        for config in self.variants:
            # Resize
            resized = self._resize_image(img, config.max_width, config.max_height)

            # Determine output format
            output_format = config.format or original_format

            # Save to bytes
            processed_data = self._save_image(resized, output_format, config.quality)

            variants.append(ImageVariant(
                name=config.name,
                data=processed_data,
                width=resized.size[0],
                height=resized.size[1],
                format=output_format,
                content_type=self._get_content_type(output_format),
            ))

        return variants
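
The processor can be exercised on its own before wiring up storage. A minimal sketch, assuming photo.jpg is a local file (the filename is illustrative):

from pathlib import Path

processor = ImageProcessor()
variants = processor.process(Path("photo.jpg").read_bytes())

for variant in variants:
    print(
        f"{variant.name}: {variant.width}x{variant.height}, "
        f"{len(variant.data)} bytes as {variant.format.value}"
    )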

Framework-Agnostic Usage

Process and store image variants with any storage backend:

"""Framework-agnostic image processing pipeline."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from pathlib import Path
import uuid

from litestar_storages import (
    FileSystemStorage,
    FileSystemConfig,
    StoredFile,
)

# Assume ImageProcessor, VariantConfig, and ImageFormat are imported from the module above


@dataclass
class ProcessedImage:
    """Result of processing and storing an image."""

    id: str  # Unique identifier for all variants
    variants: dict[str, StoredFile]  # variant_name -> StoredFile


async def process_and_store(
    storage: FileSystemStorage,
    processor: ImageProcessor,
    data: bytes,
    base_key: str | None = None,
) -> ProcessedImage:
    """Process an image and store all variants.

    Args:
        storage: Storage backend
        processor: Image processor with variant configs
        data: Raw image bytes
        base_key: Optional base key (defaults to UUID)

    Returns:
        ProcessedImage with all stored variants
    """
    # Generate unique ID
    image_id = base_key or str(uuid.uuid4())

    # Process image
    variants = processor.process(data)

    # Store each variant
    stored: dict[str, StoredFile] = {}

    for variant in variants:
        # Generate key: images/{id}/{variant_name}.{ext}
        ext = {
            "JPEG": "jpg",
            "PNG": "png",
            "WEBP": "webp",
            "GIF": "gif",
        }[variant.format.value]

        key = f"images/{image_id}/{variant.name}.{ext}"

        stored[variant.name] = await storage.put(
            key=key,
            data=variant.data,
            content_type=variant.content_type,
            metadata={
                "width": str(variant.width),
                "height": str(variant.height),
                "variant": variant.name,
            },
        )

    return ProcessedImage(id=image_id, variants=stored)


async def get_variant_url(
    storage: FileSystemStorage,
    image_id: str,
    variant: str = "medium",
) -> str | None:
    """Get URL for a specific image variant.

    Args:
        storage: Storage backend
        image_id: Image identifier
        variant: Variant name (thumbnail, medium, large, original)

    Returns:
        URL if variant exists, None otherwise
    """
    # Try common extensions
    for ext in ["jpg", "png", "webp", "gif"]:
        key = f"images/{image_id}/{variant}.{ext}"
        if await storage.exists(key):
            return await storage.url(key)

    return None


async def main() -> None:
    """Example usage."""
    # Configure storage
    storage = FileSystemStorage(
        config=FileSystemConfig(
            path=Path("./uploads"),
            base_url="https://cdn.example.com",
            create_dirs=True,
        )
    )

    # Configure processor with WebP output for smaller files
    processor = ImageProcessor(
        variants=[
            VariantConfig(
                name="thumbnail",
                max_width=150,
                max_height=150,
                quality=80,
                format=ImageFormat.WEBP,
            ),
            VariantConfig(
                name="preview",
                max_width=400,
                max_height=400,
                quality=85,
                format=ImageFormat.WEBP,
            ),
            VariantConfig(
                name="full",
                max_width=1920,
                max_height=1080,
                quality=90,
            ),  # Preserve original format
        ],
        preserve_original=True,
        strip_metadata=True,
    )

    # Load a test image (in real usage, from upload)
    test_image_path = Path("test_image.jpg")
    if test_image_path.exists():
        image_data = test_image_path.read_bytes()
    else:
        # Create a simple test image with Pillow
        from PIL import Image
        import io

        img = Image.new("RGB", (800, 600), color="blue")
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG")
        image_data = buffer.getvalue()

    # Process and store
    result = await process_and_store(storage, processor, image_data)

    print(f"Image ID: {result.id}")
    print("Variants:")
    for name, stored in result.variants.items():
        url = await storage.url(stored.key)
        print(f"  {name}: {stored.size} bytes - {url}")


if __name__ == "__main__":
    asyncio.run(main())
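
Because each variant records its width, the stored files map naturally onto responsive delivery. A minimal sketch that builds an HTML srcset string from the helpers above (the widths mirror the VariantConfig values in main(); the stored variants may be narrower because the processor never upscales):

async def build_srcset(storage: FileSystemStorage, image_id: str) -> str:
    """Build an HTML srcset string from stored variants (sketch)."""
    # Nominal widths matching the VariantConfig max_width values above
    widths = {"thumbnail": 150, "preview": 400, "full": 1920}
    entries: list[str] = []
    for name, width in widths.items():
        url = await get_variant_url(storage, image_id, variant=name)
        if url:
            entries.append(f"{url} {width}w")
    return ", ".join(entries)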

With Litestar

Build a complete image upload API with Litestar:

"""Litestar application with image processing pipeline."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Annotated
import uuid

from litestar import Litestar, Response, get, post
from litestar.datastructures import UploadFile
from litestar.di import Provide
from litestar.enums import RequestEncodingType
from litestar.exceptions import ClientException, NotFoundException
from litestar.params import Body
from litestar.status_codes import HTTP_400_BAD_REQUEST

from litestar_storages import (
    S3Storage,
    S3Config,
    Storage,
    StorageFileNotFoundError,
)
from litestar_storages.contrib.plugin import StoragePlugin

# Assume ImageProcessor, VariantConfig, ImageFormat are imported


# Response DTOs
@dataclass
class ImageVariantResponse:
    """Single variant info."""

    name: str
    url: str
    width: int
    height: int
    size: int


@dataclass
class UploadImageResponse:
    """Response for image upload."""

    id: str
    variants: list[ImageVariantResponse]


@dataclass
class ImageInfoResponse:
    """Response for image info lookup."""

    id: str
    variants: list[ImageVariantResponse]


# Dependency providers
def provide_processor() -> ImageProcessor:
    """Provide configured image processor."""
    return ImageProcessor(
        variants=[
            VariantConfig(
                name="thumb",
                max_width=150,
                max_height=150,
                quality=80,
                format=ImageFormat.WEBP,
            ),
            VariantConfig(
                name="medium",
                max_width=600,
                max_height=600,
                quality=85,
                format=ImageFormat.WEBP,
            ),
            VariantConfig(
                name="large",
                max_width=1200,
                max_height=1200,
                quality=90,
            ),
        ],
        preserve_original=True,
        strip_metadata=True,
    )


# Validation
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
MAX_SIZE = 20 * 1024 * 1024  # 20MB


def validate_image_upload(content_type: str | None, size: int) -> None:
    """Validate image upload before processing."""
    if content_type and content_type not in ALLOWED_TYPES:
        raise ClientException(
            detail=f"Invalid image type. Allowed: {', '.join(ALLOWED_TYPES)}",
            status_code=HTTP_400_BAD_REQUEST,
        )

    if size > MAX_SIZE:
        raise ClientException(
            detail=f"Image too large. Maximum: {MAX_SIZE // (1024 * 1024)}MB",
            status_code=HTTP_400_BAD_REQUEST,
        )


# Route handlers
@post("/images")
async def upload_image(
    data: UploadFile,
    storage: Storage,
    processor: Annotated[ImageProcessor, Provide(provide_processor)],
) -> UploadImageResponse:
    """Upload and process an image.

    Creates multiple optimized variants:
    - thumb: 150x150 WebP thumbnail
    - medium: 600x600 WebP preview
    - large: 1200x1200 original format
    - original: Unmodified upload
    """
    content = await data.read()

    # Validate
    validate_image_upload(data.content_type, len(content))

    # Process
    try:
        variants = processor.process(content)
    except ValueError as e:
        raise ClientException(
            detail=str(e),
            status_code=HTTP_400_BAD_REQUEST,
        ) from e

    # Generate unique ID
    image_id = str(uuid.uuid4())

    # Store variants and collect responses
    variant_responses: list[ImageVariantResponse] = []

    for variant in variants:
        ext = variant.format.value.lower()
        if ext == "jpeg":
            ext = "jpg"

        key = f"images/{image_id}/{variant.name}.{ext}"

        stored = await storage.put(
            key=key,
            data=variant.data,
            content_type=variant.content_type,
            metadata={
                "width": str(variant.width),
                "height": str(variant.height),
            },
        )

        url = await storage.url(key)

        variant_responses.append(ImageVariantResponse(
            name=variant.name,
            url=url,
            width=variant.width,
            height=variant.height,
            size=stored.size,
        ))

    return UploadImageResponse(id=image_id, variants=variant_responses)


@get("/images/{image_id:str}")
async def get_image_info(
    image_id: str,
    storage: Storage,
) -> ImageInfoResponse:
    """Get info about an uploaded image and its variants."""
    variants: list[ImageVariantResponse] = []

    # Check for known variant names
    variant_names = ["thumb", "medium", "large", "original"]
    extensions = ["webp", "jpg", "png", "gif"]

    found_any = False

    for name in variant_names:
        for ext in extensions:
            key = f"images/{image_id}/{name}.{ext}"
            try:
                info = await storage.info(key)
                url = await storage.url(key)

                # Parse dimensions from metadata
                width = int(info.metadata.get("width", 0))
                height = int(info.metadata.get("height", 0))

                variants.append(ImageVariantResponse(
                    name=name,
                    url=url,
                    width=width,
                    height=height,
                    size=info.size,
                ))
                found_any = True
                break  # Found this variant, move to next
            except StorageFileNotFoundError:
                continue

    if not found_any:
        raise NotFoundException(detail=f"Image not found: {image_id}")

    return ImageInfoResponse(id=image_id, variants=variants)


@get("/images/{image_id:str}/{variant:str}")
async def get_image_variant(
    image_id: str,
    variant: str,
    storage: Storage,
) -> Response:
    """Redirect to a specific image variant.

    Args:
        image_id: Image identifier
        variant: Variant name (thumb, medium, large, original)
    """
    extensions = ["webp", "jpg", "png", "gif"]

    for ext in extensions:
        key = f"images/{image_id}/{variant}.{ext}"
        if await storage.exists(key):
            url = await storage.url(key)
            return Response(
                content=None,
                status_code=302,
                headers={"Location": url},
            )

    raise NotFoundException(detail=f"Variant not found: {variant}")


# Application setup
def create_app() -> Litestar:
    """Create Litestar application with storage plugin."""
    # Configure storage (use environment variables in production)
    import os

    if os.getenv("USE_S3"):
        storage: Storage = S3Storage(
            config=S3Config(
                bucket=os.getenv("S3_BUCKET", "images"),
                region=os.getenv("AWS_REGION", "us-east-1"),
            )
        )
    else:
        from litestar_storages import FileSystemStorage, FileSystemConfig
        storage = FileSystemStorage(
            config=FileSystemConfig(
                path=Path("./uploads"),
                base_url="/files",
                create_dirs=True,
            )
        )

    return Litestar(
        route_handlers=[upload_image, get_image_info, get_image_variant],
        plugins=[StoragePlugin(default=storage)],
    )


app = create_app()
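
A quick local smoke test for the upload endpoint, sketched with Litestar's TestClient and a Pillow-generated image (run it separately from the application module):

import io

from PIL import Image
from litestar.testing import TestClient


def smoke_test() -> None:
    """Upload a generated JPEG and print the resulting variant URLs."""
    buffer = io.BytesIO()
    Image.new("RGB", (640, 480), color="red").save(buffer, format="JPEG")

    with TestClient(app=app) as client:
        response = client.post(
            "/images",
            # Multipart field name assumed to match the handler's data parameter
            files={"data": ("test.jpg", buffer.getvalue(), "image/jpeg")},
        )
        response.raise_for_status()
        for variant in response.json()["variants"]:
            print(variant["name"], variant["url"])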

Async Processing with Background Tasks

For high-volume applications, process images in the background:

"""Background image processing with task queue."""

from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Any
import uuid

from litestar import Litestar, post
from litestar.datastructures import UploadFile
from litestar.enums import RequestEncodingType
from litestar.params import Body

from litestar_storages import (
    MemoryStorage,
    MemoryConfig,
    Storage,
    StoredFile,
)
from litestar_storages.contrib.plugin import StoragePlugin

# Assume ImageProcessor is imported


@dataclass
class ProcessingJob:
    """Represents an image processing job."""

    id: str
    raw_key: str
    status: str  # pending, processing, complete, failed
    error: str | None = None


# Simple in-memory job queue (use Redis/RabbitMQ in production)
_jobs: dict[str, ProcessingJob] = {}
_queue: asyncio.Queue[str] = asyncio.Queue()


async def process_image_job(
    storage: Storage,
    processor: ImageProcessor,
    job_id: str,
) -> None:
    """Process a single image job."""
    job = _jobs.get(job_id)
    if not job:
        return

    job.status = "processing"

    try:
        # Get raw image
        data = await storage.get_bytes(job.raw_key)

        # Process variants
        variants = processor.process(data)

        # Store variants
        for variant in variants:
            ext = variant.format.value.lower()
            if ext == "jpeg":
                ext = "jpg"

            key = f"processed/{job_id}/{variant.name}.{ext}"
            await storage.put(
                key=key,
                data=variant.data,
                content_type=variant.content_type,
            )

        # Clean up raw upload
        await storage.delete(job.raw_key)

        job.status = "complete"

    except Exception as e:
        job.status = "failed"
        job.error = str(e)


async def worker(storage: Storage, processor: ImageProcessor) -> None:
    """Background worker that processes image jobs."""
    while True:
        job_id = await _queue.get()
        try:
            await process_image_job(storage, processor, job_id)
        except Exception:
            pass  # Error already recorded in job
        finally:
            _queue.task_done()


@asynccontextmanager
async def lifespan(app: Litestar) -> AsyncIterator[None]:
    """Manage background worker lifecycle."""
    storage = app.state["storage"]
    processor = ImageProcessor()

    # Start worker
    task = asyncio.create_task(worker(storage, processor))

    yield

    # Shutdown
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


@post("/images/async")
async def upload_image_async(
    data: Annotated[UploadFile, Body(media_type=RequestEncodingType.MULTI_PART)],
    storage: Storage,
) -> dict[str, Any]:
    """Upload image for background processing.

    Returns immediately with job ID for status polling.
    """
    content = await data.read()
    job_id = str(uuid.uuid4())

    # Store raw upload
    raw_key = f"raw/{job_id}"
    await storage.put(key=raw_key, data=content, content_type=data.content_type)

    # Create job
    job = ProcessingJob(id=job_id, raw_key=raw_key, status="pending")
    _jobs[job_id] = job

    # Queue for processing
    await _queue.put(job_id)

    return {
        "job_id": job_id,
        "status": "pending",
        "poll_url": f"/images/jobs/{job_id}",
    }
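
The poll_url above refers to a status endpoint that the snippet does not define. A minimal sketch of the missing wiring, reusing the in-memory _jobs dict and lifespan manager from this example (a production setup would persist jobs and run workers out of process):

from litestar import get
from litestar.datastructures import State


@get("/images/jobs/{job_id:str}")
async def get_job_status(job_id: str) -> dict[str, Any]:
    """Poll the status of a background processing job."""
    job = _jobs.get(job_id)
    if job is None:
        return {"job_id": job_id, "status": "unknown"}
    return {"job_id": job.id, "status": job.status, "error": job.error}


# Wiring: the lifespan worker reads the storage backend from app.state
storage = MemoryStorage(config=MemoryConfig())

app = Litestar(
    route_handlers=[upload_image_async, get_job_status],
    plugins=[StoragePlugin(default=storage)],
    state=State({"storage": storage}),
    lifespan=[lifespan],
)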

Key Points

  • Process once, serve forever: Generate all needed variants at upload time

  • Use appropriate formats: WebP for web delivery, JPEG for photos, PNG for graphics

  • Strip metadata: Remove EXIF data for privacy (unless explicitly needed)

  • Set quality levels: Balance file size vs visual quality (80-90 is usually fine)

  • Handle failures gracefully: Keep originals until processing succeeds (see the sketch after this list)

  • Consider background processing: For high-volume applications, use task queues
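
A sketch of the failure-handling point: write the raw upload first and delete it only after every variant has been stored (the key layout is illustrative):

async def safe_process(
    storage: Storage,
    processor: ImageProcessor,
    data: bytes,
    image_id: str,
) -> None:
    """Keep the raw upload until all variants are safely stored (sketch)."""
    raw_key = f"raw/{image_id}"
    await storage.put(key=raw_key, data=data, content_type="application/octet-stream")

    try:
        for variant in processor.process(data):
            ext = "jpg" if variant.format.value == "JPEG" else variant.format.value.lower()
            key = f"images/{image_id}/{variant.name}.{ext}"
            await storage.put(key=key, data=variant.data, content_type=variant.content_type)
    except Exception:
        # Processing or storage failed: the raw upload remains available for a retry
        raise
    else:
        # Every variant was written; the raw copy is no longer needed
        await storage.delete(raw_key)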

Performance Tips

  1. Use WebP: 25-35% smaller than JPEG at equivalent quality

  2. Progressive JPEG: Better perceived loading for larger images

  3. Lazy loading: Process variants on-demand for rarely accessed images (see the sketch below)

  4. CDN integration: Cache processed images at the edge

  5. Memory limits: Large images are expensive to decode; enforce upload size limits and offload heavy processing to separate workers
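
A sketch of the lazy-loading tip: generate a variant the first time it is requested and cache it in storage afterwards (the original key and extension handling are illustrative):

async def get_or_create_variant(
    storage: Storage,
    image_id: str,
    config: VariantConfig,
) -> str:
    """Return a URL for the variant, generating it on first access (sketch)."""
    fmt = config.format or ImageFormat.JPEG
    ext = "jpg" if fmt is ImageFormat.JPEG else fmt.value.lower()
    key = f"images/{image_id}/{config.name}.{ext}"

    if await storage.exists(key):
        return await storage.url(key)

    # Variant missing: derive it from the stored original
    # (assumes the original was stored as images/{id}/original.jpg)
    original = await storage.get_bytes(f"images/{image_id}/original.jpg")
    processor = ImageProcessor(variants=[config], preserve_original=False)
    variant = processor.process(original)[0]

    await storage.put(key=key, data=variant.data, content_type=variant.content_type)
    return await storage.url(key)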