Exactly-Once Semantics

Idempotent Producers, Transactions, and the Read-Process-Write Loop

Exactly-once in Kafka is built from two distinct primitives. The first is the idempotent producer: a per-producer, per-partition sequence number that lets the broker detect and discard duplicate retries. The second is transactions: a coordinator-driven protocol that atomically commits writes spanning multiple partitions along with the consumer offset commit. Together they give you the read-process-write pattern that powers Kafka Streams and most stream-processing pipelines.

The trick is that "exactly-once" is a property of the visible state, not the wire. Messages still get retried, brokers still fail over, and aborted transactions still leave bytes on disk. The protocol ensures that consumers reading with isolation.level=read_committed never observe duplicates or aborted records, and that the offset commit is atomic with the output writes.

Architecture

A single transaction involves three roles: the producer (a client), the transaction coordinator (a broker), and the data partition leaders (brokers).

[Architecture diagram: the producer (transactional.id, PID + epoch) talks to the transaction coordinator (leader of its __transaction_state partition), to the data partition leaders (topic-A:0, topic-B:3), and to the __consumer_offsets offsets partition. On EndTxn, the coordinator runs a two-phase commit: PREPARE_COMMIT to __transaction_state, then COMMIT/ABORT markers to every partition that received data, then the completion record.]

Key Numbers

50        __transaction_state partitions (default)
≤5        max.in.flight.requests.per.connection allowed with idempotence
60s       transaction.timeout.ms default
3         replication factor for transaction state (default)
2^16      producer epoch space (int16, wraps)
7 days    transactional.id.expiration.ms default
int32     sequence number space per (PID, partition)

Idempotent Producer: PID + Epoch + Sequence

With enable.idempotence=true, every batch carries a triple: producer ID (assigned by the broker on InitProducerId), epoch, and a per-partition monotonic sequence number. The broker caches metadata for the last five batches it accepted per (PID, partition) and dedupes retried batches against that window.

// Producer config — idempotent
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092");
props.put("enable.idempotence", "true");
// these are forced by idempotence, but be explicit:
props.put("acks", "all");
props.put("max.in.flight.requests.per.connection", "5");
props.put("retries", Integer.MAX_VALUE);

KafkaProducer<String,String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", key, value));

What the broker sees on retry: the same (PID=42, epoch=0, seq=17) batch arrives twice. The first write succeeds and the broker remembers seq=17 in the leader's producer state. The second arrives with a duplicate sequence; the broker returns the original offset without writing again. Out-of-order sequences (seq=20 when last was 17) are rejected with OutOfOrderSequenceException — the producer must abort and reset.
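
For intuition, here is a minimal sketch of that dedupe check. It is illustrative only, not Kafka's broker code: the real producer state machine tracks batch metadata (first/last sequence, offset, timestamp) for the last five batches, not bare sequence numbers.

// Illustrative sketch of broker-side dedupe per (PID, partition); not Kafka's actual code
class ProducerStateSketch {
  private final Deque<Integer> recentSeqs = new ArrayDeque<>(); // last 5 accepted sequences
  private int lastSeq = -1;

  synchronized String onBatch(int seq) {
    if (seq == lastSeq + 1) {                 // expected next sequence: append it
      lastSeq = seq;
      recentSeqs.addLast(seq);
      if (recentSeqs.size() > 5) recentSeqs.removeFirst();
      return "APPEND";
    }
    if (recentSeqs.contains(seq)) {           // a retry of a batch already written:
      return "DUPLICATE";                     // ack with the original offset, write nothing
    }
    return "OUT_OF_ORDER";                    // gap in sequences: OutOfOrderSequenceException
  }
}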

Transactions API

A transaction wraps multiple sends across multiple partitions, plus the consumer offset commit, into one atomic unit.

props.put("transactional.id", "orders-processor-1");
props.put("isolation.level", "read_committed"); // for the consumer side
KafkaProducer<String,String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // fences any prior producer with same transactional.id

while (true) {
  ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
  if (records.isEmpty()) continue;

  producer.beginTransaction();
  try {
    Map<TopicPartition,OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String,String> r : records) {
      String out = transform(r.value());
      producer.send(new ProducerRecord<>("orders-enriched", r.key(), out));
      offsets.put(new TopicPartition(r.topic(), r.partition()),
                  new OffsetAndMetadata(r.offset() + 1));
    }
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
  } catch (ProducerFencedException e) {
    // a newer instance took over with the same transactional.id — exit
    producer.close();
    return;
  } catch (KafkaException e) {
    producer.abortTransaction();
    // reset the consumer to the last committed offsets before retrying,
    // otherwise the already-polled records would be skipped (omitted for brevity)
  }
}

initTransactions() does two things: it discovers the transaction coordinator (the broker hosting the leader of the __transaction_state partition for this transactional.id, picked by hashing the id), and it bumps the epoch to fence any zombie. sendOffsetsToTransaction() is the magic ingredient — it writes to __consumer_offsets as a transactional record, so the offset commit is visible to other consumers only when the transaction commits.
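
The coordinator lookup itself is just a hash. A sketch, assuming the default of 50 __transaction_state partitions (Kafka's real code uses its own absolute-value helper, but the idea is the same):

// Which __transaction_state partition, and thus which coordinator, owns this id?
int numTxnPartitions = 50;  // transaction.state.log.num.partitions
int p = ("orders-processor-1".hashCode() & 0x7fffffff) % numTxnPartitions;
// The broker leading __transaction_state partition p is the coordinator.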

The __transaction_state Topic

An internal compacted topic, keyed by transactional.id, so compaction keeps the latest state record per id. The transaction coordinator for a given id is the broker leading the __transaction_state partition that the id hashes to.

# Inspect transaction state (DEBUG only — internal topic)
$ kafka-console-consumer.sh --bootstrap-server kafka-1:9092 \
    --topic __transaction_state --from-beginning \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

orders-processor-1::TransactionMetadata(
  transactionalId=orders-processor-1,
  producerId=1003,
  producerEpoch=4,
  txnTimeoutMs=60000,
  state=Ongoing,
  topicPartitions=Set(orders-enriched-0, orders-enriched-1, __consumer_offsets-12),
  txnStartTimestamp=1714742103000,
  txnLastUpdateTimestamp=1714742104100
)

State transitions: Empty → Ongoing → PrepareCommit → CompleteCommit (or PrepareAbort → CompleteAbort). Once PrepareCommit is durable on the coordinator's log, the transaction is decided — even if every involved broker dies and recovers, replay of the transaction log will resume and write commit markers.
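
These transitions are easy to encode. A toy version (the real coordinator state machine has a few more states, such as Dead and PrepareEpochFence):

// Toy encoding of the legal coordinator state transitions
enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT, PREPARE_ABORT, COMPLETE_ABORT }

static final Map<TxnState, Set<TxnState>> LEGAL = Map.of(
    TxnState.EMPTY,           Set.of(TxnState.ONGOING),
    TxnState.ONGOING,         Set.of(TxnState.PREPARE_COMMIT, TxnState.PREPARE_ABORT),
    TxnState.PREPARE_COMMIT,  Set.of(TxnState.COMPLETE_COMMIT),  // decided: only completion remains
    TxnState.PREPARE_ABORT,   Set.of(TxnState.COMPLETE_ABORT),
    TxnState.COMPLETE_COMMIT, Set.of(TxnState.ONGOING),          // next transaction may begin
    TxnState.COMPLETE_ABORT,  Set.of(TxnState.ONGOING));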

Two-Phase Commit Protocol

A simplified trace of one successful transaction with sends to two partitions and an offset commit.

1. Producer  -> Coordinator  : InitProducerId(transactional.id="orders-processor-1")
   Coordinator -> Producer    : PID=1003, epoch=4

2. Producer  -> Coordinator  : AddPartitionsToTxn([orders-enriched-0, orders-enriched-1])
   Coordinator writes "Ongoing + partitions" to __transaction_state

3. Producer  -> Leader(orders-enriched-0) : ProduceRequest (transactional=true, PID=1003, epoch=4, seq=0..N)
   Producer  -> Leader(orders-enriched-1) : ProduceRequest ...

4. Producer  -> Coordinator  : AddOffsetsToTxn(__consumer_offsets-12, group=orders-group)
   Producer  -> Coordinator  : TxnOffsetCommit(offsets)

5. Producer  -> Coordinator  : EndTxn(commit=true)
   Coordinator writes "PrepareCommit" to __transaction_state    <-- decision point
   Coordinator -> Leader(orders-enriched-0) : WriteTxnMarker(COMMIT, PID, epoch)
   Coordinator -> Leader(orders-enriched-1) : WriteTxnMarker(COMMIT, PID, epoch)
   Coordinator -> Leader(__consumer_offsets-12) : WriteTxnMarker(COMMIT, PID, epoch)
   Coordinator writes "CompleteCommit" to __transaction_state

Consumer Side: isolation.level

The producer's transactional output is invisible to consumers unless they opt in.

props.put("isolation.level", "read_committed");
// default is read_uncommitted, which sees aborted records too

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders-enriched"));

With read_committed, fetches are capped at the Last Stable Offset (LSO): the offset of the first message of the earliest still-open transaction. Records past the LSO are simply not returned until that transaction commits or aborts, and aborted records below the LSO are filtered out on the consumer side using the aborted-transaction metadata the broker derives from the .txnindex file in each log segment. This adds latency proportional to the longest open transaction.
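
One way to watch the LSO move: for a read_committed consumer, endOffsets() returns the LSO rather than the high watermark. A small probe (group id and topic are placeholders):

// Probe the LSO of a partition; under read_committed, endOffsets() returns the LSO
Properties rc = new Properties();
rc.put("bootstrap.servers", "kafka-1:9092");
rc.put("group.id", "lso-probe");
rc.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
rc.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
rc.put("isolation.level", "read_committed");

try (KafkaConsumer<String,String> probe = new KafkaConsumer<>(rc)) {
  TopicPartition tp = new TopicPartition("orders-enriched", 0);
  long lso = probe.endOffsets(List.of(tp)).get(tp);
  System.out.println("LSO for " + tp + " = " + lso);  // lags while a transaction is open
}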

Tradeoffs and When Not to Use

Use exactly-once when

  • You are doing read-process-write with Kafka as both source and sink
  • Your downstream cannot dedupe (analytics counters, financial ledgers)
  • You can tolerate ~10-30% throughput overhead

Avoid when

  • Output is a non-Kafka system (DB, S3) — use upserts/idempotent writes there instead
  • Latency-critical (read_committed adds buffering delay)
  • You only need at-least-once and the consumer can dedupe by key
  • Transaction state replication factor cannot be guaranteed (small clusters)

Frequently Asked Questions

Does enabling idempotence guarantee exactly-once?

No. Idempotence (enable.idempotence=true) guarantees that retries from a single producer instance will not produce duplicate records on a single partition. It does not span producer restarts and does not coordinate across multiple partitions. For end-to-end exactly-once semantics across topics and across producer restarts, you need transactions (transactional.id) plus consumers with isolation.level=read_committed.
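
The config delta is small; the guarantee delta is not. The key lines, reusing values from the examples above:

// Per-partition, per-session dedupe only:
props.put("enable.idempotence", "true");

// End-to-end exactly-once:
props.put("transactional.id", "orders-processor-1");     // producer
consumerProps.put("isolation.level", "read_committed");  // consumer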

What does the transactional.id actually do?

The transactional.id is a stable string the producer chooses (e.g. 'orders-processor-1'). On producer.initTransactions(), the broker fences any prior producer with the same transactional.id by bumping its epoch. This guarantees that a zombie producer (one that hung in a GC pause and came back later) can no longer commit records: when it tries, the broker rejects it with ProducerFencedException. The transactional.id must therefore be deterministic per logical processor instance.
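
A sketch of fencing in action, assuming two producer instances constructed with the same transactional.id (instance B replaces A, which stalled in a GC pause):

// Instance A starts and opens a transaction.
producerA.initTransactions();   // granted epoch N
producerA.beginTransaction();
producerA.send(new ProducerRecord<>("orders-enriched", "k", "v"));

// Instance B starts with the same transactional.id (A presumed dead).
producerB.initTransactions();   // coordinator bumps the epoch to N+1, fencing A

// A wakes up and tries to finish.
try {
  producerA.commitTransaction();  // rejected: stale epoch
} catch (ProducerFencedException e) {
  producerA.close();              // A must exit; B owns the id now
}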

Where are transaction states stored?

Transaction state lives in the internal __transaction_state topic, which is partitioned (default 50 partitions) and replicated (default factor 3). Each partition is owned by a transaction coordinator: the broker hosting the leader replica for that partition. The coordinator records each state change in that log (Ongoing with its partition set, PrepareCommit or PrepareAbort, CompleteCommit or CompleteAbort). Once PrepareCommit is durable, the coordinator writes COMMIT (or ABORT) markers to every partition that received data in the transaction.

What is the read-process-write pattern?

It is the canonical Kafka Streams loop: consume records from input topics, process them, produce results to output topics, and atomically commit the consumer offsets in the same transaction as the produced records. The producer.sendOffsetsToTransaction() call sends offsets to the consumer offsets topic as part of the transaction. If the transaction aborts, neither the output records nor the offset commit become visible, so reprocessing is safe.

Does exactly-once mean messages are delivered once?

Not quite. Exactly-once in Kafka means each input record contributes its effect to the output exactly once, despite retries, restarts, and failovers. Physical delivery may still happen many times — duplicates exist on the wire and on disk in aborted transactions — but consumers reading with read_committed see only committed records, and the offset commit is atomic with the output, so reprocessing produces the same final state.