Tiered Storage
KIP-405: Hot Data on Disk, Cold Data in S3
Kafka's traditional storage model put the entire log on local broker disks. For low retention this is fine. For 30-day or 90-day retention on a high-throughput cluster, it means provisioning tens of terabytes of SSD on every broker — and most of that disk is touched only on rare backfills.
Tiered storage (KIP-405, GA in Kafka 3.6) splits the log into two tiers. The local tier holds recent segments (the hot tail). Closed segments past a configurable threshold are uploaded to remote storage (typically S3 or GCS) and deleted from local disk. Consumers do not change — the broker transparently fetches from the remote tier when needed.
Architecture
Key Numbers
Configuration
# Cluster-level: enable the remote storage system
remote.log.storage.system.enable=true
# Storage manager plugin (Confluent S3, AWS S3, custom)
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.RemoteLogStorageManager
remote.log.storage.manager.class.path=/opt/kafka/plugins/tiered/*
# Implementation-specific (S3 example)
rsm.config.s3.bucket.name=my-kafka-tier
rsm.config.s3.region=us-east-1
rsm.config.s3.credentials.provider.class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
# Metadata manager (uses __remote_log_metadata internal topic)
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
remote.log.metadata.topic.replication.factor=3
remote.log.metadata.topic.num.partitions=50 # Per-topic: enable tiered storage and set local retention
$ kafka-topics.sh --bootstrap-server kafka-1:9092 --create \
--topic events \
--partitions 100 \
--replication-factor 3 \
--config remote.storage.enable=true \
--config local.retention.ms=604800000 \
--config retention.ms=7776000000
# local.retention.ms = 7 days (kept on broker disk)
# retention.ms = 90 days (total: local + remote)
# local.retention.bytes = -1 (no size cap on local; use ms) Upload Path
# RemoteLogManager runs on each broker, processes its leader partitions
# Periodic task (every remote.log.manager.task.interval.ms = 30s default):
for partition in leader_partitions:
segments = list_local_segments(partition)
closed_old_enough = [s for s in segments
if s.is_closed() and (now - s.last_modified) > local.retention.ms]
for segment in closed_old_enough:
if not already_in_remote_metadata(segment):
# Upload .log + .index + .timeindex + .producerSnapshot + .leaderEpochCheckpoint
remote_id = generate_uuid()
remote_storage_manager.copyLogSegmentData(remote_id, segment)
remote_log_metadata_manager.addRemoteLogSegmentMetadata(metadata)
# Now safe to delete locally
delete_local_segment(segment) Only the partition leader uploads. Followers replicate as before from the leader's local copy. When leadership changes, the new leader checks remote metadata and resumes uploading from where the previous leader left off — no redundant uploads.
Fetch Path: Remote Read
# Consumer fetches offset 0.5M (in remote tier)
# Broker handling FetchRequest:
if requested_offset >= local_log_start_offset:
# Hot path — local disk, sendfile() to socket
return read_local_segment(requested_offset)
else:
# Cold path — remote tier
metadata = remote_log_metadata_manager.find(partition, offset=0.5M)
if metadata is None:
return OFFSET_OUT_OF_RANGE
# Stream from S3 directly to consumer
remote_input_stream = remote_storage_manager.fetchLogSegment(metadata, position)
copy_bytes(remote_input_stream, fetch_response_buffer, max_bytes)
return fetch_response
# The broker does not persist the remote bytes locally.
# It also does not page-cache them — cold reads cost full S3 latency every time. The broker uses a small in-memory cache for remote-segment indexes (so it does not re-fetch the .index file on every read), but the actual log data is streamed pass-through. This means cold reads are slower but they do not impact the page-cache hit rate of the hot tail.
Cost Economics
| Tier | Cost / GB-month | Latency | Use |
|---|---|---|---|
| SSD (gp3 or local NVMe) | ~$0.08-0.15 | 0.1-1 ms | Hot tail (last 1-7 days) |
| S3 Standard | $0.023 | 50-200 ms | Recent cold (1-90 days) |
| S3 IA (Infrequent Access) | $0.0125 + retrieval | 50-200 ms + retrieval fee | Older cold (30+ days) |
| S3 Glacier IR | $0.004 | ~ms (instant retrieval) | Long-term audit/compliance |
A worked example: 100 brokers, 5 GB/s throughput, 30-day retention. Without tiered storage, that is ~13 PB on local SSD across the cluster — about $130K/month at gp3 prices. With tiered storage (1 day local, 29 days remote), it is ~430 TB local + 12.6 PB in S3 = ~$4K + $290K = wait. The savings come from broker count, not just disk: smaller local disks mean fewer brokers (since you no longer need to spread storage across many machines), so the total cost drops.
When It Doesn't Help
Tiered storage shines when
- Retention is much longer than read window (90 days kept, 1 day actively read)
- Throughput requires many brokers — disk consolidation cuts broker count
- Topics are append-only, time-based (not compacted)
- Backfills and replays are rare
Skip tiered storage when
- Retention is short (a few days) — local disk is fine
- Most consumers replay from earliest constantly (cold reads dominate)
- You use compacted topics heavily (not yet supported)
- You can't tolerate occasional 100ms+ fetch latency on backfills
- Your S3 egress costs are non-trivial (cross-region consumers)
Operational Notes
# Key metrics to watch
kafka.log.remote:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize
kafka.log.remote:type=RemoteLogManager,name=RemoteCopyBytesPerSec
kafka.log.remote:type=RemoteLogManager,name=RemoteFetchBytesPerSec
kafka.log.remote:type=RemoteLogManager,name=RemoteCopyLagSegments
kafka.log.remote:type=RemoteLogManager,name=RemoteDeleteErrorsPerSec
# Throttling — bound the bandwidth used by tiered uploads
remote.log.manager.copier.task.thread.pool.size=10
remote.log.manager.thread.pool.size=10
remote.log.manager.copy.max.bytes.per.second=104857600 # 100 MB/s per broker
# Disable tiered storage on a topic (topic must be re-created)
$ kafka-configs.sh --alter --entity-type topics --entity-name events \
--add-config remote.storage.enable=false # not allowed once enabled, must recreate Frequently Asked Questions
What problem does tiered storage solve?
Kafka brokers store all log segments on local disks. For long retention (days, weeks, months), this means provisioning huge SSDs on every broker, even though most reads target only the last few minutes of data. Tiered storage lets the broker keep only the hot tail on local disk and offload cold segments to cheap object storage like S3 — without changing the consumer API. A 30-day retention requirement that needed 50 TB of SSD on each broker can drop to 1 TB local + 50 TB in S3.
Does fetching from S3 hurt latency?
Yes for cold reads, but the design assumes cold reads are rare. New consumers and most production workloads read the hot tail from local disk at full speed (~ms latency). Only consumers reading historical data — backfills, replays, audits — pay the S3 latency (~50-200ms per fetch). The broker streams the remote segment range into the response without persisting it locally, so it does not pollute the local disk.
Can I use tiered storage with compacted topics?
Not as of Kafka 3.7+. KIP-405 explicitly excluded compacted topics because compaction rewrites segments after they are written, which conflicts with the immutability assumption of remote-tier segments. Time-based retention topics (the most common case for high-volume event streams) are fully supported. There is ongoing work to extend support, but the current production sweet spot is high-volume non-compacted topics with long retention.
What goes into the remote tier?
Closed log segments — the .log file plus its .index and .timeindex sidecars, and any associated transaction index and producer state snapshot. These are uploaded as a unit (a Remote Log Segment) and tracked in metadata. The active segment (currently being written) and recently closed segments stay on local disk until the local retention threshold (configurable per topic) is exceeded.
How does the broker know what is in S3?
Kafka uses a pluggable RemoteLogMetadataManager. The default implementation stores remote-segment metadata in a special internal topic (__remote_log_metadata) that mirrors what segments exist for each partition. When a consumer fetches an offset that is in the remote tier, the broker looks up the segment from this metadata, fetches the relevant byte range from S3, and streams it back. Metadata operations are local; only the actual data fetch hits S3.