"""Google Cloud Storage backend."""
from __future__ import annotations
from collections.abc import AsyncGenerator, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
from litestar_storages.base import BaseStorage
from litestar_storages.exceptions import (
    ConfigurationError,
    StorageConnectionError,
    StorageError,
    StorageFileNotFoundError,
)
from litestar_storages.types import MultipartUpload, ProgressCallback, ProgressInfo, StoredFile
__all__ = ("GCSConfig", "GCSStorage")
@dataclass
class GCSConfig:
"""Configuration for Google Cloud Storage.
Supports authentication via:
- Service account JSON file (service_file)
- Application Default Credentials (ADC) - automatic when running on GCP
- Explicit token (for testing/special cases)
Attributes:
bucket: GCS bucket name
project: GCP project ID (required for some operations)
service_file: Path to service account JSON file
prefix: Key prefix for all operations (e.g., "uploads/")
presigned_expiry: Default expiration time for signed URLs
api_root: Custom API endpoint (for emulators)
"""
bucket: str
project: str | None = None
service_file: str | None = None
prefix: str = ""
presigned_expiry: timedelta = field(default_factory=lambda: timedelta(hours=1))
api_root: str | None = None
class GCSStorage(BaseStorage):
"""Google Cloud Storage backend.
Uses gcloud-aio-storage for async GCS operations.
Example:
>>> # Using Application Default Credentials
>>> storage = GCSStorage(
... config=GCSConfig(
... bucket="my-bucket",
... project="my-project",
... )
... )
>>> # Using service account
>>> storage = GCSStorage(
... config=GCSConfig(
... bucket="my-bucket",
... service_file="/path/to/service-account.json",
... )
... )
>>> # Using emulator (fake-gcs-server)
>>> storage = GCSStorage(
... config=GCSConfig(
... bucket="test-bucket",
... api_root="http://localhost:4443",
... )
... )
Note:
The client is lazily initialized on first use. When running on GCP
(GCE, GKE, Cloud Run, etc.), credentials are automatically detected.
"""
def __init__(self, config: GCSConfig) -> None:
"""Initialize GCSStorage.
Args:
config: Configuration for the GCS backend
Raises:
ConfigurationError: If required configuration is missing
"""
self.config = config
self._client: Any = None
self._session: Any = None
if not config.bucket:
raise ConfigurationError("GCS bucket name is required")
def _get_key(self, key: str) -> str:
"""Apply prefix to a key.
Args:
key: The raw key
Returns:
Key with prefix applied
"""
if self.config.prefix:
prefix = self.config.prefix.rstrip("/") + "/"
key = key.lstrip("/")
return f"{prefix}{key}"
return key
def _strip_prefix(self, key: str) -> str:
"""Remove prefix from a key.
Args:
key: The key with prefix
Returns:
Key without prefix
"""
if self.config.prefix and key.startswith(self.config.prefix):
return key[len(self.config.prefix) :].lstrip("/")
return key
async def _get_client(self) -> Any: # noqa: ANN401
"""Get or create GCS client.
Returns:
gcloud.aio.storage.Storage client
Raises:
ConfigurationError: If gcloud-aio-storage is not installed
StorageConnectionError: If unable to create client
"""
if self._client is not None:
return self._client
try:
from gcloud.aio.storage import Storage
except ImportError as e:
raise ConfigurationError(
"gcloud-aio-storage is required for GCSStorage. Install it with: pip install gcloud-aio-storage"
) from e
try:
# Build client kwargs
kwargs: dict[str, Any] = {}
if self.config.service_file:
kwargs["service_file"] = self.config.service_file
if self.config.api_root:
kwargs["api_root"] = self.config.api_root
self._client = Storage(**kwargs)
return self._client
except Exception as e:
raise StorageConnectionError(f"Failed to create GCS client: {e}") from e
async def put(
self,
key: str,
data: bytes | AsyncIterator[bytes],
*,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
) -> StoredFile:
"""Store data at the given key.
Args:
key: Storage path/key for the file
data: File contents as bytes or async byte stream
content_type: MIME type of the content
metadata: Additional metadata to store with the file
Returns:
StoredFile with metadata about the stored file
Raises:
StorageError: If the upload fails
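        Example:
            A minimal sketch, assuming a configured ``storage`` instance used
            inside an async context; the key, payload, and metadata are illustrative.

            >>> stored = await storage.put(
            ...     "uploads/report.pdf",
            ...     b"file contents",
            ...     content_type="application/pdf",
            ...     metadata={"owner": "docs-team"},
            ... )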
"""
client = await self._get_client()
gcs_key = self._get_key(key)
# Collect data if it's an async iterator
if isinstance(data, bytes):
file_data = data
else:
chunks = []
async for chunk in data:
chunks.append(chunk)
file_data = b"".join(chunks)
# Build metadata dict for GCS
gcs_metadata: dict[str, Any] | None = None
if metadata:
gcs_metadata = {"metadata": metadata}
try:
response = await client.upload(
self.config.bucket,
gcs_key,
file_data,
content_type=content_type,
metadata=gcs_metadata,
)
return StoredFile(
key=key,
size=len(file_data),
content_type=content_type,
etag=response.get("etag", "").strip('"'),
last_modified=datetime.now(tz=timezone.utc),
metadata=metadata or {},
)
except Exception as e:
raise StorageError(f"Failed to upload file {key}: {e}") from e
async def get(self, key: str) -> AsyncIterator[bytes]:
"""Retrieve file contents as an async byte stream.
Args:
key: Storage path/key for the file
Yields:
Chunks of file data as bytes
Raises:
StorageFileNotFoundError: If the file does not exist
StorageError: If the retrieval fails
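        Example:
            A minimal sketch, assuming a configured ``storage`` instance;
            ``handle_chunk`` is a hypothetical consumer of each chunk.

            >>> async for chunk in storage.get("uploads/report.pdf"):
            ...     handle_chunk(chunk)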
"""
client = await self._get_client()
gcs_key = self._get_key(key)
try:
# gcloud-aio-storage download returns bytes directly
# For streaming, we'd need to use download_stream, but it requires
# more complex handling. For now, download and yield in chunks.
data = await client.download(self.config.bucket, gcs_key)
# Yield in chunks for consistency with streaming interface
chunk_size = 64 * 1024 # 64KB
for i in range(0, len(data), chunk_size):
yield data[i : i + chunk_size]
except Exception as e:
error_message = str(e).lower()
if "not found" in error_message or "404" in error_message:
raise StorageFileNotFoundError(key) from e
raise StorageError(f"Failed to retrieve file {key}: {e}") from e
async def get_bytes(self, key: str) -> bytes:
"""Retrieve entire file contents as bytes.
Args:
key: Storage path/key for the file
Returns:
Complete file contents as bytes
Raises:
StorageFileNotFoundError: If the file does not exist
StorageError: If the retrieval fails
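        Example:
            A minimal sketch, assuming a configured ``storage`` instance in an
            async context; the key is illustrative.

            >>> data = await storage.get_bytes("uploads/report.pdf")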
"""
client = await self._get_client()
gcs_key = self._get_key(key)
try:
return await client.download(self.config.bucket, gcs_key)
except Exception as e:
error_message = str(e).lower()
if "not found" in error_message or "404" in error_message:
raise StorageFileNotFoundError(key) from e
raise StorageError(f"Failed to retrieve file {key}: {e}") from e
async def delete(self, key: str) -> None:
"""Delete a file.
Args:
key: Storage path/key for the file
Note:
GCS delete is idempotent - deleting a non-existent key succeeds silently.
"""
        client = await self._get_client()
        gcs_key = self._get_key(key)
        try:
            await client.delete(self.config.bucket, gcs_key)
        except Exception as e:
            error_message = str(e).lower()
            if "not found" in error_message or "404" in error_message:
                return  # Honor the documented idempotent behavior for missing keys
            raise StorageError(f"Failed to delete file {key}: {e}") from e
async def exists(self, key: str) -> bool:
"""Check if a file exists.
Args:
key: Storage path/key for the file
Returns:
True if the file exists, False otherwise
"""
client = await self._get_client()
gcs_key = self._get_key(key)
try:
from gcloud.aio.storage import Bucket
bucket = Bucket(client, self.config.bucket)
return await bucket.blob_exists(gcs_key)
except Exception:
return False
async def list(
self,
prefix: str = "",
*,
limit: int | None = None,
) -> AsyncGenerator[StoredFile, None]:
"""List files with optional prefix filter.
Args:
prefix: Filter results to keys starting with this prefix
limit: Maximum number of results to return
Yields:
StoredFile metadata for each matching file
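        Example:
            A minimal sketch, assuming a configured ``storage`` instance; the
            prefix is illustrative.

            >>> async for stored in storage.list(prefix="uploads/", limit=100):
            ...     print(stored.key, stored.size)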
"""
client = await self._get_client()
gcs_prefix = self._get_key(prefix)
        try:
            # GCS list responses are paginated; follow nextPageToken so results
            # beyond the first page are not silently dropped.
            params: dict[str, str] = {"prefix": gcs_prefix}
            count = 0
            while True:
                response = await client.list_objects(self.config.bucket, params=params)
                for item in response.get("items", []):
                    gcs_key = item["name"]
                    key = self._strip_prefix(gcs_key)
                    # Parse last modified time (RFC 3339 timestamp with trailing "Z")
                    last_modified = None
                    if "updated" in item:
                        try:
                            last_modified = datetime.fromisoformat(item["updated"].replace("Z", "+00:00"))
                        except (ValueError, TypeError):
                            last_modified = None
                    yield StoredFile(
                        key=key,
                        size=int(item.get("size", 0)),
                        content_type=item.get("contentType"),
                        etag=item.get("etag", "").strip('"'),
                        last_modified=last_modified,
                        metadata=item.get("metadata", {}),
                    )
                    count += 1
                    if limit is not None and count >= limit:
                        return
                page_token = response.get("nextPageToken")
                if not page_token:
                    return
                params["pageToken"] = page_token
        except Exception as e:
            raise StorageError(f"Failed to list files: {e}") from e
async def url(
self,
key: str,
*,
expires_in: timedelta | None = None,
) -> str:
"""Generate a signed URL for accessing the file.
Args:
key: Storage path/key for the file
expires_in: Optional expiration time (defaults to config.presigned_expiry)
Returns:
Signed URL string
        Note:
            Signed URLs grant temporary access to private GCS objects without
            requiring GCP credentials on the requesting client. Generating them
            requires service account credentials.
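        Example:
            A minimal sketch, assuming ``storage`` was configured with service
            account credentials; ``timedelta`` is the standard library class
            imported above.

            >>> link = await storage.url(
            ...     "uploads/report.pdf",
            ...     expires_in=timedelta(minutes=15),
            ... )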
"""
client = await self._get_client()
gcs_key = self._get_key(key)
expiry = expires_in or self.config.presigned_expiry
expires_seconds = int(expiry.total_seconds())
try:
from gcloud.aio.storage import Bucket
bucket = Bucket(client, self.config.bucket)
blob = bucket.new_blob(gcs_key)
return await blob.get_signed_url(expiration=expires_seconds)
except Exception as e:
raise StorageError(f"Failed to generate URL for {key}: {e}") from e
async def copy(
self,
source: str,
destination: str,
) -> StoredFile:
"""Copy a file within the storage backend.
Uses GCS's native copy operation for efficiency.
Args:
source: Source key to copy from
destination: Destination key to copy to
Returns:
StoredFile metadata for the new copy
Raises:
StorageFileNotFoundError: If the source file does not exist
StorageError: If the copy fails
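        Example:
            A minimal sketch, assuming a configured ``storage`` instance; both
            keys are illustrative.

            >>> copied = await storage.copy("drafts/report.pdf", "final/report.pdf")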
"""
client = await self._get_client()
source_key = self._get_key(source)
dest_key = self._get_key(destination)
try:
# Use GCS's native copy (rewriteTo API)
await client.copy(
self.config.bucket,
source_key,
self.config.bucket,
new_name=dest_key,
)
# Get metadata for the new copy
return await self.info(destination)
except Exception as e:
error_message = str(e).lower()
if "not found" in error_message or "404" in error_message:
raise StorageFileNotFoundError(source) from e
raise StorageError(f"Failed to copy {source} to {destination}: {e}") from e
async def move(
self,
source: str,
destination: str,
) -> StoredFile:
"""Move/rename a file within the storage backend.
Uses GCS's copy + delete operations (no native move).
Args:
source: Source key to move from
destination: Destination key to move to
Returns:
StoredFile metadata for the moved file
Raises:
StorageFileNotFoundError: If the source file does not exist
StorageError: If the move fails
"""
result = await self.copy(source, destination)
await self.delete(source)
return result
async def info(self, key: str) -> StoredFile:
"""Get metadata about a file without downloading it.
Args:
key: Storage path/key for the file
Returns:
StoredFile with metadata
Raises:
StorageFileNotFoundError: If the file does not exist
StorageError: If retrieving metadata fails
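        Example:
            A minimal sketch, assuming a configured ``storage`` instance.

            >>> meta = await storage.info("uploads/report.pdf")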
"""
client = await self._get_client()
gcs_key = self._get_key(key)
try:
from gcloud.aio.storage import Bucket
bucket = Bucket(client, self.config.bucket)
blob = await bucket.get_blob(gcs_key)
# Parse metadata from blob (use getattr for type safety)
metadata_dict: dict[str, Any] = getattr(blob, "metadata", None) or {}
# Parse last modified time
last_modified = None
if metadata_dict.get("updated"):
try:
last_modified = datetime.fromisoformat(metadata_dict["updated"].replace("Z", "+00:00"))
except (ValueError, TypeError):
last_modified = None
return StoredFile(
key=key,
size=int(metadata_dict.get("size", 0)),
content_type=metadata_dict.get("contentType"),
etag=metadata_dict.get("etag", "").strip('"'),
last_modified=last_modified,
metadata=metadata_dict.get("metadata", {}),
)
except Exception as e:
error_message = str(e).lower()
if "not found" in error_message or "404" in error_message:
raise StorageFileNotFoundError(key) from e
raise StorageError(f"Failed to get info for {key}: {e}") from e
# =========================================================================
# Multipart Upload Support
# =========================================================================
async def start_multipart_upload(
self,
key: str,
*,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
part_size: int = 5 * 1024 * 1024,
) -> MultipartUpload:
"""Start a multipart upload.
Use this for large files (typically > 100MB) to enable:
- Chunked uploads with progress tracking
- Better handling of network failures
- Memory-efficient streaming uploads
Note:
GCS doesn't have native multipart upload like S3. This implementation
buffers parts in memory and uploads them when complete_multipart_upload
is called. For true resumable uploads, consider using GCS's resumable
upload API directly.
Args:
key: Storage path/key for the file
content_type: MIME type of the content
metadata: Additional metadata to store with the file
part_size: Size of each part in bytes (default 5MB)
Returns:
MultipartUpload object to track the upload state
Raises:
StorageError: If initiating the upload fails
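        Example:
            A minimal sketch of the manual flow, assuming a configured
            ``storage`` instance; ``part1`` and ``part2`` are illustrative
            byte strings.

            >>> upload = await storage.start_multipart_upload("backups/big.bin")
            >>> await storage.upload_part(upload, 1, part1)
            >>> await storage.upload_part(upload, 2, part2)
            >>> stored = await storage.complete_multipart_upload(upload)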
"""
# Generate a unique upload ID (we'll store parts in the upload object)
import uuid
upload_id = str(uuid.uuid4())
# Store metadata for later use in complete_multipart_upload
upload = MultipartUpload(
upload_id=upload_id,
key=key,
part_size=part_size,
)
# Store additional metadata in a hidden attribute for later
upload._content_type = content_type # type: ignore[attr-defined] # noqa: SLF001
upload._metadata = metadata or {} # type: ignore[attr-defined] # noqa: SLF001
upload._part_data = [] # type: ignore[attr-defined] # noqa: SLF001 - Buffer for part data
return upload
async def upload_part(
self,
upload: MultipartUpload,
part_number: int,
data: bytes,
) -> str:
"""Upload a single part of a multipart upload.
Note:
Parts are buffered in memory until complete_multipart_upload is called.
Args:
upload: The MultipartUpload object from start_multipart_upload
part_number: Part number (1-indexed, must be sequential)
data: The part data to upload
Returns:
ETag (placeholder) of the uploaded part
Raises:
StorageError: If the part upload fails
"""
try:
# Buffer the part data for later upload
part_data_list: list[tuple[int, bytes]] = getattr(upload, "_part_data", [])
part_data_list.append((part_number, data))
# Generate a placeholder ETag (actual ETag will come from final upload)
import hashlib
etag = hashlib.md5(data).hexdigest() # noqa: S324
upload.add_part(part_number, etag)
return etag
except Exception as e:
raise StorageError(f"Failed to buffer part {part_number} for {upload.key}: {e}") from e
async def complete_multipart_upload(
self,
upload: MultipartUpload,
) -> StoredFile:
"""Complete a multipart upload.
Combines all buffered parts and uploads the complete file to GCS.
Args:
upload: The MultipartUpload object with all parts uploaded
Returns:
StoredFile metadata for the completed upload
Raises:
StorageError: If completing the upload fails
"""
try:
# Retrieve buffered parts
part_data_list: list[tuple[int, bytes]] = getattr(upload, "_part_data", [])
# Sort by part number and combine
sorted_parts = sorted(part_data_list, key=lambda p: p[0])
complete_data = b"".join(part[1] for part in sorted_parts)
# Retrieve stored metadata
content_type: str | None = getattr(upload, "_content_type", None)
metadata: dict[str, str] = getattr(upload, "_metadata", {})
# Upload the complete file
result = await self.put(
upload.key,
complete_data,
content_type=content_type,
metadata=metadata,
)
# Clean up buffered data
delattr(upload, "_part_data")
delattr(upload, "_content_type")
delattr(upload, "_metadata")
return result
except Exception as e:
raise StorageError(f"Failed to complete multipart upload for {upload.key}: {e}") from e
async def abort_multipart_upload(
self,
upload: MultipartUpload,
) -> None:
"""Abort a multipart upload.
This cancels an in-progress multipart upload and frees buffered data.
Use this to clean up failed uploads.
Args:
upload: The MultipartUpload object to abort
Raises:
StorageError: If aborting the upload fails
"""
try:
# Clean up buffered data
if hasattr(upload, "_part_data"):
delattr(upload, "_part_data")
if hasattr(upload, "_content_type"):
delattr(upload, "_content_type")
if hasattr(upload, "_metadata"):
delattr(upload, "_metadata")
except Exception as e:
raise StorageError(f"Failed to abort multipart upload for {upload.key}: {e}") from e
async def put_large(
self,
key: str,
data: bytes | AsyncIterator[bytes],
*,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
part_size: int = 10 * 1024 * 1024,
progress_callback: ProgressCallback | None = None,
) -> StoredFile:
"""Upload a large file using multipart upload.
This is a convenience method that handles the multipart upload process
automatically. It splits the data into parts, uploads them, and
completes the upload.
Args:
key: Storage path/key for the file
data: File contents as bytes or async byte stream
content_type: MIME type of the content
metadata: Additional metadata to store with the file
part_size: Size of each part in bytes (default 10MB)
progress_callback: Optional callback for progress updates
Returns:
StoredFile with metadata about the stored file
Raises:
StorageError: If the upload fails
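        Example:
            A minimal sketch, assuming a configured ``storage`` instance;
            ``payload`` and ``report_progress`` are illustrative.

            >>> def report_progress(info: ProgressInfo) -> None:
            ...     print(f"{info.bytes_transferred}/{info.total_bytes}")
            >>> stored = await storage.put_large(
            ...     "exports/archive.tar",
            ...     payload,
            ...     part_size=10 * 1024 * 1024,
            ...     progress_callback=report_progress,
            ... )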
"""
# Collect data if it's an async iterator
if isinstance(data, bytes):
file_data = data
else:
chunks = []
async for chunk in data:
chunks.append(chunk)
file_data = b"".join(chunks)
total_size = len(file_data)
# For small files, use regular put
if total_size < part_size:
return await self.put(key, file_data, content_type=content_type, metadata=metadata)
# Start multipart upload
upload = await self.start_multipart_upload(
key,
content_type=content_type,
metadata=metadata,
part_size=part_size,
)
try:
bytes_uploaded = 0
part_number = 1
# Upload parts
for i in range(0, total_size, part_size):
part_data = file_data[i : i + part_size]
await self.upload_part(upload, part_number, part_data)
bytes_uploaded += len(part_data)
part_number += 1
# Report progress
if progress_callback:
progress_callback(
ProgressInfo(
bytes_transferred=bytes_uploaded,
total_bytes=total_size,
operation="upload",
key=key,
)
)
# Complete the upload
return await self.complete_multipart_upload(upload)
except Exception:
# Clean up on failure
await self.abort_multipart_upload(upload)
raise
async def close(self) -> None:
"""Close the GCS storage and release resources.
This method closes the underlying aiohttp session.
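        Example:
            A minimal sketch; typically called from application shutdown logic.

            >>> await storage.close()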
"""
if self._client is not None:
await self._client.close()
self._client = None