Source code for litestar_storages.backends.azure

"""Azure Blob Storage backend."""

from __future__ import annotations

from collections.abc import AsyncGenerator, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any

from litestar_storages.base import BaseStorage
from litestar_storages.exceptions import (
    ConfigurationError,
    StorageConnectionError,
    StorageFileNotFoundError,
)
from litestar_storages.types import MultipartUpload, ProgressCallback, ProgressInfo, StoredFile

__all__ = ("AzureConfig", "AzureStorage")


@dataclass
class AzureConfig:
    """Configuration for Azure Blob Storage.

    Supports authentication via:

    - Connection string (connection_string)
    - Account URL + credential (account_url + account_key or DefaultAzureCredential)
    - SAS token (account_url with SAS token embedded)

    Attributes:
        container: Azure Blob container name
        account_url: Azure storage account URL (e.g., https://<account>.blob.core.windows.net)
        account_key: Storage account access key (optional if using connection string
            or DefaultAzureCredential)
        connection_string: Full connection string (alternative to account_url + account_key)
        prefix: Key prefix for all operations (e.g., "uploads/")
        presigned_expiry: Default expiration time for SAS URLs
    """

    container: str
    account_url: str | None = None
    account_key: str | None = None
    connection_string: str | None = None
    prefix: str = ""
    presigned_expiry: timedelta = field(default_factory=lambda: timedelta(hours=1))

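# Configuration sketch (not part of the original module): using account_url with
# DefaultAzureCredential instead of a key. This assumes the optional azure-identity
# package is installed and that ambient credentials (managed identity, Azure CLI
# login, or environment variables) are available; the names below are illustrative.
#
#   config = AzureConfig(
#       container="my-container",
#       account_url="https://myaccount.blob.core.windows.net",
#       # No account_key or connection_string: _get_container_client() falls back
#       # to azure.identity.aio.DefaultAzureCredential.
#   )
#   storage = AzureStorage(config=config)
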
class AzureStorage(BaseStorage):
    """Azure Blob Storage backend.

    Uses azure-storage-blob async API for all operations.

    Example:
        >>> # Using connection string
        >>> storage = AzureStorage(
        ...     config=AzureConfig(
        ...         container="my-container",
        ...         connection_string="DefaultEndpointsProtocol=https;...",
        ...     )
        ... )

        >>> # Using account URL and key
        >>> storage = AzureStorage(
        ...     config=AzureConfig(
        ...         container="my-container",
        ...         account_url="https://myaccount.blob.core.windows.net",
        ...         account_key="my-access-key",
        ...     )
        ... )

        >>> # Using Azurite emulator
        >>> storage = AzureStorage(
        ...     config=AzureConfig(
        ...         container="test-container",
        ...         connection_string="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=...;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1",
        ...     )
        ... )

    Note:
        The client is lazily initialized on first use. When running on Azure
        (App Service, Functions, AKS, etc.), credentials can be automatically
        detected using DefaultAzureCredential.
    """

    def __init__(self, config: AzureConfig) -> None:
        """Initialize AzureStorage.

        Args:
            config: Configuration for the Azure Blob backend

        Raises:
            ConfigurationError: If required configuration is missing
        """
        self.config = config
        self._container_client: Any = None

        if not config.container:
            raise ConfigurationError("Azure container name is required")
        if not config.connection_string and not config.account_url:
            raise ConfigurationError("Either connection_string or account_url is required")

    def _get_key(self, key: str) -> str:
        """Apply prefix to a key.

        Args:
            key: The raw key

        Returns:
            Key with prefix applied
        """
        if self.config.prefix:
            prefix = self.config.prefix.rstrip("/") + "/"
            key = key.lstrip("/")
            return f"{prefix}{key}"
        return key

    def _strip_prefix(self, key: str) -> str:
        """Remove prefix from a key.

        Args:
            key: The key with prefix

        Returns:
            Key without prefix
        """
        if self.config.prefix and key.startswith(self.config.prefix):
            return key[len(self.config.prefix) :].lstrip("/")
        return key

    async def _get_container_client(self) -> Any:  # noqa: ANN401
        """Get or create the Azure container client.

        Returns:
            azure.storage.blob.aio.ContainerClient

        Raises:
            ConfigurationError: If azure-storage-blob is not installed
            StorageConnectionError: If unable to create client
        """
        if self._container_client is not None:
            return self._container_client

        try:
            from azure.storage.blob.aio import BlobServiceClient, ContainerClient
        except ImportError as e:
            raise ConfigurationError(
                "azure-storage-blob is required for AzureStorage. Install it with: pip install azure-storage-blob"
            ) from e

        try:
            if self.config.connection_string:
                # Create from connection string
                self._container_client = ContainerClient.from_connection_string(
                    conn_str=self.config.connection_string,
                    container_name=self.config.container,
                )
            elif self.config.account_url:
                # Create from account URL
                if self.config.account_key:
                    # Use account key credential
                    blob_service_client = BlobServiceClient(
                        account_url=self.config.account_url,
                        credential=self.config.account_key,
                    )
                else:
                    # Use DefaultAzureCredential (for managed identity, etc.)
                    try:
                        from azure.identity.aio import DefaultAzureCredential  # pragma: no cover

                        credential = DefaultAzureCredential()  # pragma: no cover
                        blob_service_client = BlobServiceClient(  # pragma: no cover
                            account_url=self.config.account_url,
                            credential=credential,
                        )
                    except ImportError as e:  # pragma: no cover
                        raise ConfigurationError(
                            "azure-identity is required when using account_url without account_key. "
                            "Install it with: pip install azure-identity"
                        ) from e
                self._container_client = blob_service_client.get_container_client(self.config.container)
            return self._container_client
        except Exception as e:
            if isinstance(e, ConfigurationError):
                raise
            raise StorageConnectionError(f"Failed to create Azure client: {e}") from e  # pragma: no cover

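    # Behavior sketch (not part of the original module): how the optional prefix is
    # applied to blob names and stripped from keys returned to callers. Values are
    # illustrative only.
    #
    #   storage = AzureStorage(
    #       config=AzureConfig(container="c", connection_string="...", prefix="uploads/")
    #   )
    #   storage._get_key("avatars/1.png")               # -> "uploads/avatars/1.png"
    #   storage._strip_prefix("uploads/avatars/1.png")  # -> "avatars/1.png"
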
    async def put(
        self,
        key: str,
        data: bytes | AsyncIterator[bytes],
        *,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
    ) -> StoredFile:
        """Store data at the given key.

        Args:
            key: Storage path/key for the file
            data: File contents as bytes or async byte stream
            content_type: MIME type of the content
            metadata: Additional metadata to store with the file

        Returns:
            StoredFile with metadata about the stored file

        Raises:
            StorageError: If the upload fails
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        # Collect data if it's an async iterator
        if isinstance(data, bytes):
            file_data = data
        else:
            chunks = []
            async for chunk in data:
                chunks.append(chunk)
            file_data = b"".join(chunks)

        try:
            from azure.storage.blob import ContentSettings

            blob_client = container_client.get_blob_client(azure_key)

            # Build content settings
            content_settings = None
            if content_type:
                content_settings = ContentSettings(content_type=content_type)

            await blob_client.upload_blob(
                file_data,
                overwrite=True,
                content_settings=content_settings,
                metadata=metadata,
            )

            return StoredFile(
                key=key,
                size=len(file_data),
                content_type=content_type,
                etag="",  # Azure returns ETag but needs additional call
                last_modified=datetime.now(tz=timezone.utc),
                metadata=metadata or {},
            )
        except Exception as e:
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to upload file {key}: {e}") from e

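    # Usage sketch (not part of the original module): uploading bytes and streaming a
    # local file. The keys, path, and helper name are illustrative only.
    #
    #   await storage.put("notes/hello.txt", b"hello world", content_type="text/plain")
    #
    #   async def file_chunks(path: str) -> AsyncIterator[bytes]:
    #       with open(path, "rb") as fh:
    #           while chunk := fh.read(64 * 1024):
    #               yield chunk
    #
    #   await storage.put(
    #       "reports/q1.pdf",
    #       file_chunks("/tmp/q1.pdf"),
    #       content_type="application/pdf",
    #       metadata={"department": "finance"},
    #   )
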
    async def get(self, key: str) -> AsyncIterator[bytes]:
        """Retrieve file contents as an async byte stream.

        Args:
            key: Storage path/key for the file

        Yields:
            Chunks of file data as bytes

        Raises:
            StorageFileNotFoundError: If the file does not exist
            StorageError: If the retrieval fails
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        try:
            blob_client = container_client.get_blob_client(azure_key)
            download_stream = await blob_client.download_blob()

            # Stream the data in chunks
            async for chunk in download_stream.chunks():
                yield chunk
        except Exception as e:
            error_message = str(e).lower()
            if "blobnotfound" in error_message or "not found" in error_message or "404" in error_message:
                raise StorageFileNotFoundError(key) from e

            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to retrieve file {key}: {e}") from e

    async def get_bytes(self, key: str) -> bytes:
        """Retrieve entire file contents as bytes.

        Args:
            key: Storage path/key for the file

        Returns:
            Complete file contents as bytes

        Raises:
            StorageFileNotFoundError: If the file does not exist
            StorageError: If the retrieval fails
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        try:
            blob_client = container_client.get_blob_client(azure_key)
            download_stream = await blob_client.download_blob()
            return await download_stream.readall()
        except Exception as e:
            error_message = str(e).lower()
            if "blobnotfound" in error_message or "not found" in error_message or "404" in error_message:
                raise StorageFileNotFoundError(key) from e

            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to retrieve file {key}: {e}") from e

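    # Usage sketch (not part of the original module): streaming a blob to disk versus
    # reading it fully into memory. Paths and keys are illustrative only.
    #
    #   with open("/tmp/q1.pdf", "wb") as fh:
    #       async for chunk in storage.get("reports/q1.pdf"):
    #           fh.write(chunk)
    #
    #   payload: bytes = await storage.get_bytes("notes/hello.txt")
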
    async def delete(self, key: str) -> None:
        """Delete a file.

        Args:
            key: Storage path/key for the file

        Note:
            Deleting a non-existent key will raise an error (unlike S3/GCS).
            Use exists() first if you need idempotent deletes.
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        try:
            blob_client = container_client.get_blob_client(azure_key)
            await blob_client.delete_blob()
        except Exception as e:
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to delete file {key}: {e}") from e

    async def exists(self, key: str) -> bool:
        """Check if a file exists.

        Args:
            key: Storage path/key for the file

        Returns:
            True if the file exists, False otherwise
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        try:
            blob_client = container_client.get_blob_client(azure_key)
            return await blob_client.exists()
        except Exception:  # pragma: no cover
            return False

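    # Usage sketch (not part of the original module): the idempotent-delete pattern
    # suggested in the delete() note above. The key is illustrative only.
    #
    #   if await storage.exists("notes/hello.txt"):
    #       await storage.delete("notes/hello.txt")
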
    async def list(
        self,
        prefix: str = "",
        *,
        limit: int | None = None,
    ) -> AsyncGenerator[StoredFile, None]:
        """List files with optional prefix filter.

        Args:
            prefix: Filter results to keys starting with this prefix
            limit: Maximum number of results to return

        Yields:
            StoredFile metadata for each matching file
        """
        container_client = await self._get_container_client()
        azure_prefix = self._get_key(prefix)

        try:
            count = 0
            async for blob in container_client.list_blobs(name_starts_with=azure_prefix):
                blob_key = self._strip_prefix(blob.name)
                yield StoredFile(
                    key=blob_key,
                    size=blob.size or 0,
                    content_type=blob.content_settings.content_type if blob.content_settings else None,
                    etag=blob.etag.strip('"') if blob.etag else "",
                    last_modified=blob.last_modified,
                    metadata=blob.metadata or {},
                )
                count += 1
                if limit is not None and count >= limit:
                    return
        except Exception as e:
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to list files: {e}") from e

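    # Usage sketch (not part of the original module): listing a "folder" by prefix
    # with an upper bound on results. Prefix and limit are illustrative only.
    #
    #   async for stored in storage.list("reports/", limit=50):
    #       print(stored.key, stored.size, stored.last_modified)
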
    async def url(
        self,
        key: str,
        *,
        expires_in: timedelta | None = None,
    ) -> str:
        """Generate a SAS URL for accessing the file.

        Args:
            key: Storage path/key for the file
            expires_in: Optional expiration time (defaults to config.presigned_expiry)

        Returns:
            SAS URL string

        Note:
            SAS URLs allow temporary access to private Azure blobs without requiring
            Azure credentials. Requires account key for signing.
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)
        expiry = expires_in or self.config.presigned_expiry

        # Get account name and key before try block to avoid TRY301
        if self.config.connection_string:
            # Parse from connection string
            parts = dict(item.split("=", 1) for item in self.config.connection_string.split(";") if "=" in item)
            account_name = parts.get("AccountName", "")
            account_key = parts.get("AccountKey", "")
        else:
            # Extract from account URL
            if self.config.account_url:
                url = self.config.account_url.replace("https://", "").replace("http://", "")
                account_name = url.split(".")[0]
            else:
                account_name = ""  # pragma: no cover
            account_key = self.config.account_key or ""

        if not account_key:  # pragma: no cover
            msg = "Account key is required to generate SAS URLs"
            raise ConfigurationError(msg)

        try:
            from azure.storage.blob import BlobSasPermissions, generate_blob_sas

            blob_client = container_client.get_blob_client(azure_key)

            sas_token = generate_blob_sas(
                account_name=account_name,
                container_name=self.config.container,
                blob_name=azure_key,
                account_key=account_key,
                permission=BlobSasPermissions(read=True),
                expiry=datetime.now(tz=timezone.utc) + expiry,
            )

            return f"{blob_client.url}?{sas_token}"
        except Exception as e:
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to generate URL for {key}: {e}") from e

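    # Usage sketch (not part of the original module): issuing a short-lived, read-only
    # SAS link. The key, expiry, and resulting URL shape are illustrative; signing
    # requires an account key (via account_key or the connection string).
    #
    #   link = await storage.url("reports/q1.pdf", expires_in=timedelta(minutes=15))
    #   # e.g. "https://myaccount.blob.core.windows.net/my-container/reports/q1.pdf?<sas-token>"
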
    async def copy(
        self,
        source: str,
        destination: str,
    ) -> StoredFile:
        """Copy a file within the storage backend.

        Uses Azure's native copy operation for efficiency.

        Args:
            source: Source key to copy from
            destination: Destination key to copy to

        Returns:
            StoredFile metadata for the new copy

        Raises:
            StorageFileNotFoundError: If the source file does not exist
            StorageError: If the copy fails
        """
        container_client = await self._get_container_client()
        source_key = self._get_key(source)
        dest_key = self._get_key(destination)

        try:
            source_blob = container_client.get_blob_client(source_key)
            dest_blob = container_client.get_blob_client(dest_key)

            # Start copy from URL
            await dest_blob.start_copy_from_url(source_blob.url)

            # Get metadata for the new copy
            return await self.info(destination)
        except Exception as e:
            error_message = str(e).lower()
            if "blobnotfound" in error_message or "not found" in error_message or "404" in error_message:
                raise StorageFileNotFoundError(source) from e

            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to copy {source} to {destination}: {e}") from e

    async def move(
        self,
        source: str,
        destination: str,
    ) -> StoredFile:
        """Move/rename a file within the storage backend.

        Uses Azure's copy + delete operations (no native move).

        Args:
            source: Source key to move from
            destination: Destination key to move to

        Returns:
            StoredFile metadata for the moved file

        Raises:
            StorageFileNotFoundError: If the source file does not exist
            StorageError: If the move fails
        """
        result = await self.copy(source, destination)
        await self.delete(source)
        return result

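    # Usage sketch (not part of the original module): copy keeps the source blob,
    # move is implemented as copy + delete. Keys are illustrative only.
    #
    #   await storage.copy("drafts/report.pdf", "published/report.pdf")
    #   await storage.move("incoming/upload.tmp", "processed/upload.bin")
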
    async def info(self, key: str) -> StoredFile:
        """Get metadata about a file without downloading it.

        Args:
            key: Storage path/key for the file

        Returns:
            StoredFile with metadata

        Raises:
            StorageFileNotFoundError: If the file does not exist
            StorageError: If retrieving metadata fails
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(key)

        try:
            blob_client = container_client.get_blob_client(azure_key)
            properties = await blob_client.get_blob_properties()

            return StoredFile(
                key=key,
                size=properties.size or 0,
                content_type=properties.content_settings.content_type if properties.content_settings else None,
                etag=properties.etag.strip('"') if properties.etag else "",
                last_modified=properties.last_modified,
                metadata=properties.metadata or {},
            )
        except Exception as e:
            error_message = str(e).lower()
            if "blobnotfound" in error_message or "not found" in error_message or "404" in error_message:
                raise StorageFileNotFoundError(key) from e

            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to get info for {key}: {e}") from e

    async def close(self) -> None:
        """Close the Azure storage and release resources.

        This method closes the underlying aiohttp session.
        """
        if self._container_client is not None:
            await self._container_client.close()
            self._container_client = None

    # =========================================================================
    # Multipart Upload Support
    # =========================================================================

    async def start_multipart_upload(
        self,
        key: str,
        *,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
        part_size: int = 4 * 1024 * 1024,
    ) -> MultipartUpload:
        """Start a multipart upload using Azure Block Blobs.

        Azure Block Blobs support uploading blocks (up to 50,000 blocks per blob)
        and then committing them as a single blob.

        Args:
            key: Storage path/key for the file
            content_type: MIME type of the content
            metadata: Additional metadata to store with the file
            part_size: Size of each part in bytes (default 4MB, max 4000MB per block)

        Returns:
            MultipartUpload object to track the upload state

        Raises:
            StorageError: If initiating the upload fails

        Note:
            Azure doesn't have an explicit "start multipart upload" API. Instead, we
            generate a unique upload_id and track it locally. The actual multipart
            upload happens when blocks are staged and committed.
        """
        # Generate a unique upload ID for tracking
        # Azure doesn't have an explicit multipart upload concept like S3
        # We use the key + timestamp as the upload ID
        upload_id = f"{key}_{datetime.now(tz=timezone.utc).timestamp()}"

        # Store metadata and content_type for later use during commit
        # These will be used in complete_multipart_upload
        upload = MultipartUpload(
            upload_id=upload_id,
            key=key,
            part_size=part_size,
        )
        # Store content_type and metadata in upload object for later use
        # We'll need these when committing the block list
        upload.content_type = content_type  # type: ignore[attr-defined]
        upload.metadata = metadata  # type: ignore[attr-defined]

        return upload

    async def upload_part(
        self,
        upload: MultipartUpload,
        part_number: int,
        data: bytes,
    ) -> str:
        """Upload a single part (block) of a multipart upload.

        Azure uses block IDs to identify blocks. Block IDs must be:

        - Base64-encoded strings
        - Unique within the blob
        - Same length for all blocks in the blob

        Args:
            upload: The MultipartUpload object from start_multipart_upload
            part_number: Part number (1-indexed)
            data: The part data to upload

        Returns:
            Block ID (base64-encoded) of the uploaded block

        Raises:
            StorageError: If the part upload fails
        """
        import base64

        container_client = await self._get_container_client()
        azure_key = self._get_key(upload.key)

        # Generate block ID - must be base64-encoded and same length for all blocks
        # Using 10-character zero-padded number ensures consistent length
        block_id = base64.b64encode(f"{part_number:010d}".encode()).decode()

        try:
            blob_client = container_client.get_blob_client(azure_key)

            # Stage the block (doesn't commit yet)
            await blob_client.stage_block(
                block_id=block_id,
                data=data,
            )

            # Record the uploaded part
            upload.add_part(part_number, block_id)

            return block_id
        except Exception as e:
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to upload part {part_number} for {upload.key}: {e}") from e

    async def complete_multipart_upload(
        self,
        upload: MultipartUpload,
    ) -> StoredFile:
        """Complete a multipart upload by committing all blocks.

        Args:
            upload: The MultipartUpload object with all parts uploaded

        Returns:
            StoredFile metadata for the completed upload

        Raises:
            StorageError: If completing the upload fails
        """
        container_client = await self._get_container_client()
        azure_key = self._get_key(upload.key)

        # Sort parts by part number to ensure correct order
        sorted_parts = sorted(upload.parts, key=lambda p: p[0])
        block_list = [block_id for _, block_id in sorted_parts]

        try:
            from azure.storage.blob import ContentSettings

            blob_client = container_client.get_blob_client(azure_key)

            # Get content_type and metadata from upload object
            content_type = getattr(upload, "content_type", None)
            metadata = getattr(upload, "metadata", None)

            # Build content settings
            content_settings = None
            if content_type:
                content_settings = ContentSettings(content_type=content_type)

            # Commit the block list to finalize the blob
            await blob_client.commit_block_list(
                block_list=block_list,
                content_settings=content_settings,
                metadata=metadata,
            )

            # Get the final file info
            return await self.info(upload.key)
        except Exception as e:  # pragma: no cover
            from litestar_storages.exceptions import StorageError

            raise StorageError(f"Failed to complete multipart upload for {upload.key}: {e}") from e

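    # Flow sketch (not part of the original module): driving the block-blob multipart
    # API by hand. Keys and chunk variables are illustrative; put_large() below wraps
    # these same three calls.
    #
    #   upload = await storage.start_multipart_upload(
    #       "backups/archive.bin", content_type="application/octet-stream"
    #   )
    #   await storage.upload_part(upload, 1, first_4mb_chunk)
    #   await storage.upload_part(upload, 2, second_4mb_chunk)
    #   stored = await storage.complete_multipart_upload(upload)
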
    async def abort_multipart_upload(
        self,
        upload: MultipartUpload,
    ) -> None:
        """Abort a multipart upload.

        Note:
            Azure automatically garbage-collects uncommitted blocks after 7 days.
            There is no explicit "abort" operation needed - simply don't commit the
            block list. This method is provided for API consistency but is
            essentially a no-op.

        Args:
            upload: The MultipartUpload object to abort
        """
        # Azure automatically cleans up uncommitted blocks after 7 days
        # No explicit abort operation is needed

    async def put_large(
        self,
        key: str,
        data: bytes | AsyncIterator[bytes],
        *,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
        part_size: int = 4 * 1024 * 1024,
        progress_callback: ProgressCallback | None = None,
    ) -> StoredFile:
        """Upload a large file using multipart upload.

        This is a convenience method that handles the multipart upload process
        automatically. It splits the data into blocks, uploads them, and commits
        the block list.

        Args:
            key: Storage path/key for the file
            data: File contents as bytes or async byte stream
            content_type: MIME type of the content
            metadata: Additional metadata to store with the file
            part_size: Size of each part in bytes (default 4MB, max 4000MB per block)
            progress_callback: Optional callback for progress updates

        Returns:
            StoredFile with metadata about the stored file

        Raises:
            StorageError: If the upload fails

        Note:
            Azure Block Blobs support up to 50,000 blocks per blob. Each block can
            be up to 4000MB in size.
        """
        # Collect data if it's an async iterator
        if isinstance(data, bytes):
            file_data = data
        else:
            chunks = []
            async for chunk in data:
                chunks.append(chunk)
            file_data = b"".join(chunks)

        total_size = len(file_data)

        # For small files, use regular put
        if total_size < part_size:
            return await self.put(key, file_data, content_type=content_type, metadata=metadata)

        # Start multipart upload
        upload = await self.start_multipart_upload(
            key,
            content_type=content_type,
            metadata=metadata,
            part_size=part_size,
        )

        try:
            bytes_uploaded = 0
            part_number = 1

            # Upload parts
            for i in range(0, total_size, part_size):
                part_data = file_data[i : i + part_size]
                await self.upload_part(upload, part_number, part_data)

                bytes_uploaded += len(part_data)
                part_number += 1

                # Report progress
                if progress_callback:
                    progress_callback(
                        ProgressInfo(
                            bytes_transferred=bytes_uploaded,
                            total_bytes=total_size,
                            operation="upload",
                            key=key,
                        )
                    )

            # Complete the upload
            return await self.complete_multipart_upload(upload)
        except Exception:
            # Note: Azure doesn't require explicit abort - blocks auto-expire
            await self.abort_multipart_upload(upload)
            raise
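

# Usage sketch (not part of the original module): uploading a large payload with
# progress reporting via put_large(). The connection string, key, and sizes below
# are illustrative placeholders only.
async def _example_put_large() -> None:
    storage = AzureStorage(
        config=AzureConfig(
            container="my-container",
            connection_string="DefaultEndpointsProtocol=https;...",  # placeholder
        )
    )

    def on_progress(info: ProgressInfo) -> None:
        # Called after each staged block with cumulative byte counts.
        print(f"{info.key}: {info.bytes_transferred}/{info.total_bytes} bytes ({info.operation})")

    try:
        # 16 MB of dummy data is split into four 4 MB blocks and then committed.
        await storage.put_large(
            "backups/archive.bin",
            b"\x00" * (16 * 1024 * 1024),
            content_type="application/octet-stream",
            progress_callback=on_progress,
        )
    finally:
        await storage.close()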