# Source code for litestar_storages.base

"""Base storage protocol and abstract implementation."""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, AsyncIterator
from datetime import timedelta
from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from litestar_storages.types import StoredFile

__all__ = ["BaseStorage", "Storage"]


@runtime_checkable
class Storage(Protocol):
    """Async storage protocol.

    Every storage backend implements this protocol so file operations behave
    consistently across providers. It defines the core async interface for
    uploading, downloading, deleting, and managing file metadata.
    """

    async def put(
        self,
        key: str,
        data: bytes | AsyncGenerator[bytes, None],
        *,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
    ) -> StoredFile:
        """Store data under the given key.

        Args:
            key: Forward-slash separated storage path (e.g. "images/photo.jpg").
            data: File contents as bytes or an async byte stream. Prefer a
                stream for large files so the whole payload is never held in
                memory at once.
            content_type: MIME type of the content (e.g. 'image/jpeg',
                'text/plain'). Backends may infer it from the file extension
                when omitted.
            metadata: Extra metadata stored with the file. Keys and values
                must be strings; backend-specific limits may apply.

        Returns:
            StoredFile describing the stored object, including actual size,
            etag, and last-modified timestamp.

        Raises:
            StorageError: If the upload fails for any reason.
            StoragePermissionError: If write access to this location is denied.
            ConfigurationError: If the storage backend is misconfigured.
        """
        ...

    def get(self, key: str) -> AsyncIterator[bytes]:
        """Stream the file contents as an async iterator of byte chunks.

        The iterator yields the file piece by piece, so large files can be
        consumed without loading the entire content into memory.

        Args:
            key: Storage path/key of the file.

        Yields:
            Chunks of file data as bytes; chunk size is backend-dependent.

        Raises:
            StorageFileNotFoundError: If the file does not exist.
            StorageError: If the retrieval fails.
            StoragePermissionError: If read access to this file is denied.
        """
        ...

    async def get_bytes(self, key: str) -> bytes:
        """Return the complete file contents as a single bytes object.

        Convenience method that collects the stream from get() into memory;
        use get() directly for large files to avoid memory pressure.

        Args:
            key: Storage path/key of the file.

        Returns:
            The full file contents as bytes.

        Raises:
            StorageFileNotFoundError: If the file does not exist.
            StorageError: If the retrieval fails.
            StoragePermissionError: If read access to this file is denied.
        """
        ...

    async def delete(self, key: str) -> None:
        """Delete a file.

        Args:
            key: Storage path/key of the file.

        Raises:
            StorageFileNotFoundError: If the file does not exist
                (implementation-dependent; some backends silently succeed
                for idempotency).
            StorageError: If the deletion fails.
            StoragePermissionError: If delete access to this file is denied.
        """
        ...

    async def exists(self, key: str) -> bool:
        """Report whether a file exists.

        Args:
            key: Storage path/key of the file.

        Returns:
            True if the file exists, False otherwise.

        Raises:
            StorageError: If the check fails (rare; typically returns False).
        """
        ...

    async def list(
        self,
        prefix: str = "",
        *,
        limit: int | None = None,
    ) -> AsyncGenerator[StoredFile, None]:
        """List stored files, optionally filtered by key prefix.

        Args:
            prefix: Only yield keys starting with this prefix; an empty
                string lists all files. Use forward slashes for hierarchical
                layouts (e.g. "images/2024/").
            limit: Maximum number of results to return. None yields every
                match, though backends may apply their own internal limits.

        Yields:
            StoredFile metadata for each matching file, typically in
            lexicographical order by key.

        Raises:
            StorageError: If the listing operation fails.
            StoragePermissionError: If listing files is not permitted.
        """
        ...

    async def url(
        self,
        key: str,
        *,
        expires_in: timedelta | None = None,
    ) -> str:
        """Generate a URL for accessing the file.

        Cloud backends typically return a presigned URL granting temporary
        access; filesystem backends return a path or a configured base URL.

        Args:
            key: Storage path/key of the file.
            expires_in: Lifetime for signed URLs. None uses the backend's
                default expiration (typically 1 hour); filesystem backends
                may ignore this parameter.

        Returns:
            A URL string for the file. This may be a presigned URL with
            embedded credentials (cloud backends), a public URL if the file
            is publicly accessible, or a file:// URL / relative path
            (filesystem backend).

        Raises:
            StorageFileNotFoundError: If the file does not exist
                (implementation-dependent).
            StorageError: If URL generation fails.
            StoragePermissionError: If generating URLs is not permitted.

        Note:
            URLs may stop working after the expiration time or once the
            file is deleted; clients should handle 404/403 responses
            gracefully.
        """
        ...

    async def copy(
        self,
        source: str,
        destination: str,
    ) -> StoredFile:
        """Copy a file within the storage backend.

        Should be atomic where possible; some backends can copy server-side
        without downloading and re-uploading the data.

        Args:
            source: Key to copy from.
            destination: Key to copy to. Overwritten if it already exists.

        Returns:
            StoredFile metadata for the new copy at the destination.

        Raises:
            StorageFileNotFoundError: If the source file does not exist.
            StorageError: If the copy operation fails.
            StoragePermissionError: If the operation is not permitted.
        """
        ...

    async def move(
        self,
        source: str,
        destination: str,
    ) -> StoredFile:
        """Move/rename a file within the storage backend.

        Should be atomic where possible; the default strategy is a copy
        followed by a delete.

        Args:
            source: Key to move from.
            destination: Key to move to. Overwritten if it already exists.

        Returns:
            StoredFile metadata for the file at the new destination.

        Raises:
            StorageFileNotFoundError: If the source file does not exist.
            StorageError: If the move operation fails.
            StoragePermissionError: If the operation is not permitted.
        """
        ...

    async def info(self, key: str) -> StoredFile:
        """Fetch metadata about a file without downloading its contents.

        Useful for checking size, content type, and other metadata without
        paying the cost of a full download.

        Args:
            key: Storage path/key of the file.

        Returns:
            StoredFile with size, content_type, etag, last_modified, and
            any custom metadata.

        Raises:
            StorageFileNotFoundError: If the file does not exist.
            StorageError: If the metadata retrieval fails.
            StoragePermissionError: If metadata access is denied.
        """
        ...

    async def close(self) -> None:
        """Close the storage backend and release its resources.

        Call this when the storage is no longer needed, typically during
        application shutdown, so backends can clean up HTTP sessions,
        connection pools, or file handles. A no-op for backends without
        resources.

        Note:
            After close() the instance should not be used; some backends
            raise errors if further operations are attempted.
        """
        ...
