Retry Utilities¶
The retry module provides robust retry logic with exponential backoff for storage operations. Network issues, rate limits, and transient failures are common in cloud storage; the retry utilities help your application handle them gracefully.
When to Use Retries¶
Retries are appropriate for transient failures that may succeed on a subsequent attempt:
| Retryable | Not Retryable |
|---|---|
| Connection timeouts | Invalid credentials |
| Rate limiting (429) | File not found |
| Temporary network issues | Permission denied |
| Service unavailable (503) | Invalid configuration |
The retry utilities automatically distinguish between these cases: non-retryable exceptions are raised immediately, without wasting time on attempts that cannot succeed.
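For example, here is a minimal sketch (using a stand-in coroutine rather than a real backend call) of a non-retryable error surfacing immediately; PermissionError is not in the default retryable_exceptions tuple:

import asyncio

from litestar_storages import retry

@retry()  # Default config retries StorageConnectionError, TimeoutError, ConnectionError
async def read_protected() -> None:
    # PermissionError is not retryable, so it is re-raised with no backoff delays
    raise PermissionError("access denied")

# asyncio.run(read_protected())  # Raises PermissionError after a single attempt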
RetryConfig¶
The RetryConfig dataclass controls retry behavior:
from litestar_storages import RetryConfig, StorageConnectionError

config = RetryConfig(
    max_retries=3,          # Maximum retry attempts (0 = no retries)
    base_delay=1.0,         # Initial delay between retries (seconds)
    max_delay=60.0,         # Maximum delay cap (seconds)
    exponential_base=2.0,   # Base for exponential backoff
    jitter=True,            # Add randomness to prevent thundering herd
    retryable_exceptions=(  # Exception types that trigger retries
        StorageConnectionError,
        TimeoutError,
        ConnectionError,
    ),
)
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
| max_retries | int | 3 | Maximum number of retry attempts after the initial failure |
| base_delay | float | 1.0 | Initial delay in seconds before the first retry |
| max_delay | float | 60.0 | Maximum delay cap to prevent excessive waits |
| exponential_base | float | 2.0 | Multiplier for exponential backoff |
| jitter | bool | True | Add randomness to delays (recommended) |
| retryable_exceptions | tuple | See below | Exception types that should trigger retries |
Default Retryable Exceptions¶
By default, these exceptions trigger retries:
- StorageConnectionError - network connectivity issues
- TimeoutError - operation timeouts
- ConnectionError - Python's built-in connection error
The @retry Decorator¶
The @retry decorator adds retry logic to async functions:
from litestar_storages import retry, RetryConfig, Storage

@retry()
async def upload_file(storage: Storage, key: str, data: bytes) -> None:
    """Upload with default retry settings (3 retries, exponential backoff)."""
    await storage.put(key, data)

@retry(RetryConfig(max_retries=5, base_delay=0.5))
async def upload_critical_file(storage: Storage, key: str, data: bytes) -> None:
    """Upload with custom retry settings for important files."""
    await storage.put(key, data)
Basic Usage¶
from litestar_storages import retry, Storage

@retry()
async def store_user_avatar(storage: Storage, user_id: str, image: bytes) -> str:
    """Store a user's avatar with automatic retry on failure."""
    key = f"avatars/{user_id}.jpg"
    result = await storage.put(key, image, content_type="image/jpeg")
    return result.key
Custom Configuration¶
from litestar_storages import retry, RetryConfig, Storage

# Aggressive retry for critical operations
critical_retry = RetryConfig(
    max_retries=5,
    base_delay=0.5,
    max_delay=30.0,
)

@retry(critical_retry)
async def store_payment_receipt(storage: Storage, receipt_id: str, pdf: bytes) -> None:
    """Store payment receipt - must succeed."""
    await storage.put(f"receipts/{receipt_id}.pdf", pdf)
Custom Retryable Exceptions¶
Extend the retryable exceptions for specific backends:
from litestar_storages import retry, RetryConfig, Storage, StorageConnectionError

class RateLimitError(Exception):
    """Your custom exception for backend rate-limit responses."""

# Include rate limit errors as retryable
@retry(RetryConfig(
    retryable_exceptions=(
        StorageConnectionError,
        TimeoutError,
        ConnectionError,
        RateLimitError,  # Your custom exception
    ),
))
async def upload_with_rate_limit_handling(storage: Storage, key: str, data: bytes) -> None:
    await storage.put(key, data)

Note that retryable_exceptions replaces the default tuple rather than extending it, which is why the built-in exception types are re-listed above.
The with_retry() Function¶
For cases where you need inline retry logic without a decorator, use with_retry():
from litestar_storages import with_retry, RetryConfig, Storage

async def process_upload(storage: Storage, key: str, data: bytes) -> None:
    """Upload with inline retry logic."""
    # Retry just the storage operation
    await with_retry(
        lambda: storage.put(key, data),
        RetryConfig(max_retries=3),
    )
When to Use with_retry()¶
Use with_retry() instead of @retry when:

- You need to retry only part of a function
- The retry configuration is determined at runtime
- You're working with lambdas or closures
from litestar_storages import with_retry, RetryConfig, Storage

async def batch_upload(storage: Storage, files: dict[str, bytes]) -> list[str]:
    """Upload multiple files with per-file retry."""
    uploaded = []
    for key, data in files.items():
        # Each file gets its own retry attempts
        result = await with_retry(
            lambda k=key, d=data: storage.put(k, d),
            RetryConfig(max_retries=2),
        )
        uploaded.append(result.key)
    return uploaded

The k=key, d=data defaults bind each file's values when the lambda is created, so the closure stays correct even if the call is deferred.
Dynamic Configuration¶
from litestar_storages import with_retry, RetryConfig, Storage

async def upload_with_priority(
    storage: Storage,
    key: str,
    data: bytes,
    priority: str,
) -> None:
    """Upload with a retry config based on priority."""
    if priority == "critical":
        config = RetryConfig(max_retries=10, base_delay=0.1)
    elif priority == "high":
        config = RetryConfig(max_retries=5, base_delay=0.5)
    else:
        config = RetryConfig(max_retries=2, base_delay=1.0)

    await with_retry(lambda: storage.put(key, data), config)
Exponential Backoff Explained¶
The retry delay grows exponentially with each attempt:
delay = min(base_delay * (exponential_base ** attempt), max_delay)
With default settings (base_delay=1.0, exponential_base=2.0, max_delay=60.0):
| Attempt | Delay (seconds) |
|---|---|
| 1 | 1.0 |
| 2 | 2.0 |
| 3 | 4.0 |
| 4 | 8.0 |
| 5 | 16.0 |
| 6 | 32.0 |
| 7+ | 60.0 (capped) |
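You can reproduce the schedule in the table directly from the formula (a standalone sketch, not library code):

base_delay, exponential_base, max_delay = 1.0, 2.0, 60.0

for attempt in range(7):
    delay = min(base_delay * (exponential_base ** attempt), max_delay)
    print(f"Retry {attempt + 1}: {delay:.1f}s")
# Retry 1: 1.0s, Retry 2: 2.0s, ... Retry 7: 60.0s (capped)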
Why Jitter Matters¶
Without jitter, if multiple clients fail at the same time, they all retry at the same time, potentially causing another failure (thundering herd problem).
With jitter enabled (default), delays are randomized within +/-25%:
jittered_delay = delay * random(0.75, 1.25)
This spreads out retry attempts, reducing load spikes on the storage service.
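In code, the jitter calculation amounts to something like this (a standalone sketch of the formula above, using Python's random module):

import random

def apply_jitter(delay: float) -> float:
    # Randomize within +/-25% so simultaneous clients retry at different times
    return delay * random.uniform(0.75, 1.25)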
Logging¶
The retry utilities log warnings for each retry attempt and errors when retries are exhausted:
import logging
# Enable logging to see retry activity
logging.basicConfig(level=logging.WARNING)
# Example log output:
# WARNING:litestar_storages.retry:Retry attempt 1/3 for upload_file after 1.23s: Connection timed out
# WARNING:litestar_storages.retry:Retry attempt 2/3 for upload_file after 2.67s: Connection timed out
# ERROR:litestar_storages.retry:All 3 retries exhausted for upload_file
To capture retry events in your application:
import logging
logger = logging.getLogger("litestar_storages.retry")
logger.setLevel(logging.INFO)
# Add your handler
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
logger.addHandler(handler)
Best Practices¶
1. Use Retries for Idempotent Operations¶
Retries work best with operations that can be safely repeated:
from uuid import uuid4

from litestar_storages import retry, with_retry, Storage

# Good: PUT is idempotent - uploading the same file twice is fine
@retry()
async def upload(storage: Storage, key: str, data: bytes) -> None:
    await storage.put(key, data)

# Caution: not idempotent - may create duplicates
async def create_unique_file(storage: Storage, data: bytes) -> str:
    key = f"files/{uuid4()}.dat"  # Generate key BEFORE the retry loop
    await with_retry(lambda: storage.put(key, data))
    return key
2. Set Reasonable Timeouts¶
Combine retries with operation timeouts to avoid waiting forever:
import asyncio

from litestar_storages import retry, RetryConfig, Storage

@retry(RetryConfig(max_retries=3, base_delay=1.0))
async def upload_with_timeout(storage: Storage, key: str, data: bytes) -> None:
    """Upload with both retry and timeout."""
    try:
        await asyncio.wait_for(
            storage.put(key, data),
            timeout=30.0,  # 30 second timeout per attempt
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"Upload of {key} timed out")

Because TimeoutError is among the default retryable exceptions, the re-raise here turns each timed-out attempt into another retry rather than an immediate failure.
3. Don’t Retry User-Facing Operations¶
For operations where users are waiting, prefer fast failure over long retry sequences:
from litestar_storages import retry, RetryConfig, Storage

# API endpoint - fail fast for better UX
@retry(RetryConfig(max_retries=1, base_delay=0.5))
async def handle_upload(storage: Storage, key: str, data: bytes) -> None:
    await storage.put(key, data)

# Background job - can afford more retries
@retry(RetryConfig(max_retries=10, base_delay=5.0))
async def process_batch_upload(storage: Storage, key: str, data: bytes) -> None:
    await storage.put(key, data)
4. Monitor Retry Rates¶
High retry rates indicate underlying issues. Track them in production:
from dataclasses import dataclass

from litestar_storages import retry, Storage

@dataclass
class RetryMetrics:
    attempts: int = 0
    successes: int = 0
    failures: int = 0

metrics = RetryMetrics()

@retry()
async def upload_with_metrics(storage: Storage, key: str, data: bytes) -> None:
    metrics.attempts += 1  # Runs once per attempt, including retries
    try:
        await storage.put(key, data)
        metrics.successes += 1
    except Exception:
        metrics.failures += 1
        raise
Integration with Storage Operations¶
Wrapping Storage Methods¶
Create a retry-enabled wrapper for common operations:
from litestar_storages import Storage, RetryConfig, with_retry

class RetryableStorage:
    """Storage wrapper with automatic retries."""

    def __init__(self, storage: Storage, config: RetryConfig | None = None):
        self._storage = storage
        self._config = config or RetryConfig()

    async def put(self, key: str, data: bytes, **kwargs):
        return await with_retry(
            lambda: self._storage.put(key, data, **kwargs),
            self._config,
        )

    async def get_bytes(self, key: str) -> bytes:
        return await with_retry(
            lambda: self._storage.get_bytes(key),
            self._config,
        )

    async def delete(self, key: str) -> None:
        return await with_retry(
            lambda: self._storage.delete(key),
            self._config,
        )
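The wrapper is then a drop-in replacement for the underlying backend. A hypothetical usage sketch, assuming storage is any configured Storage instance:

async def store_report(storage: Storage, report_id: str, pdf: bytes) -> bytes:
    # Every call below gets the wrapper's retry behavior automatically
    reliable = RetryableStorage(storage, RetryConfig(max_retries=5))
    await reliable.put(f"reports/{report_id}.pdf", pdf)
    return await reliable.get_bytes(f"reports/{report_id}.pdf")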
Next Steps¶
- Learn about multipart uploads for large files
- Explore progress callbacks for upload monitoring
- Create custom backends with built-in retry support