Redis Streams
Streams are Redis's answer to Kafka-style append-only logs. Each entry has a monotonically
increasing ID (millisecond timestamp + sequence number, normally assigned by the server), a flat field-value
payload, and lives in a stream identified by a single Redis key. Producers append with
XADD; consumers read with XREAD (every reader sees every entry) or
XREADGROUP (work queue with consumer groups). Each consumer group tracks a Pending Entries List (PEL)
of entries delivered but not yet acknowledged, so a crashed consumer's work can
be reclaimed by another via XCLAIM or XAUTOCLAIM. Internally the data structure is a radix
tree of listpacks: efficient append and range scan, compact storage.
Streams vs Lists vs Pub/Sub
Three Redis primitives handle messaging — here's how to choose.
Redis offers three distinct mechanisms for "messaging" workloads, each with fundamentally different semantics. Choosing the wrong one leads to lost messages, impossible recovery, or unbounded memory growth. Here's the breakdown.
| Property | Streams | Lists (RPUSH/LPOP) | Pub/Sub |
|---|---|---|---|
| Data model | append-only log, indexable by ID | mutable queue, indexable by position | transient broadcast |
| History | preserved until trimmed | pop destroys the entry | gone after publish |
| Consumer recovery | can replay from any ID or reclaim PEL | can't — message is gone after pop | can't — no history to replay |
| Multiple consumers | multiple groups, each sees all entries | only one consumer per message | all subscribers receive each message |
| Backpressure | PEL tracks unacked; consumer can limit | LPOP returns nil when empty | no — fire and forget |
| Consumer groups | built-in, named, persistent | application-level only | no |
| At-least-once | yes — via PEL + XACK | only if you re-push on failure | no — at-most-once |
| Exactly-once | no — requires idempotent consumers | no | no |
| Latency | sub-ms XADD, sub-ms XREADGROUP | sub-ms | sub-ms but unreliable |
| Memory model | grows until trimmed; bounded by MAXLEN | grows indefinitely unless you pop | near zero — just subscriber buffers |
| Best for | durable queues, event sourcing, fan-out work | simple task queues, lightweight pipes | live notifications, real-time dashboards |
Lists — The Destroy-on-Read Trap
Lists feel natural for a queue: RPUSH myqueue task_data, then BLPOP myqueue 0 to consume. The problem is that LPOP (or RPOP) permanently removes the entry. If your consumer crashes after popping but before completing the task, that work is gone — there's no way to recover it. You can implement your own "processing" list + "done" list pattern, but now you're building consumer-group semantics from scratch.
# The dangerous pattern: pop and pray
RPUSH tasks "job-123"
LPOP tasks
→ "job-123"   ← gone if the consumer dies here
# Safer pattern: two-list dequeue (BLMOVE is Redis 6.2+; BRPOPLPUSH is the older, deprecated form)
RPUSH tasks "job-123"
BLMOVE tasks processing LEFT RIGHT 0   ← atomically moves the head of tasks onto processing
# ... do work ...
LREM processing 1 "job-123"   ← remove when done
# If the consumer dies, job-123 sits in the processing list: recoverable,
# but only with a separate watchdog that re-queues stale entries
Pub/Sub — No Memory, No Recovery
Pub/Sub is a pure broadcast: a publisher sends to all active subscribers on a channel. There is no persistence. Redis discards the message as soon as it has been sent to the currently connected subscribers. If a subscriber is offline when the message fires, it's gone. Pub/Sub is appropriate for live status updates, real-time counters, or dashboard refreshes where occasional loss is acceptable. It is never appropriate for anything that needs durability or at-least-once delivery.
# Publisher: sends and forgets
PUBLISH notifications "user:42:order:placed"
# Subscriber: must be connected to receive
SUBSCRIBE notifications
# If the subscriber disconnects, it misses the message: no replay
Streams — The Right Default for Durable Queues
Streams combine the append-only durability of a log with the consumer-group partitioning of a work queue. Every entry survives until you explicitly trim or delete it. Within a consumer group, each entry is delivered to one consumer at a time, and the PEL lets a crashed worker's unprocessed messages be reassigned to another. For anything beyond "fire and forget," streams are the right tool.
Stream Internals: Radix Tree + Listpacks
How Redis stores streams efficiently enough to handle millions of entries in RAM.
A Redis stream is not a simple list. It's a specialized data structure optimized for three operations: appending to the tail (XADD), range queries by ID (XRANGE, XREAD), and random access by ID (XREAD with a specific start ID). The implementation is a radix tree (called rax in the Redis source). Its keys are the IDs of the first entry in each node, and its values are listpacks holding runs of consecutive entries.
Radix Tree (Rax)
A radix tree stores variable-length keys by sharing prefixes. In a stream, each key is the ID of the first (master) entry of a listpack, such as 1700000000000-0. The key is stored as 16 bytes: the millisecond timestamp and the sequence number as two big-endian 64-bit integers. Because nearby IDs share their high-order timestamp bytes, the tree compresses well. Each node stores a slice of the key bytes and points either to child nodes or, at the end of a key, to a listpack.
The radix tree provides O(log N) seek to any entry ID, which means XREAD can jump directly to a specific position in the stream without scanning from the beginning. This is critical for consumers that resume after a crash — they can seek to their last acknowledged position in O(log N) time regardless of stream length.
# Internal tree structure (simplified)
stream "events"
└── radix tree (rax), keyed by each listpack's first entry ID
    ├── 1700000000000-0 → listpack holding entries 1700000000000-0 to 1700000000099-0
    └── 1700000000100-0 → listpack holding entries 1700000000100-0 onward
# Real keys are 16-byte (ms, seq) values, so keys with the same timestamp prefix share tree nodes
When you XADD, Redis first computes the new ID. That is the current millisecond timestamp with sequence 0, or the last ID's sequence plus one if the millisecond has not advanced. It then appends the entry to the tail listpack. A new radix tree key is inserted only when that listpack is full and a new one starts. Because nearby keys share prefixes, they also share tree nodes, which keeps the tree's own overhead to roughly 12 bytes per node plus child pointers.
Listpacks
The value stored under each radix tree key is a listpack: a contiguous memory blob that groups multiple stream entries together. A listpack is Redis's compact serialization format. It stores field names and values as integers or length-prefixed strings, using variable-length encodings that take fewer bytes for small integers and short strings.
Each listpack starts with a master entry that records the field names of the first entry added to it. Most streams have a stable set of field names (e.g., type, user, session_id), so Redis stores those names only once. A later entry whose fields match the master entry is flagged as such and stores only its values. An entry with different fields stores its field names inline. When scanning a listpack, Redis reads the master entry's field names first, then pairs them with each flagged entry's values in order.
# What a listpack looks like in memory (conceptual)
listpack for ID range 1700000000000-0 to 1700000000099-0
[header]       total bytes, element count
[master entry] fields: type, user, session_id   ← stored once
[entry 0]      values: ["click", "42", "sess_abc"]   ← values only
[entry 1]      values: ["view", "99", "sess_def"]
...            (up to 100 entries per listpack by default)
[end marker]   one terminator byte
Listpack nodes are bounded in two ways:

- by size: stream-node-max-bytes, default 4096 bytes;
- by entry count: stream-node-max-entries, default 100.

When the tail listpack reaches either limit, the next XADD starts a new listpack node. Each radix tree key therefore points to a listpack of at most 100 entries by default. It holds fewer when entries are large enough to hit the byte limit first.
Memory Implications
Stream memory has three components:
- Radix tree overhead: ~12 bytes per node + child pointers. A stream with 10M entries might have ~200k nodes = ~2–3 MB for the tree structure itself.
- Listpack data: dominated by entry payloads. A stream with 1M entries averaging 100 bytes per entry (fields + values) needs ~100 MB for listpacks.
- PEL memory: each pending entry reference consumes ~30 bytes in server memory (entry ID + consumer reference + metadata). A consumer with 100k unacknowledged messages burns ~3 MB of server-side PEL memory.
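The rules of thumb above can be combined into a quick back-of-envelope estimator. This is a sketch, not how Redis accounts for memory. The function name estimate_stream_memory is hypothetical, and avg_entry_bytes is assumed to be the encoded size of one entry. Listpack encoding overhead and radix tree bookkeeping are ignored. The defaults mirror stream-node-max-entries (100), stream-node-max-bytes (4096), and the ~30 bytes per PEL entry figure quoted above.

```python
import math


def estimate_stream_memory(num_entries: int,
                           avg_entry_bytes: int,
                           pending_entries: int = 0,
                           node_max_entries: int = 100,
                           node_max_bytes: int = 4096,
                           pel_bytes_per_entry: int = 30) -> dict:
    """Rough estimate from rules of thumb; ignores encoding and tree overhead."""
    # A listpack node is closed when it hits either the entry cap or the byte cap.
    by_bytes = node_max_bytes // max(1, avg_entry_bytes)
    entries_per_node = max(1, min(node_max_entries, by_bytes))
    return {
        "entries_per_node": entries_per_node,
        "listpack_nodes": math.ceil(num_entries / entries_per_node),
        "listpack_bytes": num_entries * avg_entry_bytes,
        "pel_bytes": pending_entries * pel_bytes_per_entry,
    }


if __name__ == "__main__":
    print(estimate_stream_memory(1_000_000, 100, pending_entries=100_000))
```

Take 1M entries of ~100 bytes each. In this model, the byte limit (not the 100-entry limit) closes each node, at about 40 entries. For real numbers, measure with MEMORY USAGE.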
Use MEMORY USAGE on a stream key to measure its actual footprint:
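MEMORY USAGE events SAMPLES 0
→ 1048576   ← ~1 MB for listpacks plus radix tree overhead (SAMPLES 0 measures every node; the default samples only a few)
XINFO STREAM events
→ ... includes radix-tree-keys and radix-tree-nodes (node counts, not byte sizes)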
Why This Design?
The radix tree + listpack combination optimizes for the three most common operations. Appending (XADD) is amortized O(1) — Redis appends to the tail listpack and only creates new nodes when the listpack fills. Range scans (XRANGE) are O(log N + M) — seek to start in the tree, then scan forward through listpacks. Random access by ID uses the tree's seek to jump directly to the containing listpack.
The trade-off is that updates to existing entries are not supported — streams are append-only by design. There's no XSET or XUPDATE to modify an entry after it's added. This simplicity is what makes the data structure so fast and compact.
Entry IDs: Timestamps, Sequences, and the 0-0/$ Shorthand
The ID is the primary index — understanding it unlocks stream behavior.
Every stream entry has an ID in the format ms-s, where ms is a Unix timestamp in milliseconds and s is a sequence number for entries within the same millisecond. IDs are monotonically increasing within a stream — every new entry gets an ID strictly greater than the previous one. This property makes IDs the natural index for range queries and consumer progress tracking.
Server-Assigned IDs (the * shorthand)
When you call XADD stream * field value, the asterisk tells Redis to generate the ID. Redis reads the current Unix time in milliseconds. If it is greater than the last ID's timestamp, the new ID is that timestamp with sequence number 0. Otherwise (same millisecond, or the clock has moved backwards), Redis keeps the last ID's timestamp and increments the sequence. The result is an ID that strictly increases within the stream, with no coordination needed between producers.
XADD events * type click user 42
→ "1700000000000-0"   ← first entry in this ms
XADD events * type click user 43
→ "1700000000000-1"   ← same ms, sequence incremented
XADD events * type click user 44
→ "1700000000000-2"   ← sequence continues
# Millisecond boundary: timestamp jumps, sequence resets
XADD events * type click user 45
→ "1700000000001-0"   ← new ms, sequence 0
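The ID rule is easy to model in a few lines. The following Python sketch mirrors the rule above. It also covers the case where the server clock moves backwards: Redis keeps the last timestamp and bumps the sequence. IdGenerator is a hypothetical name, not Redis code, and the model ignores sequence-number overflow.

```python
from dataclasses import dataclass


@dataclass
class IdGenerator:
    """Model of how XADD * picks the next ID (illustrative, not Redis source)."""
    last_ms: int = 0
    last_seq: int = 0

    def next_id(self, now_ms: int) -> str:
        if now_ms > self.last_ms:
            # The clock advanced: use it and restart the sequence at 0.
            self.last_ms, self.last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock went backwards: keep the last
            # timestamp and bump the sequence so IDs stay strictly increasing.
            self.last_seq += 1
        return f"{self.last_ms}-{self.last_seq}"


if __name__ == "__main__":
    gen = IdGenerator()
    for now in (1700000000000, 1700000000000, 1700000000000, 1700000000001, 1699999999999):
        print(gen.next_id(now))
```

The last call simulates a clock step backwards. It still yields an ID greater than the previous one (1700000000001-1).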
Custom IDs
You can provide an explicit ID. This is useful in three cases:

- ingesting data with its own timestamp (e.g., from a legacy system);
- enforcing strict total ordering across multiple producers;
- replaying events with IDs that match an external log.

The only rule: the ID must be greater than the last ID the stream has generated (for a brand-new stream, greater than 0-0).
# Custom timestamp: useful for ingesting historical data (fresh stream)
XADD events 1699000000000-0 type import data "from backup"
→ "1699000000000-0"
# Error: the ID must be greater than the last ID
XADD events 1698000000000-0 type old data "too early"
→ (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
# External ordering: three producers coordinating via a shared clock
XADD events 1700000000000-0 producer A payload "first"
XADD events 1700000000000-1 producer B payload "second"   ← same ms, seq 1
XADD events 1700000000000-2 producer C payload "third"    ← seq 2
# Total order is preserved: 0 < 1 < 2 regardless of producer
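Producers that supply their own IDs can check the ordering rule client-side before calling XADD. The sketch below is minimal and assumes IDs always come in the full ms-seq form. parse_id and can_append are hypothetical helpers. The key point is to compare IDs as integer pairs, never as strings, because string comparison puts 1700000000000-10 before 1700000000000-9.

```python
def parse_id(stream_id: str) -> tuple[int, int]:
    """Split a full 'ms-seq' stream ID into a comparable (ms, seq) pair."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def can_append(last_id: str, new_id: str) -> bool:
    """True if XADD would accept new_id after last_id ('0-0' for a new stream)."""
    return parse_id(new_id) > parse_id(last_id)


if __name__ == "__main__":
    print(can_append("1699000000000-0", "1698000000000-0"))   # False: too early
    print(can_append("1700000000000-9", "1700000000000-10"))  # True
    # Plain string comparison gets the previous case wrong:
    print("1700000000000-10" > "1700000000000-9")             # False
```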
The Special IDs: 0-0, $, and >
Three special ID values appear throughout the API. Each has a precise meaning:
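0-0 The minimum possible ID. In XREAD, STREAMS s 0-0 returns entries from the start of the stream, and XGROUP CREATE s g 0 makes a new group start from the first entry. In XREADGROUP it means something different. GROUP g c STREAMS s 0-0 does not read new stream entries; it replays consumer c's own pending entries (delivered to c but not yet acknowledged).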
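$ The ID of the last entry at the moment the command runs, i.e. "give me entries added after I called this command." A blocked XREAD with $ waits until a new entry arrives, then returns it. This is the tail-follow pattern: XREAD BLOCK 0 STREAMS events $ returns the next entry appended to events.

Note that $ is evaluated once, at call time. If the stream is empty or does not exist yet, the next XADD still wakes the waiting client, because its ID is necessarily greater than whatever $ resolved to. The real pitfall is reusing $ on every call in a loop: entries added between two calls are never seen, so pass the last ID you received instead. In XGROUP CREATE, $ makes the group start at the current tail.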
> Valid only in XREADGROUP. It is not a stream ID but a special token meaning "entries never delivered to any consumer in this group," i.e. entries after the group's last-delivered-id. Entries delivered via > go onto the receiving consumer's PEL, and the group's last-delivered-id advances past them. The > symbol never appears as a start ID in XRANGE or XREAD; it's a command parameter token.
Cross-Host Ordering
The millisecond timestamp comes from the Redis server's system clock. If you have multiple Redis nodes (e.g., in a Cluster), each server has its own clock. A producer on node-A might assign 1700000000001-0 at 12:00:00.001 UTC, while a producer on node-B assigns 1700000000000-0 at 12:00:00.002 UTC — the IDs appear out of order relative to wall-clock time. This matters only if you use the IDs as a timestamp proxy. For ordering within a single stream, Redis guarantees monotonicity. For ordering across streams, you need an external clock or a logical clock (Lamport timestamp) embedded in the field values.
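A Lamport clock is the simplest logical clock that fits here. The sketch below is the textbook algorithm, not a Redis feature, and the class name is hypothetical. Storing the value in a field (for example XADD events * lamport 7 type click) is an assumption about how you would embed it. Equal timestamps from different producers still need a tiebreaker, such as the producer name.

```python
class LamportClock:
    """Textbook Lamport logical clock for one producer."""

    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        """Advance before a local event, e.g. right before XADD."""
        self.time += 1
        return self.time

    def observe(self, remote_time: int) -> int:
        """Merge a timestamp read from another producer's entry."""
        self.time = max(self.time, remote_time) + 1
        return self.time


if __name__ == "__main__":
    a, b = LamportClock(), LamportClock()
    t1 = a.tick()       # A writes an entry stamped 1
    t2 = b.observe(t1)  # B reads it: B's clock jumps to 2
    t3 = b.tick()       # B writes an entry stamped 3, ordered after A's
    print(t1, t2, t3)
```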
XADD and Core Commands
Every command you'll use day-to-day, with edge cases.
XADD — Appending Entries
# Minimal: auto-ID
XADD events * type click user 42
→ "1700000000000-0"
# With MAXLEN: bounded stream
XADD events MAXLEN 10000 * type click user 42
→ "1700000000000-1" ← stream now has at most 10,000 entries
older entries dropped from head
# Approximate MAXLEN (recommended for high-frequency streams)
XADD events MAXLEN ~ 10000 * type click user 42
→ "1700000000000-2"
# The ~ means "trim to roughly N": Redis only removes whole listpack nodes,
# so the stream may keep somewhat more than N entries (never fewer), and trimming is much cheaper
# Useful when you don't need exact count guarantees
# MINID (Redis 6.2+): time-based retention
XADD events MINID 1700000000000 * type historical payload "data"
→ appends the entry, then evicts entries with IDs lower than 1700000000000
# MINID is accepted by both XADD and XTRIM, with or without ~
# NOMKSTREAM: only add if stream already exists
XADD events NOMKSTREAM * type click user 42
→ nil if "events" stream doesn't exist
→ ID if the stream exists (without NOMKSTREAM, XADD creates the stream on first use)
# LIMIT (6.2+): cap how many entries a single approximate trim may evict
XADD events MAXLEN ~ 10000 LIMIT 100 * type click user 42
→ trims at most 100 entries during this call (LIMIT requires ~)
XLEN — Stream Length
XLEN events → 1042 ← exactly how many entries exist
XLEN is O(1) regardless of stream size. Redis keeps a length counter in the stream header instead of traversing the radix tree. The counter is updated on every XADD, XDEL, and XTRIM, including trims triggered by XADD's MAXLEN or MINID.
XRANGE and XREVRANGE — Random Access
# Get entries from start to end (inclusive), with a count limit
XRANGE events 1700000000000-0 1700000000100-0 COUNT 50
→ entries with IDs in [1700000000000-0, 1700000000100-0], max 50
# Get entries from an ID forward
XRANGE events 1700000000000-0 + COUNT 100
→ + means "up to the maximum ID": up to 100 entries from this point
# Reverse scan: newest entries first
XREVRANGE events + - COUNT 10
→ 10 most recent entries in the stream
# Head and tail lookups: - and + cover the whole stream
XRANGE events - + COUNT 1
→ the first entry in the stream
XREVRANGE events + - COUNT 1
→ the last entry in the stream
The + and - are shorthand for the maximum and minimum possible IDs. XRANGE takes start then end, so XRANGE events <id> + reads from that ID to the tail. XREVRANGE takes end then start, so XREVRANGE events <id> - reads from that ID back to the head.
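When paging through a stream with XRANGE, the next page must start just after the last ID you received. Redis versions before 6.2 lack exclusive ranges, so there you must compute the next possible ID yourself. Below is a small sketch, assuming the sequence part is an unsigned 64-bit integer. next_start is a hypothetical helper.

```python
MAX_SEQ = 2**64 - 1  # the sequence part of a stream ID is an unsigned 64-bit integer


def next_start(last_id: str) -> str:
    """Smallest ID strictly greater than last_id, for paging on Redis < 6.2."""
    ms, seq = (int(part) for part in last_id.split("-"))
    if seq < MAX_SEQ:
        return f"{ms}-{seq + 1}"
    return f"{ms + 1}-0"  # sequence exhausted: roll over to the next millisecond


if __name__ == "__main__":
    print(next_start("1700000000000-9"))  # 1700000000000-10
```

On Redis 6.2 and later, prefix the last ID with ( instead, for example XRANGE events (1700000000000-9 + COUNT 100.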
XREAD — Broadcast Reading
XREAD is the non-grouped read. Every consumer sees every entry. No acknowledgments. The consumer tracks its own position by storing the last-seen ID and passing it on the next call. This is analogous to reading a file from offset — you manage your own cursor.
# Read forward from a specific ID (resume after restart)
XREAD STREAMS events 1700000000100-0
→ all entries with ID > 1700000000100-0 (there is no default COUNT, so add one to page)
# Tail-follow: block until new entries arrive
XREAD BLOCK 0 STREAMS events $
→ blocks forever, returns when a new XADD inserts into events
# Multi-stream read: one call reads from several streams, one ID per stream
XREAD BLOCK 5000 STREAMS events metrics logs $ $ $
→ blocks up to 5 s (BLOCK is in milliseconds), returns as soon as any stream has data
→ format: [[stream, entries], [stream, entries], ...]
# Count limit on multi-stream
XREAD COUNT 5 BLOCK 0 STREAMS events metrics logs $ $ $
→ max 5 entries per stream
With BLOCK 0, XREAD blocks indefinitely until data arrives. BLOCK 5000 blocks up to 5 seconds, then returns nil (timeout). A blocked client does not tie up Redis's single event loop. The server parks the connection and spends no CPU on it until an XADD to a watched stream arrives.
XDEL — Deleting Entries
XDEL removes specific entries by ID. This is rarely needed for normal operation (use XTRIM instead for retention) but useful for dead-letter patterns where you explicitly discard poison messages.
XDEL events 1700000000000-0 1700000000000-1 → 2 ← number of entries deleted
Consumer Groups: Work Queues with Explicit Acknowledgment
How XREADGROUP partitions work, why PEL is central, and what happens when things fail.
XGROUP CREATE — Creating a Group
# Alternatives (creating an existing group again fails with BUSYGROUP)
# Start from the beginning: the group receives all historical entries
XGROUP CREATE events workers 0
→ OK
# Start from the current tail: only entries added from now on, no history
XGROUP CREATE events workers $
→ OK
# With MKSTREAM: create the stream if it doesn't exist
XGROUP CREATE events workers 0 MKSTREAM
→ creates stream "events" with a consumer group "workers" at ID 0
# Custom ID: replay from a checkpoint
XGROUP CREATE events workers 1700000000500-0
→ the group's last-delivered-id is 1700000000500-0, so > starts with the entry after it
A consumer group is bound to a single stream. Each stream has its own groups, and group state is never shared across streams. To consume several streams in one call, create a group with the same name on each stream and list them all in a single XREADGROUP (e.g. XREADGROUP GROUP workers c1 STREAMS events metrics > >). Each stream's group still tracks its own cursor and PEL. Plain XREAD can also read several streams at once, but without group semantics.
XREADGROUP — Reading with Group Semantics
# Basic group read: new entries not yet delivered to anyone in the group
XREADGROUP GROUP workers c1 COUNT 10 BLOCK 5000 STREAMS events >
→ [stream, [[ID, [field, value, ...]], ...]]   entries assigned to c1, added to c1's PEL
# Reread your own PEL: for crash recovery or re-processing at startup
XREADGROUP GROUP workers c1 STREAMS events 0
→ reads c1's PEL entries and re-delivers them (they stay in the PEL)
# Block forever for new work
XREADGROUP GROUP workers c1 BLOCK 0 COUNT 10 STREAMS events >
→ waits until entries arrive, then delivers to c1
The > token means "deliver entries that have never been delivered to any consumer in this group." It advances the group's last-delivered-id cursor. Entries delivered via > go onto the receiving consumer's PEL and stay there until an XACK removes them.
Passing 0 (or any specific ID) instead of > reads from the consumer's own PEL: every pending entry with an ID greater than the one given is re-delivered. This is how a restarted consumer resumes. Suppose it had IDs 1700000000000-0 through 1700000000009-0 in its PEL before the crash. After restart it calls XREADGROUP GROUP workers c1 STREAMS events 0 to get those back and reprocess them.
XACK — Acknowledging and Removing from PEL
# Acknowledge one entry
XACK events workers 1700000000000-0
→ 1   ← 1 entry removed from the PEL
# Acknowledge multiple entries at once (batch ack)
XACK events workers 1700000000000-0 1700000000000-1 1700000000000-2
→ 3   ← 3 entries removed from the PEL
# Acknowledge everything currently in c1's PEL:
# list the IDs with the extended XPENDING form, then pass them to XACK
XPENDING events workers - + 100 c1
→ [[id, consumer, idle_ms, delivery_count], ...]   then XACK with those IDs
XACK removes the entry ID(s) from the group's PEL, which also removes them from the owning consumer's PEL. The entry itself remains in the stream (unless you XTRIM or XDEL it). After XACK, the entry is not re-delivered to any consumer in this group; it's effectively consumed.
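Delivery is at-least-once. Suppose a consumer crashes after processing an entry but before its XACK. The entry is still in its PEL, and on restart the consumer calls XREADGROUP with ID 0 to get its PEL entries back and reprocess them. This means your consumer must be idempotent: processing the same message twice must produce the same result as processing it once. Design your handlers accordingly (e.g., check whether the work is already done before doing it, or use a unique work ID as a deduplication key).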
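One way to make a handler idempotent is to record a deduplication key for every completed job and skip jobs whose key is already recorded. The sketch below keeps the keys in an in-memory Python set purely to stay self-contained. A real worker would need a durable store, updated together with the side effect where possible. The work_id field and the class name are hypothetical.

```python
class IdempotentHandler:
    """Runs each job at most once per dedupe key (in-memory sketch)."""

    def __init__(self) -> None:
        self.done: set[str] = set()    # keys of jobs already completed
        self.applied: list[str] = []   # stand-in for the real side effects

    def handle(self, entry_id: str, fields: dict[str, str]) -> bool:
        # Prefer an application-level work ID; fall back to the stream entry ID.
        key = fields.get("work_id", entry_id)
        if key in self.done:
            return False  # duplicate delivery: skip the work, still XACK it
        self.applied.append(key)
        self.done.add(key)
        return True
```

Falling back to the entry ID catches redeliveries of the same entry. An application-level work ID also catches a producer that retried and appended the same job twice under different IDs.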
Consumer Naming and Auto-Creation
Consumer names are arbitrary strings; they don't need to be pre-created. The first time XREADGROUP is called with a consumer name such as c1, Redis creates the consumer record on the fly. This makes it easy to have one consumer per process: use the process ID or hostname as the consumer name.
# Consumer names are auto-created on first use
# In your worker process (shell; quote > so the shell doesn't treat it as a redirect):
HOSTNAME=$(hostname)
redis-cli XREADGROUP GROUP workers "$HOSTNAME" BLOCK 0 STREAMS events '>'
→ each worker gets its own named consumer record
# If the process dies and restarts with the same name, it picks up the same
# consumer record, including its PEL
Multiple Consumers Per Group
A consumer group can have N consumers, and each entry is delivered to only one of them. Redis does not push entries to a chosen consumer. Whichever consumer calls XREADGROUP with > gets the next undelivered entries, so faster consumers naturally take more work. When several consumers are blocked waiting, Redis serves them in the order they blocked. This pull-based model is your load balancing; you don't need to implement it.
# Two consumers sharing the same group
XREADGROUP GROUP workers c1 STREAMS events >   ← gets some entries
XREADGROUP GROUP workers c2 STREAMS events >   ← gets disjoint entries
# Check how pending entries are split between consumers
XPENDING events workers
→ summary: [count, min-id, max-id, [[consumer, count], ...]]
XPENDING events workers - + 100
→ per entry: [[id, consumer, idle_ms, delivery_count], ...]
# If c1 crashes, c2's entries are untouched;
# c1's entries sit in c1's PEL, reclaimable via XCLAIM
XCLAIM and XAUTOCLAIM: Reclaiming Stale Work
The crash-recovery mechanism that makes streams fault-tolerant.
When Does Reclaiming Happen?
Picture this: consumer c1 calls XREADGROUP GROUP workers c1 ... and receives entry 1700000000000-0. The entry goes into c1's PEL. Then c1 crashes: power loss, OOM kill, whatever. Redis has no way to know c1 is dead. The entry stays in c1's PEL indefinitely. It is visible to XPENDING but never returned by >, which only delivers entries the group has never delivered before. Nobody else will process it.
XCLAIM is the escape hatch: it transfers ownership of a pending entry from its current consumer to a new consumer. The transfer only happens if the entry has been idle for at least the specified minimum time — this prevents a slow consumer from having its work stolen by an eager neighbor.
XCLAIM Mechanics
# Basic: transfer one entry to c2 if it has been idle ≥ 30 s
XCLAIM events workers c2 30000 1700000000000-0
→ [entry data], or an empty array if the entry isn't pending or isn't idle long enough
# Claim multiple at once
XCLAIM events workers c2 30000 1700000000000-0 1700000000000-1 1700000000000-2
→ [entries that were successfully claimed]
# There is no dry-run mode: XCLAIM claims whatever qualifies.
# To preview, list candidates with XPENDING instead:
XPENDING events workers IDLE 30000 - + 100
# Minimum idle time 60 s: only claim entries that have been pending
# for at least 60 seconds since their last delivery
XCLAIM events workers c2 60000 1700000000000-0
→ empty array if the entry was delivered less than 60 s ago
The idle time requirement is critical: if a consumer is slow but alive (say, it's processing a complex image that takes 90 seconds), a 60-second minimum prevents premature reclamation. Set the idle threshold above your expected processing time. If your average job takes 30s and your p99 is 2 minutes, set the idle threshold to 3–5 minutes to avoid reclaiming from slow-but-healthy consumers.
XAUTOCLAIM: Batch Reclamation
XCLAIM works on specific IDs you already know about. XAUTOCLAIM (Redis 6.2+) discovers stale entries automatically and claims them in a batch. It's designed for a watchdog loop that periodically scans for dead consumer work.
# Claim up to 100 entries idle ≥ 60 s for consumer c2,
# starting from cursor 0-0 (beginning of the PEL)
XAUTOCLAIM events workers c2 60000 0-0 COUNT 100
→ [next_cursor, [[id, [field, value, ...]], ...], [deleted_ids]]
next_cursor is "0-0" when the scan is complete; otherwise pass it to the next call
(the third element lists IDs that were pending but no longer exist in the stream; Redis 7.0+)
# Full loop example (pseudocode):
cursor = "0-0"
while True:
    result = XAUTOCLAIM events workers c2 60000 cursor COUNT 100
    cursor = result[0]          # "0-0" means the scan is done
    claimed = result[1]         # these entries are now in c2's PEL
    for entry in claimed:
        process_entry(entry)
        XACK events workers entry.id
    if cursor == "0-0":
        sleep(30)               # wait before the next full scan from the beginning
Delivery Count and Poison Messages
Redis increments an entry's delivery counter when the entry is first delivered. It increments it again each time XCLAIM or XAUTOCLAIM transfers the entry (unless JUSTID is used). You can see this counter with the extended form of XPENDING (start, end, count). A high delivery count (e.g., 10+) on an entry that never gets acked is a poison message signal: consumers are taking it, crashing, and giving it back.
# See delivery count and idle time for every pending entry
XPENDING events workers IDLE 0 - + 100
→ [[id, consumer, idle_ms, delivery_count], ...]
# If delivery_count > 5 for the same entry:
# → copy it to a dead-letter stream, acknowledge it, and alert
XADD events:dlq * original_id $entry_id type "poison" count $delivery_count
XACK events workers $entry_id    ← removes it from the PEL
XDEL events $entry_id            ← optional: also remove it from the stream
Build this into your watchdog: if an entry's delivery count exceeds your threshold (e.g., 5), move it to a dead-letter stream (events:dlq) and XACK it from the original stream. This prevents infinite retry loops from saturating your PEL.
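The watchdog's decision logic can be kept separate from the Redis calls, which makes it easy to test. This sketch assumes the extended XPENDING rows have already been fetched and converted to PendingEntry tuples. PendingEntry is a hypothetical type whose field order follows the reply, plan_actions is likewise hypothetical, and the thresholds are example values. Entries below the idle threshold are left alone even if their count is high, because a consumer may be working on them right now.

```python
from typing import NamedTuple


class PendingEntry(NamedTuple):
    """One row of the extended XPENDING reply, in the order Redis sends it."""
    entry_id: str
    consumer: str
    idle_ms: int
    delivery_count: int


def plan_actions(pending: list[PendingEntry], min_idle_ms: int,
                 max_deliveries: int) -> tuple[list[str], list[str]]:
    """Split stale pending entries into 'reclaim' and 'dead-letter' lists."""
    reclaim: list[str] = []
    dead_letter: list[str] = []
    for entry in pending:
        if entry.idle_ms < min_idle_ms:
            continue  # delivered recently; a consumer may still be working on it
        if entry.delivery_count > max_deliveries:
            dead_letter.append(entry.entry_id)  # XADD to events:dlq, then XACK
        else:
            reclaim.append(entry.entry_id)  # XCLAIM it for a live consumer
    return reclaim, dead_letter
```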
XPENDING: Observability for the Pending Entries List
The PEL is the heart of at-least-once delivery. XPENDING tells you what's stuck.
Reading the PEL
# Summary: count, min ID, max ID, per-consumer list
XPENDING events workers
→ [pending_count, min_id, max_id,
[[consumer, pending_count], ...]]
# Example output:
1) (integer) 3
2) "1700000000000-0"
3) "1700000000000-2"
4) 1) 1) "c1"
      2) "2"
   2) 1) "c2"
      2) "1"
# 3 pending entries: IDs 1700000000000-0 to 1700000000000-2
# c1 has 2 pending, c2 has 1 pending
The summary is useful for dashboards and alerts: if the pending count grows above your expected maximum (e.g., num_consumers * expected_inflight), something is wrong.
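A client library may hand back the raw summary reply as nested lists. This sketch assumes that shape and applies the num_consumers * expected_inflight alert rule; the function names are hypothetical. Per-consumer counts go through int(), which accepts either strings or integers. Empty placeholders (None) for an empty PEL are tolerated. Many libraries already parse this reply into a dict, in which case only pel_too_large is needed.

```python
def parse_pending_summary(reply: list) -> tuple[int, dict[str, int]]:
    """Turn a raw XPENDING summary reply into (total, {consumer: pending})."""
    total, _min_id, _max_id, consumers = reply
    # With an empty PEL the consumer list may come back as None.
    per_consumer = {name: int(count) for name, count in (consumers or [])}
    return int(total), per_consumer


def pel_too_large(total: int, num_consumers: int, expected_inflight: int) -> bool:
    """Alert rule: more pending entries than the consumers should have in flight."""
    return total > num_consumers * expected_inflight
```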
Detailed Entry Inspection with IDLE
# List up to 100 pending entries, idle ≥ 0 ms, with full details
XPENDING events workers IDLE 0 - + 100
→ [[id, consumer, idle_ms, delivery_count], ...]
# List entries idle ≥ 60 s (the IDLE filter requires Redis 6.2+)
XPENDING events workers IDLE 60000 - + 100
→ only entries whose last delivery was 60+ seconds ago
# Range-limited: only entries between two IDs (count is positional, no COUNT keyword)
XPENDING events workers 1700000000000-0 1700000000000-9 50
→ up to 50 pending entries in that ID range
# Single consumer: append the consumer name
XPENDING events workers - + 50 c1
The idle_ms field tells you how long it has been since the entry was last delivered to a consumer; an XCLAIM transfer, for example, resets it. An entry with idle_ms: 300000 (5 minutes) and delivery_count: 1 means one consumer took it 5 minutes ago and never acked it. That might be a slow processor, or the consumer crashed. An entry with idle_ms: 300000 and delivery_count: 5 has been delivered 5 times without an ack, which is a strong sign of a poison message.
Recovery Pattern: Re-read Your Own PEL at Startup
# Worker startup sequence (pseudocode)
consumer_name = os.environ["HOSTNAME"]
# 1. Drain anything we had pending before the crash (ID 0 = our own PEL)
last_id = "0"
while True:
    stale = XREADGROUP GROUP workers $consumer_name COUNT 100 STREAMS events $last_id
    if stale is empty:
        break                        # PEL is drained
    for entry in stale:
        process(entry)
        XACK events workers entry.id
        last_id = entry.id           # next history read starts after this entry
# 2. Then serve new entries
while True:
    new_entries = XREADGROUP GROUP workers $consumer_name BLOCK 0 COUNT 100 STREAMS events >
    for entry in new_entries:
        process(entry)
        XACK events workers entry.id
The dual-read pattern (PEL first, then new) ensures you never miss work. Redis delivers your stale entries (from before the crash) through the history read starting at 0, and fresh entries through >. The two sets never overlap: > only returns entries that have never been delivered to the group, so nothing already in your PEL comes back through it.
PEL Memory: The Hidden Cost
The PEL lives in server memory, not client memory. Every pending entry reference occupies approximately 30–50 bytes on the Redis server (entry ID + consumer reference + metadata). A consumer that processes 10k messages per second and takes an average of 5 seconds per message will have ~50k entries in its PEL at any time, consuming ~2 MB of server RAM per consumer. With 100 consumers, that's 200 MB just for PEL tracking.
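The 50k figure is Little's law: the average number of items in a system equals the arrival rate times the average time each item spends in it. Below is a sketch of the same arithmetic. The function names are hypothetical, and 40 bytes per entry is an assumed midpoint of the 30 to 50 byte range.

```python
def expected_pel_size(arrival_rate_per_s: float, avg_time_to_ack_s: float) -> float:
    """Little's law: items in flight = arrival rate x average time to acknowledgment."""
    return arrival_rate_per_s * avg_time_to_ack_s


def pel_memory_bytes(pending: float, bytes_per_entry: float = 40) -> float:
    """Server-side PEL memory, using an assumed per-entry cost."""
    return pending * bytes_per_entry


if __name__ == "__main__":
    per_consumer = expected_pel_size(10_000, 5)      # 50,000 pending entries
    per_consumer_bytes = pel_memory_bytes(per_consumer)
    print(per_consumer, per_consumer_bytes, 100 * per_consumer_bytes)
```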
If your throughput is high and processing is slow, monitor PEL size with XINFO GROUPS (the pending count per group) and XINFO CONSUMERS (pending per consumer). A growing PEL is a sign that consumers can't keep up: scale out or investigate processing bottlenecks.
# Poll PEL size every 10 seconds (-r -1 repeats forever, -i sets the interval in seconds)
redis-cli -r -1 -i 10 XINFO GROUPS events
# track the "pending" field of each group
redis-cli XINFO CONSUMERS events workers
# per-consumer "pending" and "idle"
Trimming: Keeping Streams Bounded
Streams grow forever without trimming. Here's how to bound them.
XADD with MAXLEN
# Every XADD enforces max 1000 entries: exact trim
XADD events MAXLEN 1000 * type click user 42
# ~ means "trim to approximately 1000", aligned to listpack boundaries:
# the stream keeps at least 1000 entries, plus up to about one node's worth more
# This is the recommended mode for high-frequency appends
XADD events MAXLEN ~ 1000 * type click user 42
Enforcing MAXLEN on every XADD has a cost: Redis must evaluate the trim condition on every append. For high-frequency streams (millions of appends per minute), this overhead accumulates. Prefer the approximate ~ variant; it's both faster and semantically sufficient for most use cases. If you need an exact bound (e.g., for a rate-limited API), either accept the cost or use a separate maintenance job with XTRIM.
XTRIM: Standalone Trimming
# Trim to about 10000 entries without adding
XTRIM events MAXLEN ~ 10000
# Time-based: drop entries older than a given ID (timestamp)
XTRIM events MINID 1700000000000
→ removes entries with IDs < 1700000000000
# Cap the work done per call with LIMIT (only with ~), e.g. from a cron job
XTRIM events MAXLEN ~ 100000 LIMIT 1000   # run every 5 minutes on a high-volume stream
XTRIM is useful in a scheduled maintenance job rather than on every XADD. For example, a nightly job can enforce a data retention policy (keep last 7 days = MINID based on 7 days ago) without adding trim overhead to the hot path of your event producers.
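Computing the MINID threshold for a retention window is plain arithmetic on Unix milliseconds. The sketch below assumes entries use auto-generated (timestamp-based) IDs; minid_for_retention is a hypothetical helper.

```python
import time
from typing import Optional

MS_PER_DAY = 24 * 60 * 60 * 1000


def minid_for_retention(days: float, now_ms: Optional[int] = None) -> str:
    """MINID threshold that keeps roughly the last `days` days of auto-ID entries."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return str(now_ms - int(days * MS_PER_DAY))


if __name__ == "__main__":
    print(minid_for_retention(7, now_ms=1_700_000_000_000))  # 1699395200000
```

The result feeds straight into a maintenance command such as XTRIM events MINID ~ 1699395200000. Custom IDs that don't follow the millisecond convention break this arithmetic.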
When to Use MAXLEN vs MINID
| Scenario | Use | Why |
|---|---|---|
| Keep last N entries | MAXLEN ~ N | Count-based retention, rolling window |
| Keep last X hours/days | MINID with timestamp | Time-based retention, compliance |
| High-frequency append | ~ approximate variant | Trims whole listpack nodes, much cheaper than exact trimming |
| Exact count guarantee | MAXLEN (no tilde) | Needed for rate limiting |
| Maintenance job | XTRIM separately | Keeps XADD hot path clean |
Blocking Operations: BLOCK, COUNT, and Stream Behavior
How blocking works under the hood and common gotchas.
How BLOCK Works
When you call XREAD BLOCK 0 STREAMS events $, Redis suspends the client connection and registers an interest in the events stream for new entries with ID greater than $ (the max ID at call time). No CPU is consumed while waiting. When another client calls XADD events * ..., Redis locates the blocked clients waiting on that stream, evaluates whether the new ID exceeds their start position, and if so, wakes them up and delivers the entry.
If multiple clients are blocked with $, they all wake up and receive the same new entry — this is the broadcast behavior of XREAD (not XREADGROUP). For work-queue semantics, use XREADGROUP which delivers to exactly one consumer per group.
BLOCK Timeout Behavior
# Block for 5 seconds, then return nil (timeout)
XREAD BLOCK 5000 STREAMS events $
→ (nil after 5 s)
# Block forever
XREAD BLOCK 0 STREAMS events $
→ waits indefinitely
# No BLOCK: return immediately with whatever exists after the given ID
XREAD STREAMS events 0
→ returns current entries, does not wait
# Without BLOCK, $ always returns nil: nothing is newer than the newest entry
When a BLOCK times out, Redis returns a null reply (nil in RESP protocol). Your client library should interpret this as "no data within timeout" rather than an error. Some libraries expose this as an empty list or a None return value.
COUNT and the $ Idiosyncrasy
The $ ID in XREAD is evaluated at the moment the command is processed, not when the entry is appended. If you run:
CLIENT 1: XREAD BLOCK 0 STREAMS events $
CLIENT 2: XADD events * type click user 42
→ Client 1 wakes up and receives the entry (correct)
The empty-stream case is also safe. If the stream is empty or does not exist when Client 1 calls XREAD with $, the first XADD still wakes Client 1, because its ID is greater than whatever $ resolved to. The real gotcha is calling XREAD with $ again in a loop. Any entry added between one call returning and the next call being issued is skipped. After the first call, pass the last ID you received:
# Correct tail-follow pattern
# First call: start from the current tail
XREAD BLOCK 0 STREAMS events $
→ returns, for example, entry 1700000000000-0
# Every later call: resume from the last ID received, not $
XREAD BLOCK 0 STREAMS events 1700000000000-0
→ returns everything added after that ID, including entries that arrived
  while the client was processing the previous reply
# Reusing $ here would skip those in-between entries
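The difference between reusing $ and carrying a cursor shows up in a toy simulation. This is a deliberately simplified single-reader model, not Redis code: IDs are plain integers and tail_follow is a hypothetical name. Each burst of appends is assumed to wake the blocked reader with its first entry, while the rest of the burst arrives during processing.

```python
def tail_follow(bursts: list[list[int]], use_dollar: bool) -> list[int]:
    """Model of one XREAD BLOCK loop; returns the IDs the reader ever receives.

    In each (non-empty) burst, the first XADD wakes the blocked reader and the
    remaining XADDs land while the reader is busy with what it received.
    """
    stream: list[int] = []
    delivered: list[int] = []
    cursor = 0

    def start_id() -> int:
        # "$" is resolved when the read is issued: the newest ID at that moment.
        if use_dollar:
            return stream[-1] if stream else 0
        return cursor

    for burst in bursts:
        start = start_id()
        ready = [i for i in stream if i > start]
        if not ready:
            # Nothing newer yet: the read blocks until the burst's first XADD.
            stream.append(burst[0])
            ready, burst = [burst[0]], burst[1:]
        delivered += ready
        cursor = ready[-1]
        stream += burst  # appended while the reader is busy

    # One last read after the producers stop.
    start = start_id()
    delivered += [i for i in stream if i > start]
    return delivered
```

With bursts [[1, 2, 3], [4, 5]], the $ reader ends up with [1, 4], while the cursor reader receives all five entries.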
Multiple Blocked Clients Per Group
In XREADGROUP, only one blocked consumer per group receives each new entry. Redis does not balance by PEL size. Clients blocked on the same stream are served in the order they blocked, so the consumer that has been waiting longest gets the next entry (up to its COUNT). Each consumer has its own PEL, and the delivered entry stays in that consumer's PEL until it is acknowledged.
```
# Two consumers blocked on the same group, c1 first, then c2
Consumer c1: XREADGROUP GROUP workers c1 BLOCK 0 STREAMS events >
Consumer c2: XREADGROUP GROUP workers c2 BLOCK 0 STREAMS events >
# An entry arrives → delivered to c1 (blocked longest), not to both
# c2 stays blocked and is first in line for the next entry
# The entry sits in c1's PEL until c1 calls XACK
```
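Redis serves clients blocked on the same stream in the order they blocked. Below is a simplified model of that order. It assumes COUNT 1, that each XADD is served before the next one arrives, and that every consumer re-issues its blocking XREADGROUP immediately after being served. These are modelling assumptions, not Redis guarantees. The hypothetical `serve` function hands each entry to the longest-waiting consumer.

```python
from collections import deque
from typing import Deque, Dict, List


def serve(blocked: Deque[str], entry_ids: List[str]) -> Dict[str, List[str]]:
    """Model of FIFO serving of blocked consumers in one group (illustration only)."""
    assignments: Dict[str, List[str]] = {}
    for entry_id in entry_ids:
        consumer = blocked.popleft()          # blocked longest -> served first
        assignments.setdefault(consumer, []).append(entry_id)
        blocked.append(consumer)              # re-blocks at the back of the queue
    return assignments


result = serve(deque(["c1", "c2"]), ["1-0", "2-0", "3-0"])
print(result)  # {'c1': ['1-0', '3-0'], 'c2': ['2-0']}
```

A consumer that is still busy processing is not in the queue, so it receives nothing. That is how faster consumers end up with more work, without any PEL-based weighting.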
What Unblocks a Client: $ vs. Explicit IDs
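Blocking operations are tied to a specific stream and a start ID. A client blocked on XREAD STREAMS events $ is unblocked by any entry whose ID is greater than the last ID at the moment the command was issued. Passing an explicit ID changes when the call returns. If entries with higher IDs already exist, XREAD returns them immediately without blocking; it blocks only when none exist. Using 0 as the start ID therefore does not mean "wake me on any new entry". On a non-empty stream, it returns the existing history at once: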
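```
# Block from the current tail: only entries added after this call wake you
XREAD BLOCK 0 STREAMS events $

# Resume from an explicit ID: returns at once if anything newer exists, else blocks
XREAD BLOCK 0 STREAMS events 1700000000000-9
# If 1700000000000-9 is the newest entry, the next one may be 1700000000000-10
# (same millisecond) or <later-ms>-0; any ID greater than 1700000000000-9 qualifies

# Start from 0: on a non-empty stream, returns existing entries immediately
XREAD BLOCK 0 STREAMS events 0
```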
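An entry ID is a pair of integers, so comparing IDs as strings gives wrong answers: as strings, 1700000000000-10 sorts before 1700000000000-9. Below is a minimal Python sketch that compares IDs the way Redis orders them, milliseconds first and then sequence. The `parse_id` helper is our own, not a redis-py function.

```python
from typing import List, Tuple


def parse_id(entry_id: str) -> Tuple[int, int]:
    """Split '<ms>-<seq>' into integers; a bare '<ms>' is read as '<ms>-0'."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


ids: List[str] = ["1700000000000-10", "1700000000000-9", "999-0", "1700000000000-0"]

as_strings = sorted(ids)                 # lexicographic: wrong for stream IDs
as_ids = sorted(ids, key=parse_id)       # numeric (ms, seq): matches stream order
print(as_strings)  # ['1700000000000-0', '1700000000000-10', '1700000000000-9', '999-0']
print(as_ids)      # ['999-0', '1700000000000-0', '1700000000000-9', '1700000000000-10']
```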
Monitoring and Debugging Streams
XINFO commands and Redis metrics that tell you what's happening.
XINFO STREAM
```
# Basic stream info (values are illustrative)
XINFO STREAM events
→ length: 1048576
  radix-tree-keys: ...
  radix-tree-nodes: ...
  last-generated-id: 1700000009999-0
  max-deleted-entry-id: 0-0                  ← Redis 7.0+
  entries-added: 1048576                     ← Redis 7.0+, all-time count
  recorded-first-entry-id: 1700000000000-0   ← Redis 7.0+
  groups: 4
  first-entry: 1700000000000-0 [type, click, user, 42]
  last-entry: 1700000009999-0 [type, view, user, 99]

# Full detail: adds up to COUNT entries plus every group's PEL and consumers
XINFO STREAM events FULL COUNT 10
```
XINFO GROUPS
```
XINFO GROUPS events
1)  1) "name"               2) "workers"
    3) "consumers"          4) (integer) 3          ← consumers in this group
    5) "pending"            6) (integer) 15234      ← delivered but not yet acked
    7) "last-delivered-id"  8) "1700000009500-0"    ← the group's cursor
    9) "entries-read"      10) (integer) ...        ← Redis 7.0+
   11) "lag"               12) (integer) ...        ← Redis 7.0+: entries not yet delivered
```
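A growing lag (entries not yet delivered to the group) means consumers can't keep up with producers. A growing pending count means entries are being delivered but not acknowledged: some consumers are hanging or failing before XACK. Some pending entries are normal while work is in flight, so watch the trend, and use XINFO CONSUMERS to find which consumer holds them. Together these detect both throughput issues and zombie consumers.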
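That trend check can be automated by comparing two XINFO GROUPS snapshots taken some time apart. This sketch assumes per-group dicts like those redis-py's `xinfo_groups()` returns, with field names as Redis reports them. `lag` exists only on Redis 7.0+ and may be `None` when Redis cannot compute it. `group_alerts` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def group_alerts(prev: Dict, curr: Dict) -> List[str]:
    """Compare two XINFO GROUPS snapshots of the same group."""
    alerts: List[str] = []
    name = curr["name"]
    lag_prev: Optional[int] = prev.get("lag")
    lag_curr: Optional[int] = curr.get("lag")
    # Lag = entries not yet delivered to the group: growth means consumers fall behind
    if lag_prev is not None and lag_curr is not None and lag_curr > lag_prev:
        alerts.append(f"{name}: lag {lag_prev} -> {lag_curr}, consumers falling behind")
    # Pending = delivered but unacked: growth suggests hung or failing consumers
    if curr["pending"] > prev["pending"]:
        alerts.append(f"{name}: pending {prev['pending']} -> {curr['pending']}, acks not keeping up")
    return alerts
```

In use, you might call `r.xinfo_groups("events")` on a timer, keep the previous list, and feed matching groups into `group_alerts`.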
XINFO CONSUMERS
```
XINFO CONSUMERS events workers
1) 1) "name"     2) "consumer-c1"
   3) "pending"  4) (integer) 5234     ← entries in c1's PEL
   5) "idle"     6) (integer) 30000    ← ms since c1 last interacted with the group
2) 1) "name"     2) "consumer-c2"
   3) "pending"  4) (integer) 10000
   5) "idle"     6) (integer) 0        ← active right now
```
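The idle field shows milliseconds since the consumer last interacted with the group, for example by reading or claiming entries. Redis 7.2 adds an inactive field that counts only successful interactions. A consumer with an idle time of 600000 (10 minutes) and a non-zero pending count is likely dead. Run XAUTOCLAIM to reclaim its work.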
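Below is a sketch of that dead-consumer check. It assumes the per-consumer dicts that redis-py's `xinfo_consumers(stream, group)` returns. It prefers `inactive` when present (Redis 7.2+) and falls back to `idle`. The helper name and the 10-minute default are hypothetical choices.

```python
from typing import Dict, List


def stuck_consumers(consumers: List[Dict], threshold_ms: int = 600_000) -> List[str]:
    """Names of consumers that hold pending entries but have been quiet too long."""
    stuck = []
    for c in consumers:
        # 'inactive' (Redis 7.2+) counts only successful reads/claims; else use 'idle'
        quiet_ms = c.get("inactive", c["idle"])
        if c["pending"] > 0 and quiet_ms >= threshold_ms:
            stuck.append(c["name"])
    return stuck
```

The names it returns are candidates for XAUTOCLAIM or XCLAIM by a healthy consumer.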
Redis INFO and Memory Monitoring
```
# Memory per key (an estimate: nested data is sampled, see the SAMPLES option)
MEMORY USAGE events
→ (integer) 1048576   ← bytes for the radix tree, listpacks, consumer groups, and their PELs

# Server-wide memory breakdown
INFO memory

# Rough PEL share: sum the pending counts from XINFO CONSUMERS
# Each pending entry costs on the order of tens of bytes server-side (rough estimate)
# total_pel_bytes ≈ sum(pending per consumer) × ~40
```
Latency: XADD and XREADGROUP in the Critical Path
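XADD at the tail of a stream is cheap. Redis appends to the tail listpack and creates a new radix tree node only when that listpack reaches stream-node-max-bytes (default 4096 bytes) or stream-node-max-entries (default 100). Adding an entry is O(1); trimming with MAXLEN or MINID adds the cost of the evicted entries. Server-side execution typically takes microseconds, so end-to-end latency is dominated by the network round trip. This makes streams suitable for high-throughput ingestion pipelines.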
Registering a blocked XREADGROUP is cheap. Serving it costs O(M) for the M entries returned, which is effectively O(1) with a fixed COUNT. The delivery itself involves copying the entry data to the client, so larger payloads mean proportionally more latency. For payloads larger than ~10KB, consider storing a reference (object ID, S3 URL) in the stream entry and storing the actual data elsewhere.
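Storing a reference instead of the payload is often called the claim-check pattern. The small sketch below illustrates it. A plain dict stands in for the external object store. The `data`/`ref`/`size` field names and the `encode_entry`/`decode_entry` helpers are hypothetical; only the ~10KB threshold comes from the text.

```python
import uuid
from typing import Dict

THRESHOLD_BYTES = 10 * 1024  # the ~10KB rule of thumb


def encode_entry(payload: str, blob_store: Dict[str, str]) -> Dict[str, str]:
    """Fields for XADD: the payload inline if small, otherwise a reference to it."""
    size = len(payload.encode("utf-8"))
    if size <= THRESHOLD_BYTES:
        return {"data": payload}
    key = f"blob:{uuid.uuid4()}"
    blob_store[key] = payload          # stand-in for an S3/object-store PUT
    return {"ref": key, "size": str(size)}


def decode_entry(fields: Dict[str, str], blob_store: Dict[str, str]) -> str:
    """Inverse on the consumer side: follow the reference when there is one."""
    if "ref" in fields:
        return blob_store[fields["ref"]]
    return fields["data"]
```

A producer would then call something like `r.xadd("events", encode_entry(payload, store))`. Write the blob before the XADD, so consumers never see a dangling reference.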
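```
# Round-trip latency baseline (PING-based), reported in 15-second windows
redis-cli -h localhost -p 6379 --latency-history

# Throughput and latency percentiles for XADD itself (quote ~ and * for the shell)
redis-benchmark -h localhost -p 6379 -n 100000 XADD mystream MAXLEN '~' 100000 '*' type test data x
```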
Redis Streams vs Kafka: When to Choose Which
A detailed conceptual mapping and trade-off analysis.
Conceptual Mapping
| Kafka Concept | Redis Streams Equivalent | Notes |
|---|---|---|
| Topic | Stream (Redis key) | A stream is like a topic with a single partition |
| Partition | One stream key | Each stream is a single ordered log; add stream keys for more partitions |
| Consumer group | XREADGROUP + consumer name | Group tracks last-delivered; consumer gets disjoint work |
| Offset | Entry ID (last-delivered-id) | ID replaces Kafka's integer offset |
| Producer | XADD | Append-only, server assigns ID |
| Consumer | XREADGROUP + XACK | Group read + explicit acknowledgment |
| Commit | XACK | Removes from PEL; entry won't be redelivered in this group |
| Consumer crash → rebalance | Entry stays in PEL, XCLAIM reclaims | At-least-once in both systems |
| Partition count | Multiple stream keys (manual sharding) | One stream = one slot; cannot split across nodes |
| Retention policy | MAXLEN / MINID / XTRIM | Both support time-based and size-based retention |
| Dead letter queue | Separate stream + manual XADD | No built-in DLQ; implement in application or via XCLAIM+XDEL |
| Log compaction | No native equivalent | Streams don't support compaction (yet); use a separate key per entity |
Latency and Throughput
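Redis Streams live in server RAM. Every operation (XADD, XREADGROUP, XACK) is an in-memory operation with sub-millisecond median latency. Kafka appends to log segments on disk through the OS page cache and batches records. With acks=all (the producer default since Kafka 3.0), it also waits for in-sync replicas before acknowledging, which adds per-message latency. Throughput depends heavily on payload size, pipelining or batching, and hardware. As rough orders of magnitude, a single Redis node can sustain around 500k XADDs/second, while Kafka on comparable hardware reaches around 1M messages/second with higher per-message latency.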
For latency-sensitive applications (sub-ms processing deadlines), Redis wins. For throughput-intensive workloads where latency is measured in tens of milliseconds, Kafka's durability and scale-out story become more attractive.
Durability Trade-offs
Redis Streams persistence is asynchronous by default. RDB snapshots run periodically, and AOF fsyncs either every second (appendfsync everysec) or on every write (appendfsync always). With everysec, a crash can lose up to about a second of entries; with RDB alone, it loses everything written since the last snapshot. Kafka's configurable acks (0, 1, all) and ISR (In-Sync Replica) replication provide stronger durability guarantees at the cost of latency.
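```
# redis.conf: stronger durability for stream data
# (redis.conf does not allow comments after a value, so comments go on their own lines)
appendonly yes
# everysec: fsync once per second; "always" fsyncs every write (higher latency)
appendfsync everysec

# Refuse writes unless at least 1 replica is connected and lagging <= 10 s
min-replicas-to-write 1
min-replicas-max-lag 10

# Replication is asynchronous; a producer that needs confirmation can follow
# XADD with: WAIT 1 100   (wait up to 100 ms for 1 replica to acknowledge)
# But: the stream itself lives in one node's RAM; there is no tiered storage
```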
Scaling Model
Kafka scales by adding partitions to topics — each partition can be on a different broker. You can increase parallelism by adding partitions and consumers. Redis Streams scales by sharding across multiple stream keys (since one stream occupies one Redis slot). You hash your partition key to a stream number: stream:events:{partition_id}. Consumer groups live per-stream; you manage the partition-to-consumer mapping in your application.
```
# Sharding strategy for high throughput (pseudocode)
partitions = 16
# Route with a stable hash such as CRC32: Python's built-in hash() of a str is
# randomized per process, so separate producers would disagree on the partition
stream_name = f"events:{crc32(entity_id) % partitions}"

# Producer: route to partition
XADD <stream_name> * type click user 42

# Consumer: run one worker per partition
stream_for_worker = f"events:{worker_partition}"
XREADGROUP GROUP workers <consumer_name> BLOCK 0 STREAMS <stream_for_worker> >
```

The trade-off: Kafka lets you add partitions to an existing topic (never remove them), but doing so changes which partition a key hashes to. That breaks per-key ordering for the keys that move. Redis stream sharding also requires you to pick a partition count upfront. You can re-partition by moving data if needed, or use consistent hashing to limit how many keys move when you add streams. Neither model is clearly superior; it depends on your operational maturity.
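Below is a runnable sketch of the routing step, assuming CRC32 as the stable hash. `stream_for` and `streams_for_worker` are hypothetical helpers, and the round-robin assignment of partitions to workers is one possible choice for when there are fewer workers than partitions.

```python
import zlib
from typing import List

PARTITIONS = 16  # must be identical in every producer and consumer


def stream_for(entity_id: str, partitions: int = PARTITIONS) -> str:
    """Route an entity to a partition stream with a process-independent hash."""
    # zlib.crc32 gives the same value in every process; built-in hash(str) does not
    return f"events:{zlib.crc32(entity_id.encode('utf-8')) % partitions}"


def streams_for_worker(worker: int, workers: int, partitions: int = PARTITIONS) -> List[str]:
    """Assign partitions to workers round-robin."""
    return [f"events:{p}" for p in range(partitions) if p % workers == worker]
```

Each partition stream needs its own consumer group. In Redis Cluster, keys in different slots cannot be combined in one XREADGROUP (CROSSSLOT error), so a worker owning several partitions should issue one call per stream.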
When to Use Streams Over Kafka
Fan-Out Patterns with Consumer Groups
Multiple groups, multiple consumers, and why group composition matters.
Multiple Groups = Multiple Pipelines
A single stream can have multiple consumer groups. Each group maintains its own cursor (last-delivered-id) and PEL. This means you can have independent consumers reading the same data for different purposes — without coordination. This is the fan-out pattern, and it rivals Kafka's "multiple consumer groups from the same topic" model.
```
# One stream, two independent groups
XGROUP CREATE events analytics 0       ← analytics pipeline
XGROUP CREATE events notifications 0   ← notification pipeline

# Analytics consumer: reads all events, computes metrics
XREADGROUP GROUP analytics worker-1 BLOCK 0 STREAMS events >
→ gets every event ever added

# Notifications consumer: reads all events, sends alerts
XREADGROUP GROUP notifications worker-1 BLOCK 0 STREAMS events >
→ also gets every event, independently tracked

# Each group has its own cursor, so they don't interfere
# Acknowledging an event in analytics does NOT acknowledge it in notifications
```
This is powerful: you can add a new consumer group for a new use case without touching producers or existing consumers. The stream is the source of truth; each group is an independent subscriber.
Multi-Stage Pipelines
Fan-out also enables pipeline stages: one group reads from a stream, does initial processing, and writes to another stream for downstream consumers. This is a common pattern for event processing topologies.
```
# Stage 1: raw events ingested
XADD raw-events * type click user 42 session "abc"

# Stage 1 consumer: enrich and write to enriched-events
XREADGROUP GROUP enrich enrich-worker-1 BLOCK 0 STREAMS raw-events >
→ enrich entry with user data from a Redis hash
→ XADD enriched-events * original_id <raw entry ID> <enriched fields...>
→ XACK raw-events enrich <raw entry ID>

# Stage 2 consumer (analytics): reads the enriched stream
XREADGROUP GROUP analytics analytics-worker BLOCK 0 STREAMS enriched-events >
→ compute metrics, XACK when done

# Stage 2 consumer (notifications): reads the enriched stream independently
XREADGROUP GROUP notifications notification-worker BLOCK 0 STREAMS enriched-events >
→ send push notification, XACK when done

# Each pipeline stage has its own group, own cursor, independent progress
```
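Below is a sketch of one iteration of the stage-1 enrich worker. It assumes `r` is a redis-py client created with `decode_responses=True`; only its `xreadgroup`, `xadd`, and `xack` methods are used. `enrich_batch`, the `lookup_user` callback, and the consumer name are hypothetical. The stream and group names follow the example above.

```python
from typing import Callable, Dict


def enrich_batch(r, lookup_user: Callable[[str], Dict[str, str]],
                 consumer: str = "enrich-worker-1", count: int = 100,
                 block_ms: int = 5000) -> int:
    """Read raw events, write enriched copies downstream, ack upstream.

    Returns the number of entries forwarded.
    """
    resp = r.xreadgroup("enrich", consumer, {"raw-events": ">"}, count=count, block=block_ms)
    forwarded = 0
    for _stream, messages in resp or []:
        for entry_id, fields in messages:
            enriched = dict(fields)
            enriched.update(lookup_user(fields.get("user", "")))
            enriched["original_id"] = entry_id
            # Write downstream first, then ack upstream: a crash in between yields a
            # duplicate in enriched-events (at-least-once), never a lost event
            r.xadd("enriched-events", enriched)
            r.xack("raw-events", "enrich", entry_id)
            forwarded += 1
    return forwarded
```

Carrying `original_id` lets downstream consumers deduplicate the occasional repeat.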
Consumer Load Balancing Within a Group
Within a single group, each new entry goes to whichever consumer asks for new entries (>) first. Consumers that are already blocked are served in the order they blocked. Redis does not weigh consumers by PEL size: a fast consumer simply asks more often and therefore receives more entries. This is automatic and requires no application-level coordination.
```
# Scaling workers: just add more consumers to the group
# No rebalancing command needed
XINFO GROUPS events
→ workers: consumers=2, pending=10000

# Add a third worker: a consumer is created on its first XREADGROUP
# (or explicitly with XGROUP CREATECONSUMER, Redis 6.2+)
# From then on, new entries go to whichever of the 3 consumers asks first

# To remove a consumer: stop calling XREADGROUP from it
# Its PEL entries stay assigned to it; reclaim them with XCLAIM/XAUTOCLAIM,
# then run XGROUP DELCONSUMER events workers <name>
# (deleting first would leave its pending entries unclaimable)
```
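XGROUP DELCONSUMER deletes a consumer, but any entries still in its PEL become unclaimable, so drain it first. This sketch assumes a redis-py client (`xpending_range`, `xclaim`, and `xgroup_delconsumer` are redis-py methods). `retire_consumer` and the consumer names are hypothetical.

```python
def retire_consumer(r, stream: str, group: str, leaving: str, heir: str,
                    batch: int = 100) -> int:
    """Move every pending entry from `leaving` to `heir`, then delete `leaving`.

    Returns the number of entries moved.
    """
    moved = 0
    while True:
        pending = r.xpending_range(stream, group, "-", "+", batch, consumername=leaving)
        if not pending:
            break
        ids = [p["message_id"] for p in pending]
        # min_idle_time=0 claims regardless of idle time; justid skips entry bodies
        r.xclaim(stream, group, heir, 0, ids, justid=True)
        moved += len(ids)
    # Safe now: the leaving consumer's PEL is empty, so nothing becomes unclaimable
    r.xgroup_delconsumer(stream, group, leaving)
    return moved
```

With JUSTID, XCLAIM does not increment the delivery counter. If you rely on delivery counts for dead-lettering, drop `justid=True`.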
Broadcast + Work Queue Hybrid
Combine XREAD (broadcast) with XREADGROUP (work queue) on the same stream for hybrid patterns. An analytics reader sees every event (broadcast semantics), while a group distributes events across workers (work-queue semantics). Inside a single group, each entry goes to only one consumer. Broadcast therefore needs either plain XREAD or a separate group per reader. The stream stores the data once; both consumption patterns coexist.
```
# Analytics: plain XREAD, so every reader sees every event
XREAD BLOCK 0 STREAMS events $
→ each analytics reader gets all events
# (Alternatively, give each analytics reader its own group, e.g. analytics-c1)

# Workers group: each event delivered to one worker
XGROUP CREATE events workers 0
XREADGROUP GROUP workers worker-1 STREAMS events >
→ workers get partitioned events
```
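The toy in-memory model below (not Redis) makes the two semantics concrete. Each group keeps its own cursor and PEL, so every group sees every entry, while within one group each entry goes to a single consumer. `ToyGroups` and its methods are hypothetical, and integer sequence numbers stand in for entry IDs.

```python
from typing import Dict, List, Tuple


class ToyGroups:
    """In-memory model of one stream with consumer groups (illustration only)."""

    def __init__(self) -> None:
        self.entries: List[Tuple[int, str]] = []   # (seq, payload)
        self.cursor: Dict[str, int] = {}           # group -> last-delivered seq
        self.pel: Dict[str, Dict[int, str]] = {}   # group -> {seq: consumer holding it}

    def xadd(self, payload: str) -> int:
        seq = len(self.entries) + 1
        self.entries.append((seq, payload))
        return seq

    def create_group(self, group: str, start: int = 0) -> None:
        self.cursor[group] = start                 # like XGROUP CREATE ... 0
        self.pel[group] = {}

    def read_new(self, group: str, consumer: str, count: int = 1) -> List[Tuple[int, str]]:
        """Like XREADGROUP ... >: entries past the group's cursor go to this consumer only."""
        batch = [e for e in self.entries if e[0] > self.cursor[group]][:count]
        for seq, _payload in batch:
            self.pel[group][seq] = consumer
        if batch:
            self.cursor[group] = batch[-1][0]
        return batch

    def ack(self, group: str, seq: int) -> int:
        """Like XACK: 1 if the entry was pending in this group, else 0."""
        return 1 if self.pel[group].pop(seq, None) is not None else 0
```

In the model, adding "a", "b", "c" and reading with count=10 from an `analytics` group returns all three entries. Meanwhile, two consumers in a `workers` group each get a different entry, and acking in one group leaves the other group's PEL untouched.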
Time Series: Sensor Data, Metrics, and Log Aggregation
Streams are a natural fit for time-ordered event data. Here's how to use them well.
Sensor Telemetry
Each sensor reading becomes an XADD with the sensor ID as part of the entry. Use a stream per sensor type (or per time window) and consumer groups for each processing pipeline (storage, alerting, analytics). Entry IDs carry the millisecond timestamp, making time-range queries natural.
```
# Temperature sensor: one entry per reading
XADD sensors:temperature * sensor_id "sensor-A" temp 23.5 humidity 45

# Query: last hour's readings. The ID's first half already is a ms timestamp,
# so the start is simply <now_ms - 3600000>-0 (a bare ms value also works)
XRANGE sensors:temperature <now_ms - 3600000> + COUNT 1000

# Aggregation: consumer groups per aggregation window
# 1-minute rollups: group "rollup-1m" reads all, computes avg, writes to a Redis hash
# 5-minute rollups: group "rollup-5m" reads all, computes avg, writes to a Redis hash

# Retention: keep 7 days
XTRIM sensors:temperature MINID <now_ms - 7*24*3600*1000>
```
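The ID arithmetic needs no conversion, because an entry ID's first half is the millisecond timestamp. A window start is just `<ms>-0`. This sketch uses hypothetical helper names:

```python
HOUR_MS = 3_600_000
DAY_MS = 86_400_000


def start_id(now_ms: int, window_ms: int) -> str:
    """First possible entry ID inside the window: '<ms>-0'."""
    return f"{now_ms - window_ms}-0"


def retention_minid(now_ms: int, days: int) -> str:
    """MINID argument that keeps only the last `days` days of entries."""
    return f"{now_ms - days * DAY_MS}-0"
```

With redis-py, this might look like `now = int(time.time() * 1000)` followed by `r.xrange("sensors:temperature", min=start_id(now, HOUR_MS), max="+", count=1000)` and `r.xtrim("sensors:temperature", minid=retention_minid(now, 7))`. redis-py's `xtrim` defaults to approximate trimming (`MINID ~`).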
Application Metrics
Instrument your application to emit metrics as stream entries. Each entry carries the metric name, value, tags, and timestamp. Consumers aggregate into time windows and write rollups to Redis hashes or a dedicated time-series database.
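The aggregation steps listed below boil down to mapping each entry to a minute bucket and summing. This pure-Python sketch assumes the field names from the example entries. It derives the minute from the entry ID's millisecond part (server arrival time) rather than the `timestamp` field; reverse that choice if producer clocks are authoritative. Plain dicts stand in for the Redis hashes that HINCRBY would update, and the helper names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

MINUTE_MS = 60_000


def minute_bucket(entry_id: str) -> int:
    """Start (in ms) of the minute containing the entry's server-side timestamp."""
    ms = int(entry_id.split("-")[0])
    return ms - ms % MINUTE_MS


def aggregate(entries: Iterable[Tuple[str, Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
    buckets: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for entry_id, f in entries:
        key = f"metrics:1m:{f['service']}:{f['endpoint']}:{minute_bucket(entry_id)}"
        # Equivalent to: HINCRBY key latency_sum <latency_ms>; HINCRBY key count <count>
        buckets[key]["latency_sum"] += int(f.get("latency_ms", 0))
        buckets[key]["count"] += int(f.get("count", 1))
    return buckets
```

Against Redis, each dict update would become `pipe.hincrby(key, field, amount)` on a redis-py pipeline, followed by XACK once the pipeline has executed.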
```
# Emit: latency metric
XADD metrics:latency * service "api" endpoint "/orders" method "POST" latency_ms 142 status 200 timestamp 1700000000000

# Emit: counter metric
XADD metrics:counter * service "api" endpoint "/orders" method "POST" count 1 timestamp 1700000000000

# Consumer: aggregate into 1-minute buckets
# 1. Read new metrics via XREADGROUP
# 2. Compute bucket key: f"metrics:1m:{service}:{endpoint}:{minute_ts}"
# 3. HINCRBY bucket_key latency_sum $latency_ms
# 4. HINCRBY bucket_key count $count
# 5. HSET bucket_key minute_ts $minute_ts
# 6. XACK when done
```
Log Aggregation
Store log lines as structured entries, with one field per attribute. Use XREADGROUP to distribute log processing across workers: parsing, enrichment, indexing. Entries are ordered by when they reached Redis, which makes it easier to follow a sequence of events across distributed services. Several workers in one group finish entries out of order, though, so sort by entry ID (or the timestamp field) when you need the exact sequence.
```
# Structured log entry
XADD logs:production * level "ERROR" service "order-service" trace_id "abc123" message "failed to connect to db" error_code "ECONNREFUSED" host "prod-03" timestamp 1700000000000

# Indexing consumer: parse and write to search index (e.g., Elasticsearch)
XREADGROUP GROUP indexer indexer-1 BLOCK 0 STREAMS logs:production >
→ parse fields, build index document, bulk insert to search engine
→ XACK when indexed

# Alerting consumer: filter ERROR and above, send to PagerDuty
XREADGROUP GROUP alerting alerting-1 BLOCK 0 STREAMS logs:production >
→ if level in ["ERROR", "CRITICAL"]: send_alert(entry)
→ XACK when alerted (or after retry limit exceeded)
→ XACK lower-level entries immediately, or they pile up in the PEL
```
Per-Entity Streams
For entity-centric event sourcing, use one stream per entity (e.g., events:user:42, events:order:99). This gives you natural filtering and partitioning, and the stream ID encodes the entity's event sequence. This pattern avoids the "log compaction" problem Kafka solves — you can read the full history of any entity by reading its stream.
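Rebuilding an entity's state is a left fold over its stream, oldest entry first. This sketch assumes each entry carries an `event` field plus the changed attributes, as in the `profile_updated` example below. The `profile_deleted` event and the `rebuild_user` helper are hypothetical. With redis-py, `r.xrange(f"events:user:{user_id}", "-", "+")` returns the `(id, fields)` pairs this function expects.

```python
from typing import Dict, Iterable, Tuple


def rebuild_user(events: Iterable[Tuple[str, Dict[str, str]]]) -> Dict[str, str]:
    """Replay a user's events, oldest first, into current state."""
    state: Dict[str, str] = {}
    for _entry_id, fields in events:
        kind = fields.get("event")
        if kind == "profile_updated":
            # Later updates overwrite earlier values for the same attribute
            state.update({k: v for k, v in fields.items() if k != "event"})
        elif kind == "profile_deleted":
            state.clear()
    return state
```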
```
# Entity stream pattern
STREAM_NAME = f"events:user:{user_id}"
XADD $STREAM_NAME * event "profile_updated" fields_to_update

# Consumer for user:42: reads all events for this user
XREADGROUP GROUP user-events worker-1 BLOCK 0 STREAMS events:user:42 >
→ rebuild user state by replaying events

# Cross-entity consumer: fan out to entity streams
# Read from a "routing stream" that maps entity IDs to entity stream keys
XREADGROUP GROUP router router-worker BLOCK 0 STREAMS entity-router >
→ extract target_entity_stream from entry
→ XADD $target_entity_stream * ... (forward the event)
```
Practical Examples: Python, Go, and Node.js
Production-ready code patterns for each language.
Python with redis-py
```python
import os

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
GROUP = 'workers'
STREAM = 'events'
CONSUMER = os.environ.get('HOSTNAME', f'consumer-{os.getpid()}')


def process_entry(entry_id, fields):
    """Idempotent handler: safe to call multiple times for the same entry."""
    event_type = fields.get('type')
    user_id = fields.get('user')
    if event_type == 'click':
        print(f"Processing click for user {user_id}")
        # Your business logic here
        return True
    elif event_type == 'view':
        print(f"Processing view for user {user_id}")
        return True
    return False


def consume_loop():
    # Create the consumer group (and the stream, via MKSTREAM) if it doesn't exist
    try:
        r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
    except redis.ResponseError as e:
        if 'BUSYGROUP' not in str(e):
            raise

    while True:
        # Re-read this consumer's own pending entries first (recovery after a restart)
        entries = r.xreadgroup(GROUP, CONSUMER, {STREAM: '0'}, count=100)
        # Then read new entries, blocking up to 5 s
        new_entries = r.xreadgroup(GROUP, CONSUMER, {STREAM: '>'}, count=100, block=5000)

        all_entries = (entries or []) + (new_entries or [])
        for stream_name, messages in all_entries:
            for entry_id, fields in messages:
                if not fields:
                    # Pending entry whose data was deleted or trimmed: nothing to process
                    r.xack(STREAM, GROUP, entry_id)
                    continue
                try:
                    if process_entry(entry_id, fields):
                        r.xack(STREAM, GROUP, entry_id)
                except Exception as e:
                    # Don't XACK: the entry stays in the PEL and is retried on the next
                    # '0' read; after N failures, move it to a DLQ
                    print(f"Error processing {entry_id}: {e}")


# Run with: HOSTNAME=worker-1 python consumer.py
if __name__ == '__main__':
    consume_loop()
```
Go with go-redis/redis
```go
package main

import (
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	defer rdb.Close()

	group := "workers"
	stream := "events"
	consumer := os.Getenv("HOSTNAME")
	if consumer == "" {
		consumer = fmt.Sprintf("consumer-%d", os.Getpid())
	}

	// Create group (and stream) if not exists; a BUSYGROUP error means it already does
	err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Err()
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		panic(err)
	}

	for {
		// Read new entries (blocks up to 5s); redis.Nil signals a timeout
		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    100,
			Block:    time.Second * 5,
		}).Result()
		if err != nil && err != redis.Nil {
			panic(err)
		}

		for _, streamResult := range streams {
			for _, msg := range streamResult.Messages {
				fields := msg.Values
				eventType, _ := fields["type"].(string)
				userID, _ := fields["user"].(string)
				fmt.Printf("Processing %s for user %s\n", eventType, userID)

				// Process and ack
				if err := process(eventType, userID); err != nil {
					fmt.Printf("Error: %v\n", err)
					continue // don't ack: let it sit in the PEL
				}
				if err := rdb.XAck(ctx, stream, group, msg.ID).Err(); err != nil {
					fmt.Printf("XACK %s failed: %v\n", msg.ID, err)
				}
			}
		}
	}
}

func process(eventType, userID string) error {
	time.Sleep(10 * time.Millisecond) // simulate work
	return nil
}
```
Node.js with ioredis
```javascript
// ESM module: save as consumer.mjs or set "type": "module" in package.json
import Redis from 'ioredis';

const r = new Redis({ host: 'localhost', port: 6379 });
const GROUP = 'workers';
const STREAM = 'events';
const CONSUMER = process.env.HOSTNAME || `consumer-${process.pid}`;

async function processEntry(entryId, fields) {
  const eventType = fields.type;
  const userId = fields.user;
  console.log(`Processing ${eventType} for user ${userId}`);
  // Simulate async work
  await new Promise((resolve) => setTimeout(resolve, 10));
  // Return true on success (ack), false on failure (keep in PEL)
  return true;
}

async function consume() {
  // Create consumer group if not exists
  try {
    await r.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM');
  } catch (e) {
    if (!e.message.includes('BUSYGROUP')) throw e;
  }

  while (true) {
    try {
      // XREADGROUP with BLOCK: resolves with null on timeout
      const result = await r.xreadgroup(
        'GROUP', GROUP, CONSUMER,
        'COUNT', '100',
        'BLOCK', '5000',
        'STREAMS', STREAM, '>'
      );

      if (!result) {
        // timeout: no new entries, loop and retry
        continue;
      }

      const [, messages] = result[0]; // [stream_name, [messages...]]
      for (const [entryId, fields] of messages) {
        // fields is a flat [field, value, field, value, ...] array
        const fieldsObj = Object.fromEntries(
          fields.reduce((acc, val, i) => {
            if (i % 2 === 0) acc.push([val, fields[i + 1]]);
            return acc;
          }, [])
        );

        const success = await processEntry(entryId, fieldsObj);
        if (success) {
          await r.xack(STREAM, GROUP, entryId);
        }
      }
    } catch (err) {
      console.error('Error in consume loop:', err);
      await new Promise((resolve) => setTimeout(resolve, 1000)); // back off on error
    }
  }
}

consume().catch(console.error);
```
XAUTOCLAIM Watchdog (All Languages Share This Pattern)
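```python
# Standalone watchdog: run from cron or as a separate process (Redis 7+, redis-py).
# Claims entries idle >= 5 minutes; dead-letters those delivered more than 5 times.
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM, GROUP, WATCHDOG = 'events', 'workers', 'watchdog'
MAX_DELIVERIES = 5


def retry(entry_id, fields):
    """Placeholder: replace with real, idempotent processing."""
    print(f"Retrying {entry_id}: {fields}")


def sweep():
    cursor = '0-0'
    while True:
        # Reply: [next_cursor, claimed_entries, deleted_ids]. Claiming moves entries
        # into WATCHDOG's PEL and increments their delivery count.
        cursor, claimed, _deleted = r.xautoclaim(
            STREAM, GROUP, WATCHDOG, min_idle_time=300000, start_id=cursor, count=100)
        for entry_id, fields in claimed:
            # XRANGE doesn't report delivery counts; XPENDING does
            info = r.xpending_range(STREAM, GROUP, min=entry_id, max=entry_id, count=1)
            if info and info[0]['times_delivered'] > MAX_DELIVERIES:
                r.xadd(f'{STREAM}:dlq', {'original_id': entry_id, 'type': 'exceeded_retries'})
            else:
                # WATCHDOG now owns it, so XREADGROUP '>' will NOT hand it to another
                # consumer: process it here (or XCLAIM it to a live consumer)
                retry(entry_id, fields)
            r.xack(STREAM, GROUP, entry_id)
        if cursor == '0-0':
            break  # completed a full pass over the group's PEL


if __name__ == '__main__':
    sweep()
```
Common Gotchas and Edge Cases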
The subtle behaviors that trip up production deployments.
- XADD to a non-existent stream creates it; there is no XCREATE. To create a stream together with a consumer group before any data exists, use MKSTREAM: XGROUP CREATE events workers 0 MKSTREAM. Without MKSTREAM, group creation fails if the stream doesn't exist.
- Explicit IDs must strictly increase. With 1700000000000-0 already added, XADD stream 1700000000000-0 ... fails with ERR The ID specified in XADD is equal or smaller than the target stream top item. Bump the sequence number instead: 1700000000000-1.
- Stream IDs are always two integers (<ms>-<seq>), compared numerically: first by milliseconds, then by sequence. XTRIM stream MINID 1700000000000 therefore evicts every entry whose ID is below 1700000000000-0. No string comparison is involved, and XADD rejects non-numeric custom IDs.
- XREADGROUP does not block unless you ask it to: pass BLOCK 0 explicitly if you want indefinite blocking. Without BLOCK, XREADGROUP returns immediately with whatever is available (or an empty reply).
- The special ID > means "entries never delivered to any consumer in this group." To see pending entries again, pass an explicit ID such as 0. XREADGROUP then returns the calling consumer's own PEL entries with IDs greater than that ID. Calling XREADGROUP GROUP g c STREAMS s > repeatedly will not re-deliver entries already in c's PEL, only new entries.
- Approximate trimming really is approximate. MAXLEN ~ 1000 removes only whole radix tree nodes, so the stream may keep 1000 entries or up to roughly one node more: about 1100 with the default stream-node-max-entries of 100, depending on entry size and listpack packing. If you have a memory budget per stream, use the approximate variant and monitor actual memory with MEMORY USAGE. If you need an exact count, use MAXLEN without ~, which costs O(N) in the number of evicted entries.
- Group names are case-sensitive: WORKERS and workers are different groups. Pick a naming convention and stick to it. For example, use all-uppercase for system groups (ANALYTICS, NOTIFICATIONS) and all-lowercase for application groups.
- XACK is idempotent. XACK stream group id, where id is not in the PEL, returns 0 (no entries acknowledged) without an error, so it is safe to XACK multiple times. This matters for at-least-once processing: after a partial failure, a retry can XACK the same entry again. Exactly-once effects still require idempotent consumers.
- Key names with colons or hyphens need no escaping. In redis-cli, quote them if your shell requires it: XINFO STREAM "events:my-stream". In client libraries, pass the name as a string; no special handling is needed.

FAQ
What does the special ID '$' mean in XREAD?
It means 'the maximum ID currently in the stream'. XREAD BLOCK 0 STREAMS mystream $ blocks until a new entry is added, then returns it. Useful for tail-following consumers. Combined with BLOCK 0 (block forever), this is how a consumer follows a stream in real time without polling. The '>' symbol is the consumer-group equivalent: 'give me entries not yet delivered to any consumer in this group'.
What's the difference between XREAD and XREADGROUP?
XREAD is fanout — every reader sees every entry. XREADGROUP partitions entries across consumers in a group: each entry is delivered to exactly one consumer (in PEL — pending entries list — until ack'd). XREADGROUP is the work-queue mode; XREAD is the broadcast mode. For most queue use cases you want XREADGROUP with named consumers and XACK after processing.
What is XCLAIM for?
Reassigning a pending message from one consumer to another. Suppose consumer-1 took message 1700000000000-0 from a group and then crashed before XACK. The message stays pending forever, attached to consumer-1. XCLAIM lets another consumer (or a janitor process) take ownership. XCLAIM mystream mygroup consumer-2 60000 1700000000000-0 transfers ownership if the message has been pending for at least 60000 ms. XAUTOCLAIM (since 6.2) does this in batches: "reclaim anything pending longer than X".
How do MAXLEN and MINID cap stream growth?
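XADD mystream MAXLEN 1000 * field value keeps the stream at most 1000 entries; older entries are dropped from the head. MAXLEN ~ 1000 (note the tilde) trims approximately. Redis removes only whole radix tree nodes, so the stream may keep somewhat more than 1000 entries, but each trim is much cheaper. MINID (Redis 6.2+) drops entries with IDs lower than the given ID. This is useful for time-based retention, because IDs begin with a millisecond timestamp. Exact trimming costs O(N) in the number of evicted entries and has to handle individual entries inside a node. Approximate trimming frees whole nodes at a time, which is why it is the cheaper option.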
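Below is a simplified model of exact versus approximate trimming. It treats the stream as a list of node sizes (entries per listpack) and ignores the LIMIT option and entries already removed by XDEL. Exact trimming always lands on the target. Approximate trimming drops a head node only when the entries remaining afterwards still meet the target. The helper names are hypothetical.

```python
from typing import List


def trim_exact(nodes: List[int], maxlen: int) -> int:
    """Entries left after MAXLEN <maxlen>: exactly maxlen, or fewer if the stream is shorter."""
    return min(sum(nodes), maxlen)


def trim_approx(nodes: List[int], maxlen: int) -> int:
    """Entries left after MAXLEN ~ <maxlen>: only whole head nodes are removed."""
    remaining = list(nodes)
    total = sum(remaining)
    # Drop a head node only if the stream still has at least maxlen entries afterwards
    while remaining and total - remaining[0] >= maxlen:
        total -= remaining.pop(0)
    return total


nodes = [100] * 11 + [50]           # 1150 entries: 100-entry nodes plus a partial tail node
print(trim_exact(nodes, 1000))      # 1000
print(trim_approx(nodes, 1000))     # 1050
```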
How do streams compare to Kafka?
Streams give you Kafka's append-only log model, consumer groups, and explicit acks. In Redis, though, a stream is placed by Redis Cluster slot, with the entire stream living in one master's RAM (and AOF/RDB on disk). Kafka stores each partition on broker disks, optionally offloading older segments to tiered storage. Its partitions are independent, and it survives node loss through partition replication. Streams are great for sub-millisecond latency on small-to-medium volumes (gigabytes, not terabytes) and for integration with the rest of Redis. Kafka is better for high volume, multi-day retention, and partition-independent throughput.
How is a stream stored internally?
A radix tree whose keys are the ID (millisecond timestamp plus sequence) of the first entry in each node, and whose values are listpacks holding multiple entries. Each listpack starts with a master entry that records a field set. Entries whose fields match it store only their values, while entries with different fields store them in full. Most streams have a stable schema, so consecutive entries usually share field names and only values vary. The radix tree gives O(log N) seek to any ID; listpacks give compact storage and cache-friendly scans.
What is the PEL and why does it matter?
The Pending Entries List (PEL) is the core mechanism for at-least-once delivery. When XREADGROUP delivers an entry to a consumer, Redis adds it to that consumer's PEL and waits for an XACK to remove it. Until then, the entry is tracked — if the consumer dies, the entry remains in the PEL and can be reclaimed via XCLAIM. The PEL lives in server memory (not just the client), so even a full consumer crash still leaves the door open for recovery. The trade-off is that the PEL can grow unbounded for slow consumers, consuming memory proportional to unacknowledged messages.
Can a stream be sharded across multiple Redis nodes?
Yes, via Redis Cluster. A stream's key is hashed to a single slot, so the entire stream lives on one master node. All entries in the stream share the same slot — you cannot split one stream across nodes. For horizontal throughput, use multiple stream keys (e.g., streams:events:0, streams:events:1) and hash your partition key to distribute across them. Consumer groups are per-stream; a group on stream-0 does not interact with a group on stream-1. This is different from Kafka where one topic has many partitions that can each be on different brokers.
What happens if I XREAD from an ID that no longer exists?
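You don't get an error, and usually you don't get an empty result either. XREAD returns entries with IDs greater than the one you pass, whether or not that exact ID still exists. If the stream was XTRIM'd or MAXLEN'd past your cursor, you silently receive the oldest surviving entries, and anything trimmed in between is gone. Your application needs to handle this gracefully. Compare your cursor with the stream's first entry ID (from XINFO STREAM or XRANGE - + COUNT 1). If the first entry is newer, either skip ahead to a known-good checkpoint or record the missing range as a gap. Never assume the stream retains a specific historical range.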
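Trimming removes entries from the head. A reader can therefore detect that its cursor fell off the end by comparing the cursor with the oldest surviving ID. Below is a sketch of that check. `cursor_trimmed` is a hypothetical helper. With redis-py, the oldest ID could come from `r.xrange("events", count=1)[0][0]` on a non-empty stream.

```python
from typing import Tuple


def parse_id(entry_id: str) -> Tuple[int, int]:
    """Split '<ms>-<seq>' into integers so IDs compare numerically."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def cursor_trimmed(cursor: str, first_surviving_id: str) -> bool:
    """True if the stream's oldest entry is newer than our cursor.

    The cursor entry itself is then gone, and entries right after it may have been
    trimmed too, so treat this as a possible gap.
    """
    return parse_id(first_surviving_id) > parse_id(cursor)
```

The check cannot prove that entries were lost: the first survivor may simply be the next entry. It does tell you when you can no longer prove that nothing was lost.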
How does XINFO help with debugging?
XINFO STREAM gives you the stream length, first/last entry IDs, the number of groups, radix tree key and node counts, and (Redis 7.0+) entries-added and max-deleted-entry-id. XINFO GROUPS lists every consumer group with its consumer count, pending count, last-delivered ID, and (Redis 7.0+) entries-read and lag. XINFO CONSUMERS shows per-consumer details: name, pending count, idle time, and (Redis 7.2+) inactive time. Together these tell you exactly where things are stuck.
What's the difference between XREAD and XRANGE for reading history?
XREAD is for consuming new entries (forward cursor from a given ID). XRANGE is for arbitrary random-access: you specify a start and end ID (inclusive), and Redis seeks to that position in the radix tree and scans. XRANGE is O(log N + number of entries returned), so it's efficient for small windows but can be slow if you request a large range without a COUNT limit.