Streaming Large Files¶
Handle large file uploads and downloads efficiently without loading entire files into memory. This recipe demonstrates chunked streaming, multipart uploads, and progress tracking for files of any size.
Prerequisites¶
Python 3.9+
litestar-storages installed (pip install litestar-storages)
For S3 multipart: pip install litestar-storages[s3]
For Litestar integration: pip install litestar-storages[litestar]
The Problem¶
Loading large files entirely into memory causes:
Memory exhaustion and OOM errors
Slow response times while buffering
Poor user experience (no progress indication)
Failed uploads on unstable connections (no resume)
Streaming processes files in chunks, enabling:
Constant memory usage regardless of file size
Real-time progress updates
Resumable uploads for very large files
Better resource utilization
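The contrast shows up directly in the storage API used throughout this recipe: storage.get() yields chunks, so nothing forces the whole object into memory. A minimal sketch (the function name is illustrative):

async def consume(storage, key: str) -> None:
    # Buffering (avoid): materializes the entire object in memory.
    # data = b"".join([chunk async for chunk in storage.get(key)])

    # Streaming (prefer): memory use stays at roughly one chunk.
    async for chunk in storage.get(key):
        ...  # hash, forward, or write each chunk as it arrives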
Solution¶
Streaming Utilities¶
First, create utilities for chunked streaming:
"""Streaming utilities for large file handling."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass

from litestar_storages import ProgressInfo, Storage, StoredFile
@dataclass
class ChunkInfo:
"""Information about a data chunk."""
data: bytes
offset: int
size: int
total: int | None
async def chunk_bytes(
data: bytes,
chunk_size: int = 64 * 1024, # 64KB default
) -> AsyncIterator[bytes]:
"""Split bytes into chunks.
Args:
data: Data to chunk
chunk_size: Size of each chunk
Yields:
Data chunks
"""
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
# Yield control to event loop
await asyncio.sleep(0)
async def chunk_file(
path: str,
chunk_size: int = 64 * 1024,
) -> AsyncIterator[bytes]:
"""Read file in chunks without loading into memory.
Args:
path: Path to file
chunk_size: Size of each chunk
Yields:
File data chunks
"""
try:
import aiofiles
except ImportError as e:
raise ImportError("aiofiles required: pip install aiofiles") from e
async with aiofiles.open(path, "rb") as f:
while True:
chunk = await f.read(chunk_size)
if not chunk:
break
yield chunk
async def chunk_stream_with_progress(
stream: AsyncIterator[bytes],
total_size: int | None,
callback: Callable[[ProgressInfo], None] | None,
key: str = "",
operation: str = "upload",
) -> AsyncIterator[bytes]:
"""Wrap a stream with progress reporting.
Args:
stream: Source stream
total_size: Total expected size (for percentage)
callback: Progress callback function
key: Storage key for progress info
operation: "upload" or "download"
Yields:
Original chunks unchanged
"""
bytes_transferred = 0
async for chunk in stream:
yield chunk
bytes_transferred += len(chunk)
if callback:
callback(ProgressInfo(
bytes_transferred=bytes_transferred,
total_bytes=total_size,
operation=operation,
key=key,
))
class StreamingUploader:
"""Helper for streaming uploads with progress.
Example:
uploader = StreamingUploader(storage, progress_callback=my_callback)
result = await uploader.upload_file("/path/to/large/file.zip", "uploads/file.zip")
"""
def __init__(
self,
        storage: Storage,
chunk_size: int = 64 * 1024,
progress_callback: Callable[[ProgressInfo], None] | None = None,
) -> None:
self.storage = storage
self.chunk_size = chunk_size
self.progress_callback = progress_callback
async def upload_bytes(
self,
key: str,
data: bytes,
content_type: str | None = None,
    ) -> StoredFile:
"""Upload bytes with streaming and progress.
Args:
key: Storage key
data: Data to upload
content_type: MIME type
Returns:
StoredFile result
"""
total_size = len(data)
stream = chunk_bytes(data, self.chunk_size)
if self.progress_callback:
stream = chunk_stream_with_progress(
stream,
total_size,
self.progress_callback,
key=key,
operation="upload",
)
return await self.storage.put(
key=key,
data=stream,
content_type=content_type,
)
async def upload_file(
self,
path: str,
key: str,
content_type: str | None = None,
    ) -> StoredFile:
"""Upload file with streaming and progress.
Args:
path: Local file path
key: Storage key
content_type: MIME type (auto-detected if None)
Returns:
StoredFile result
"""
import os
total_size = os.path.getsize(path)
stream = chunk_file(path, self.chunk_size)
if self.progress_callback:
stream = chunk_stream_with_progress(
stream,
total_size,
self.progress_callback,
key=key,
operation="upload",
)
# Auto-detect content type from extension
if content_type is None:
import mimetypes
content_type, _ = mimetypes.guess_type(path)
return await self.storage.put(
key=key,
data=stream,
content_type=content_type,
)
class StreamingDownloader:
"""Helper for streaming downloads with progress.
Example:
downloader = StreamingDownloader(storage, progress_callback=my_callback)
async for chunk in downloader.download("uploads/file.zip"):
output_file.write(chunk)
"""
def __init__(
self,
        storage: Storage,
progress_callback: Callable[[ProgressInfo], None] | None = None,
) -> None:
self.storage = storage
self.progress_callback = progress_callback
async def download(self, key: str) -> AsyncIterator[bytes]:
"""Download file with streaming and progress.
Args:
key: Storage key
Yields:
File data chunks
"""
# Get file size for progress
info = await self.storage.info(key)
total_size = info.size
stream = self.storage.get(key)
if self.progress_callback:
async for chunk in chunk_stream_with_progress(
stream,
total_size,
self.progress_callback,
key=key,
operation="download",
):
yield chunk
else:
async for chunk in stream:
yield chunk
async def download_to_file(self, key: str, path: str) -> int:
"""Download file directly to disk.
Args:
key: Storage key
path: Local destination path
Returns:
Total bytes downloaded
"""
try:
import aiofiles
except ImportError as e:
raise ImportError("aiofiles required: pip install aiofiles") from e
total_bytes = 0
async with aiofiles.open(path, "wb") as f:
async for chunk in self.download(key):
await f.write(chunk)
total_bytes += len(chunk)
return total_bytes
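The utilities can be exercised on their own before wiring them to a backend. A minimal sketch (the function name and path are illustrative):

"""Example: drive the streaming utilities directly."""
import os

async def stream_with_progress(path: str) -> None:
    total = os.path.getsize(path)

    def on_progress(info: ProgressInfo) -> None:
        print(f"{info.bytes_transferred}/{total} bytes")

    stream = chunk_stream_with_progress(
        chunk_file(path),
        total,
        on_progress,
        key=path,
        operation="upload",
    )
    async for _chunk in stream:
        pass  # hand the chunks to storage.put(), a socket, etc.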
Framework-Agnostic Streaming¶
Use streaming with any storage backend:
"""Framework-agnostic streaming file operations."""
import asyncio
from litestar_storages import (
S3Storage,
S3Config,
ProgressInfo,
)
# Import from above
# from streaming_utils import StreamingUploader, StreamingDownloader
def create_progress_bar(description: str = "Progress"):
"""Create a simple console progress reporter."""
def report_progress(info: ProgressInfo) -> None:
if info.percentage is not None:
bar_width = 40
filled = int(bar_width * info.percentage / 100)
bar = "=" * filled + "-" * (bar_width - filled)
print(f"\r{description}: [{bar}] {info.percentage:.1f}%", end="", flush=True)
else:
mb = info.bytes_transferred / (1024 * 1024)
print(f"\r{description}: {mb:.2f} MB transferred", end="", flush=True)
return report_progress
async def main() -> None:
"""Demonstrate streaming uploads and downloads."""
# Configure S3 storage
storage = S3Storage(
config=S3Config(
bucket="my-bucket",
region="us-west-2",
)
)
# Create uploader with progress
uploader = StreamingUploader(
storage,
chunk_size=1024 * 1024, # 1MB chunks
progress_callback=create_progress_bar("Uploading"),
)
# Upload a large file
print("Starting upload...")
result = await uploader.upload_file(
path="/path/to/large-file.zip",
key="backups/large-file.zip",
content_type="application/zip",
)
print(f"\nUploaded: {result.key} ({result.size} bytes)")
# Create downloader with progress
downloader = StreamingDownloader(
storage,
progress_callback=create_progress_bar("Downloading"),
)
# Download to file
print("\nStarting download...")
bytes_downloaded = await downloader.download_to_file(
key="backups/large-file.zip",
path="/tmp/downloaded-file.zip",
)
print(f"\nDownloaded: {bytes_downloaded} bytes")
# Or stream to process chunks
print("\nStreaming download...")
async for chunk in downloader.download("backups/large-file.zip"):
# Process each chunk (hash, validate, etc.)
pass
await storage.close()
if __name__ == "__main__":
asyncio.run(main())
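Because downloads arrive chunk by chunk, per-chunk work such as integrity checking costs no extra memory. A minimal sketch reusing the StreamingDownloader above (the helper name is hypothetical):

import hashlib

async def hash_stored_file(downloader: StreamingDownloader, key: str) -> str:
    """Compute a SHA-256 digest of a stored file without buffering it."""
    digest = hashlib.sha256()
    async for chunk in downloader.download(key):
        digest.update(chunk)
    return digest.hexdigest()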
S3 Multipart Upload for Very Large Files¶
For files over 100MB, use S3’s multipart upload:
"""S3 multipart upload for very large files."""
import asyncio
from litestar_storages import S3Storage
async def upload_large_file_multipart(
storage: S3Storage,
path: str,
key: str,
part_size: int = 10 * 1024 * 1024, # 10MB parts
progress_callback=None,
) -> None:
"""Upload a large file using S3 multipart upload.
Benefits:
- Parallel part uploads possible
- Resume failed uploads
- Required for files > 5GB
Args:
storage: S3Storage instance
path: Local file path
key: S3 key
part_size: Size of each part (min 5MB)
progress_callback: Optional progress callback
"""
    import os

    import aiofiles

    file_size = os.path.getsize(path)
    total_parts = (file_size + part_size - 1) // part_size
    print(f"Uploading {file_size} bytes in {total_parts} parts")

    # put_large splits the data into parts and drives the multipart upload.
    # Note: this convenience form buffers the whole file in memory first;
    # for constant-memory uploads, use the manual approach below.
    async with aiofiles.open(path, "rb") as f:
        data = await f.read()

    result = await storage.put_large(
        key=key,
        data=data,
        part_size=part_size,
        progress_callback=progress_callback,
    )
    print(f"Uploaded: {result.key} ({result.size} bytes)")
async def upload_large_file_manual(
storage: S3Storage,
path: str,
key: str,
part_size: int = 10 * 1024 * 1024,
) -> None:
"""Manual multipart upload with fine-grained control.
Use this when you need:
- Custom error handling per part
- Parallel part uploads
- Upload state persistence for resume
"""
import os
import aiofiles
file_size = os.path.getsize(path)
total_parts = (file_size + part_size - 1) // part_size
print(f"Starting multipart upload: {total_parts} parts")
# Start multipart upload
upload = await storage.start_multipart_upload(
key=key,
content_type="application/octet-stream",
part_size=part_size,
)
try:
async with aiofiles.open(path, "rb") as f:
part_number = 1
while True:
chunk = await f.read(part_size)
if not chunk:
break
# Upload this part
etag = await storage.upload_part(upload, part_number, chunk)
print(f" Part {part_number}/{total_parts} uploaded (ETag: {etag[:8]}...)")
part_number += 1
# Complete the upload
result = await storage.complete_multipart_upload(upload)
print(f"Completed: {result.key}")
except Exception as e:
# Abort on failure to clean up parts
print(f"Upload failed: {e}, aborting...")
await storage.abort_multipart_upload(upload)
raise
async def parallel_multipart_upload(
storage: S3Storage,
path: str,
key: str,
part_size: int = 10 * 1024 * 1024,
max_concurrency: int = 4,
) -> None:
"""Upload parts in parallel for faster transfers.
Args:
storage: S3Storage instance
path: Local file path
key: S3 key
part_size: Size of each part
max_concurrency: Max parallel uploads
"""
import os
import aiofiles
file_size = os.path.getsize(path)
total_parts = (file_size + part_size - 1) // part_size
# Start upload
upload = await storage.start_multipart_upload(
key=key,
part_size=part_size,
)
# Semaphore for concurrency control
semaphore = asyncio.Semaphore(max_concurrency)
async def upload_part(part_num: int, offset: int) -> tuple[int, str]:
"""Upload a single part."""
async with semaphore:
async with aiofiles.open(path, "rb") as f:
await f.seek(offset)
chunk = await f.read(part_size)
etag = await storage.upload_part(upload, part_num, chunk)
return part_num, etag
try:
# Create tasks for all parts
tasks = [
upload_part(i + 1, i * part_size)
for i in range(total_parts)
]
# Run with progress
completed = 0
for coro in asyncio.as_completed(tasks):
part_num, etag = await coro
completed += 1
print(f"Part {part_num} complete ({completed}/{total_parts})")
# Complete
result = await storage.complete_multipart_upload(upload)
print(f"Upload complete: {result.key}")
    except Exception as e:
        print(f"Upload failed: {e}, aborting...")
        await storage.abort_multipart_upload(upload)
raise
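Part size matters: S3 permits at most 10,000 parts per upload and requires at least 5MB per part (except the last), so very large files need proportionally larger parts. A hypothetical helper for choosing one:

def choose_part_size(file_size: int, target_parts: int = 1000) -> int:
    """Pick a part size that stays within S3's multipart limits.

    S3 allows at most 10,000 parts per upload; targeting ~1,000 parts
    leaves headroom and keeps per-part retries cheap.
    """
    min_part = 5 * 1024 * 1024  # S3 minimum for every part except the last
    return max(min_part, -(-file_size // target_parts))  # ceiling division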
With Litestar¶
Build streaming file endpoints with Litestar:
"""Litestar application with streaming file operations."""
from __future__ import annotations

import uuid
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import timedelta
from typing import Annotated

from litestar import Litestar, WebSocket, get, post
from litestar.datastructures import UploadFile
from litestar.enums import RequestEncodingType
from litestar.exceptions import ClientException, NotFoundException
from litestar.params import Body
from litestar.response import Stream
from litestar.status_codes import HTTP_400_BAD_REQUEST
from litestar_storages import (
S3Storage,
S3Config,
Storage,
StoredFile,
ProgressInfo,
StorageFileNotFoundError,
)
from litestar_storages.contrib.plugin import StoragePlugin
# Configuration
MAX_DIRECT_UPLOAD = 100 * 1024 * 1024 # 100MB - use multipart above this
CHUNK_SIZE = 1024 * 1024 # 1MB chunks
# Response DTOs
@dataclass
class UploadResponse:
"""Upload result."""
key: str
size: int
url: str
@dataclass
class DownloadInfo:
"""Download information."""
key: str
size: int
content_type: str | None
download_url: str
# Streaming request body reader
async def read_body_stream(
body: AsyncIterator[bytes],
max_size: int = 500 * 1024 * 1024, # 500MB max
) -> AsyncIterator[bytes]:
"""Read request body as a stream with size limit.
Args:
body: Request body iterator
max_size: Maximum allowed size
Yields:
Body chunks
Raises:
ClientException: If size limit exceeded
"""
total_size = 0
async for chunk in body:
total_size += len(chunk)
if total_size > max_size:
raise ClientException(
detail=f"Request body too large. Maximum: {max_size // (1024 * 1024)}MB",
status_code=HTTP_400_BAD_REQUEST,
)
yield chunk
# Route handlers
@post("/upload/stream")
async def upload_stream(
data: UploadFile,
storage: Storage,
) -> UploadResponse:
"""Upload file with streaming.
Handles files of any size without loading into memory.
Files over 100MB use multipart upload automatically.
"""
filename = data.filename or f"{uuid.uuid4()}"
key = f"uploads/{uuid.uuid4()}/{filename}"
# Read file content (UploadFile handles streaming internally)
content = await data.read()
# For very large files with S3, use multipart
if isinstance(storage, S3Storage) and len(content) > MAX_DIRECT_UPLOAD:
result = await storage.put_large(
key=key,
data=content,
content_type=data.content_type,
)
else:
result = await storage.put(
key=key,
data=content,
content_type=data.content_type,
)
url = await storage.url(key)
return UploadResponse(
key=result.key,
size=result.size,
url=url,
)
@get("/download/{key:path}")
async def download_stream(
key: str,
storage: Storage,
) -> Stream:
"""Stream file download without loading into memory.
Returns a streaming response that sends chunks as they're read.
"""
    try:
        info = await storage.info(key)
    except StorageFileNotFoundError as e:
        raise NotFoundException(detail=f"File not found: {key}") from e
async def generate() -> AsyncIterator[bytes]:
"""Generate response body from storage stream."""
async for chunk in storage.get(key):
yield chunk
return Stream(
generate(),
media_type=info.content_type or "application/octet-stream",
headers={
"Content-Length": str(info.size),
"Content-Disposition": f'attachment; filename="{key.split("/")[-1]}"',
},
)
@get("/download/{key:path}/url")
async def get_download_url(
key: str,
storage: Storage,
) -> DownloadInfo:
"""Get presigned download URL.
For cloud storage, returns a presigned URL for direct download.
Useful for very large files or CDN integration.
"""
    try:
        info = await storage.info(key)
    except StorageFileNotFoundError as e:
        raise NotFoundException(detail=f"File not found: {key}") from e
# Generate URL with 1 hour expiration
url = await storage.url(key, expires_in=timedelta(hours=1))
return DownloadInfo(
key=info.key,
size=info.size,
content_type=info.content_type,
download_url=url,
)
# WebSocket progress updates (optional)
async def upload_with_websocket_progress(
    websocket: WebSocket,
storage: S3Storage,
data: bytes,
key: str,
) -> StoredFile:
"""Upload with real-time WebSocket progress updates.
Example client:
ws = new WebSocket('ws://...');
ws.onmessage = (e) => {
const progress = JSON.parse(e.data);
updateProgressBar(progress.percentage);
};
"""
    # Assumes the storage backend accepts (and awaits) an async callback here.
    async def report_progress(info: ProgressInfo) -> None:
await websocket.send_json({
"event": "progress",
"bytes_transferred": info.bytes_transferred,
"total_bytes": info.total_bytes,
"percentage": info.percentage,
})
result = await storage.put_large(
key=key,
data=data,
progress_callback=report_progress,
)
await websocket.send_json({
"event": "complete",
"key": result.key,
"size": result.size,
})
return result
# Application setup
def create_app() -> Litestar:
"""Create application with streaming-capable storage."""
import os
storage = S3Storage(
config=S3Config(
bucket=os.getenv("S3_BUCKET", "uploads"),
region=os.getenv("AWS_REGION", "us-west-2"),
)
)
return Litestar(
route_handlers=[upload_stream, download_stream, get_download_url],
plugins=[StoragePlugin(default=storage)],
)
app = create_app()
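Any multipart-capable HTTP client can exercise these endpoints. A hypothetical client-side sketch using httpx, assuming the app runs on localhost:8000:

"""Hypothetical client for the endpoints above."""
import asyncio
import httpx

async def upload_and_get_url() -> None:
    async with httpx.AsyncClient() as client:
        with open("/path/to/large-file.zip", "rb") as f:
            resp = await client.post(
                "http://localhost:8000/upload/stream",
                # Field name must match the handler's "data" parameter.
                files={"data": ("large-file.zip", f, "application/zip")},
            )
        uploaded = resp.json()
        print("Uploaded:", uploaded["key"])

        # Prefer the presigned URL for large downloads.
        info = await client.get(
            f"http://localhost:8000/download/{uploaded['key']}/url"
        )
        print("Direct URL:", info.json()["download_url"])

asyncio.run(upload_and_get_url())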
Resumable Uploads¶
Implement resumable uploads for unreliable connections:
"""Resumable upload implementation."""
from __future__ import annotations

import hashlib
import json
from dataclasses import asdict, dataclass
from pathlib import Path

from litestar_storages import MultipartUpload, S3Config, S3Storage
@dataclass
class UploadState:
"""Persisted upload state for resume."""
upload_id: str
key: str
file_path: str
file_size: int
part_size: int
completed_parts: list[tuple[int, str]] # (part_number, etag)
def to_json(self) -> str:
"""Serialize to JSON."""
return json.dumps(asdict(self))
    @classmethod
    def from_json(cls, data: str) -> UploadState:
        """Deserialize from JSON."""
        d = json.loads(data)
        # JSON round-trips tuples as lists; restore (part_number, etag) tuples.
        d["completed_parts"] = [tuple(p) for p in d["completed_parts"]]
        return cls(**d)
class ResumableUploader:
"""Upload large files with resume capability.
Saves upload state to disk, allowing uploads to resume
after network failures or application restarts.
"""
def __init__(
self,
storage: S3Storage,
state_dir: Path = Path(".upload_state"),
part_size: int = 10 * 1024 * 1024,
) -> None:
self.storage = storage
self.state_dir = state_dir
self.part_size = part_size
self.state_dir.mkdir(exist_ok=True)
def _state_path(self, file_path: str) -> Path:
"""Get state file path for an upload."""
file_hash = hashlib.md5(file_path.encode()).hexdigest()
return self.state_dir / f"{file_hash}.json"
def _save_state(self, state: UploadState) -> None:
"""Save upload state to disk."""
path = self._state_path(state.file_path)
path.write_text(state.to_json())
def _load_state(self, file_path: str) -> UploadState | None:
"""Load existing upload state."""
path = self._state_path(file_path)
if path.exists():
return UploadState.from_json(path.read_text())
return None
def _clear_state(self, file_path: str) -> None:
"""Remove state file after completion."""
path = self._state_path(file_path)
if path.exists():
path.unlink()
async def upload(
self,
file_path: str,
key: str,
on_progress=None,
) -> None:
"""Upload file with resume support.
Args:
file_path: Local file path
key: Storage key
on_progress: Optional callback(bytes_uploaded, total_bytes)
"""
import os
import aiofiles
file_size = os.path.getsize(file_path)
total_parts = (file_size + self.part_size - 1) // self.part_size
# Check for existing upload
state = self._load_state(file_path)
if state and state.key == key:
print(f"Resuming upload: {len(state.completed_parts)}/{total_parts} parts done")
upload = MultipartUpload(
upload_id=state.upload_id,
key=state.key,
parts=state.completed_parts,
part_size=state.part_size,
)
else:
# Start new upload
upload = await self.storage.start_multipart_upload(
key=key,
part_size=self.part_size,
)
state = UploadState(
upload_id=upload.upload_id,
key=key,
file_path=file_path,
file_size=file_size,
part_size=self.part_size,
completed_parts=[],
)
# Get set of completed part numbers
completed_numbers = {p[0] for p in state.completed_parts}
try:
async with aiofiles.open(file_path, "rb") as f:
for part_num in range(1, total_parts + 1):
if part_num in completed_numbers:
# Skip already uploaded parts
continue
# Seek to part position
offset = (part_num - 1) * self.part_size
await f.seek(offset)
chunk = await f.read(self.part_size)
# Upload part
etag = await self.storage.upload_part(upload, part_num, chunk)
# Update state
state.completed_parts.append((part_num, etag))
upload.add_part(part_num, etag)
self._save_state(state)
# Report progress
if on_progress:
bytes_done = sum(
min(self.part_size, file_size - (p[0] - 1) * self.part_size)
for p in state.completed_parts
)
on_progress(bytes_done, file_size)
# Complete upload
await self.storage.complete_multipart_upload(upload)
self._clear_state(file_path)
print(f"Upload complete: {key}")
except Exception as e:
print(f"Upload interrupted: {e}")
print("Run again to resume from where you left off")
raise
async def demo_resumable_upload() -> None:
"""Demonstrate resumable upload."""
storage = S3Storage(
config=S3Config(bucket="my-bucket", region="us-west-2")
)
uploader = ResumableUploader(storage)
def progress(done: int, total: int) -> None:
pct = (done / total) * 100
print(f"\rProgress: {pct:.1f}% ({done}/{total} bytes)", end="")
await uploader.upload(
file_path="/path/to/huge-file.tar.gz",
key="backups/huge-file.tar.gz",
on_progress=progress,
)
Key Points¶
Never load large files into memory: Use streaming and chunking
Provide progress feedback: Users need to know uploads are working
Use multipart for large S3 uploads: Required for files > 5GB, recommended for > 100MB
Implement resume for reliability: Save state for long uploads
Parallel uploads: Faster transfers with concurrent part uploads
Presigned URLs: Offload large downloads to cloud CDN
Performance Tips¶
Tune chunk size: Larger chunks = fewer requests, but more memory
Parallel parts: 4-8 concurrent part uploads is usually optimal
Presigned URLs: Let cloud storage handle bandwidth-heavy downloads
Connection pooling: Reuse HTTP connections for multiple operations
Compression: Compress before upload if content is compressible
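To pick a chunk size empirically, a small harness like the following (reusing the StreamingUploader from this recipe; names and keys are illustrative) makes the trade-off measurable:

import time

async def benchmark_chunk_sizes(storage, path: str) -> None:
    """Time the same upload at several chunk sizes (illustrative only)."""
    for chunk_size in (64 * 1024, 256 * 1024, 1024 * 1024, 4 * 1024 * 1024):
        uploader = StreamingUploader(storage, chunk_size=chunk_size)
        start = time.perf_counter()
        await uploader.upload_file(path, f"bench/{chunk_size}.bin")
        print(f"{chunk_size // 1024:>5}KB chunks: {time.perf_counter() - start:.2f}s")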