class BaseStorage(ABC):
    """Abstract base class with shared behavior for storage backends.

    Backends can inherit from this to receive default implementations of the
    convenience operations while only writing the core primitives.

    Subclasses must implement:
    - put()
    - get()
    - delete()
    - exists()
    - list()
    - url()
    - info()

    Defaults provided by this class:
    - get_bytes() - collects the stream into memory
    - copy() - downloads and re-uploads
    - move() - copies then deletes
    """

    @abstractmethod
    async def put(
        self,
        key: str,
        data: bytes | AsyncGenerator[bytes, None],
        *,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
    ) -> StoredFile:
        """Store data at the given key; subclasses must implement this."""

    @abstractmethod
    def get(self, key: str) -> AsyncIterator[bytes]:
        """Retrieve the file as an async byte stream; subclasses must implement this."""

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete a file; subclasses must implement this."""

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Check whether a file exists; subclasses must implement this."""

    @abstractmethod
    async def list(
        self,
        prefix: str = "",
        *,
        limit: int | None = None,
    ) -> AsyncGenerator[StoredFile, None]:
        """List files matching a prefix; subclasses must implement this."""

    @abstractmethod
    async def url(
        self,
        key: str,
        *,
        expires_in: timedelta | None = None,
    ) -> str:
        """Generate a URL for file access; subclasses must implement this."""

    @abstractmethod
    async def info(self, key: str) -> StoredFile:
        """Get file metadata; subclasses must implement this."""

    # --- Default implementations ---

    async def get_bytes(self, key: str) -> bytes:
        """Default implementation: drain get() into a single bytes object.

        Collects every chunk yielded by get() and joins them. For large
        files, prefer get() directly to stream the content.

        Args:
            key: Storage path/key of the file.

        Returns:
            The complete file contents as bytes.

        Raises:
            StorageFileNotFoundError: If the file does not exist.
            StorageError: If the retrieval fails.
        """
        return b"".join([chunk async for chunk in self.get(key)])

    async def copy(self, source: str, destination: str) -> StoredFile:
        """Default implementation: download and re-upload.

        Reads the source file completely into memory and writes it to the
        destination. Backends with server-side copy support should override
        this for better performance.

        Args:
            source: Key to copy from.
            destination: Key to copy to.

        Returns:
            StoredFile metadata for the new copy.

        Raises:
            StorageFileNotFoundError: If the source does not exist.
            StorageError: If the operation fails.
        """
        # Fetch metadata first so content type and custom metadata survive the copy.
        src_info = await self.info(source)
        payload = await self.get_bytes(source)
        return await self.put(
            destination,
            payload,
            content_type=src_info.content_type,
            metadata=src_info.metadata,
        )

    async def move(self, source: str, destination: str) -> StoredFile:
        """Default implementation: copy, then delete the source.

        Backends that support atomic move/rename should override this.

        Args:
            source: Key to move from.
            destination: Key to move to.

        Returns:
            StoredFile metadata for the moved file.

        Raises:
            StorageFileNotFoundError: If the source does not exist.
            StorageError: If the operation fails.

        Note:
            Not atomic here: if the delete fails after a successful copy,
            the source file remains in place.
        """
        moved = await self.copy(source, destination)
        await self.delete(source)
        return moved

    async def close(self) -> None:  # noqa: B027
        """Default implementation: no-op.

        Subclasses that hold resources (HTTP sessions, connection pools,
        etc.) should override this to release them properly.
        """