Build Kafka (13 scenes)
Scene 08 · Exactly-once — three monotonic counters
PID + sequence number, producer epoch, group generation: three independent fences against duplicates and zombies.

Retries can duplicate. Restarts can zombie. Rebalances can hand the same partition to a stale consumer. Exactly-once isn't one mechanism — it's three monotonic counters, each fencing a different failure mode.

Diagram
A producer with a Transactional ID, its current PID and epoch shown as badges. Records flow into a partition wrapped in a transaction; sendOffsetsToTransaction includes the consumer-group generation. Three fence checks light up at the broker — producer sequence numbers (dedupe retries), producer epoch (zombie producer fenced), and group generation (zombie task post-rebalance).
[Diagram: Topic A (input, offsets 0 to 3) → Streams task (read → process → write; consumer reads Topic A, producer writes Topic B) → Topic B (output, LSO = 0) → downstream consumer with isolation = read_committed. Txn Coordinator state: IDLE. Fences: PID (PID + seq#), Epoch (transactional.id), Group-Gen (group generation, KIP-447).]
Watch one transaction at a time: the consumer reads a cell from Topic A, the producer writes a transformed cell to Topic B (uncommitted, dashed), the coordinator walks open → prepareCommit → committed, and the cell flips solid as the LSO (last stable offset) snaps forward. The read_committed consumer downstream sees the batch appear atomically; that is what the LSO line gates.
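To make the LSO gating concrete, here is a minimal sketch of a partition that tracks open transactions. It assumes a toy in-memory log: `ToyPartition` and its methods are hypothetical names for illustration, not Kafka's log format or API, and it models the LSO as the first offset of the earliest still-open transaction.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartition:
    """Hypothetical in-memory partition, for illustration only."""
    records: list = field(default_factory=list)    # (offset, pid, value)
    open_txns: dict = field(default_factory=dict)  # pid -> first offset of its open txn
    aborted: set = field(default_factory=set)      # offsets written by aborted txns

    def append(self, pid, value):
        """Append an uncommitted record and remember where this pid's txn began."""
        offset = len(self.records)
        self.records.append((offset, pid, value))
        self.open_txns.setdefault(pid, offset)
        return offset

    def commit(self, pid):
        """Close the transaction; its records become visible once the LSO passes them."""
        self.open_txns.pop(pid, None)

    def abort(self, pid):
        """Close the transaction and mark its records so readers skip them."""
        first = self.open_txns.pop(pid, None)
        if first is not None:
            self.aborted.update(
                o for o, p, _ in self.records if p == pid and o >= first
            )

    @property
    def lso(self):
        # First offset of the earliest open transaction, or the log end if none is open.
        return min(self.open_txns.values(), default=len(self.records))

    def read_committed(self):
        """Return values below the LSO, skipping aborted records."""
        return [
            v for o, _, v in self.records
            if o < self.lso and o not in self.aborted
        ]
```

In this sketch a reader sees nothing while a transaction is open, then sees the whole batch at once after `commit`, which mirrors the atomic flip in the diagram.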
Implementation
Producer.send
(pid, seq) tags every record — dedupe within a session
def send(self, record):
    record.pid = self.pid      # assigned by InitProducerId
    record.epoch = self.epoch  # checked by the broker on every produce
    record.seq = self.next_seq[record.partition]
    self.next_seq[record.partition] += 1
    resp = self.leader.produce(record)  # a retry resends the same (pid, epoch, seq)
    if resp == DuplicateSequenceException:
        return  # an earlier attempt already landed, so treat this as success
    if resp == ProducerFencedException:
        raise ProducerFencedException()  # zombie: a newer incarnation owns this ID, give up
Coordinator.initProducerId
epoch bump — every restart fences the previous incarnation
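def initProducerId(self, transactional_id):
    # First use of a transactional.id has no entry yet.
    # TxnState is a hypothetical record type used for illustration.
    state = self.txn_log.get(transactional_id) or TxnState(pid=None, epoch=-1, has_open_txn=False)
    if state.has_open_txn:
        self.abort_transaction(state)  # roll back the zombie's writes
    state.epoch += 1                   # monotonic: this is the fence
    if state.pid is None:              # "is None", so a valid PID of 0 is not replaced
        state.pid = self.new_pid()
    self.txn_log[transactional_id] = state
    return (state.pid, state.epoch)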
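The epoch fence can be exercised end to end with a small simulation. This is a sketch under stated assumptions: `ToyTxnCoordinator`, `check`, and the starting PID of 1000 are hypothetical, and `ProducerFenced` only stands in for Kafka's ProducerFencedException.

```python
class ProducerFenced(Exception):
    """Stand-in for Kafka's ProducerFencedException."""


class ToyTxnCoordinator:
    """Hypothetical coordinator that keeps (pid, epoch) per transactional.id."""

    def __init__(self):
        self.state = {}       # transactional_id -> (pid, epoch)
        self.next_pid = 1000  # arbitrary starting PID for the sketch

    def init_producer_id(self, transactional_id):
        pid, epoch = self.state.get(transactional_id, (None, -1))
        if pid is None:
            # First incarnation: allocate a fresh PID.
            pid = self.next_pid
            self.next_pid += 1
        epoch += 1  # every (re)initialisation bumps the epoch
        self.state[transactional_id] = (pid, epoch)
        return pid, epoch

    def check(self, transactional_id, pid, epoch):
        """Reject any request that carries an epoch older than the current one."""
        current_pid, current_epoch = self.state[transactional_id]
        if pid != current_pid or epoch < current_epoch:
            raise ProducerFenced(
                f"{transactional_id}: epoch {epoch} < current {current_epoch}"
            )
```

Two calls to `init_producer_id` with the same transactional.id return the same PID with epochs 0 and 1; after the second call, any request still carrying epoch 0 raises `ProducerFenced`.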
Producer.sendOffsetsToTransaction
the offset-commit path that carries the group generation
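def sendOffsetsToTransaction(self, offsets, group_meta):
    # group_meta = (group_id, generation, member_id)
    # Step 1: the transaction coordinator records that this group's offsets belong to the txn.
    self.txn_coordinator.addOffsetsToTxn(
        self.transactional_id, self.pid, self.epoch,
        group_meta.group_id,
    )
    # Step 2: the group coordinator stages the offsets and checks the generation.
    self.group_coordinator.txnOffsetCommit(
        self.transactional_id, self.pid, self.epoch,
        group_meta.group_id,
        offsets,
        group_meta.generation,  # KIP-447
        group_meta.member_id,
    )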
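The receiving side of that generation check might look like the sketch below. It assumes a toy group coordinator: `ToyGroupCoordinator`, `rebalance`, and `IllegalGeneration` are hypothetical names, and real Kafka tracks far more membership state than a single counter and a set.

```python
class IllegalGeneration(Exception):
    """Stand-in for the error a group coordinator returns on a stale generation."""


class ToyGroupCoordinator:
    """Hypothetical group coordinator holding one generation counter per group."""

    def __init__(self):
        self.generation = 0
        self.members = set()
        self.pending = {}  # (topic, partition) -> offset, staged until the txn commits

    def rebalance(self, members):
        """Every rebalance bumps the generation and replaces the membership."""
        self.generation += 1
        self.members = set(members)
        return self.generation

    def txn_offset_commit(self, offsets, generation, member_id):
        """Stage offsets only if the caller belongs to the current generation."""
        if generation != self.generation or member_id not in self.members:
            raise IllegalGeneration(
                f"generation {generation} is stale; current is {self.generation}"
            )
        self.pending.update(offsets)
```

A task that lost its partition in a rebalance still holds the old generation, so its offset commit is rejected and the transaction it was part of cannot move the group's position backwards.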
Broker.handleProduce
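epoch and sequence checks on the produce path; the generation check runs at the group coordinator on the offset commit. Drop any one of the three and a zombie or a duplicate leaks.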
def handleProduce(self, record):
    state = self.producer_state[record.pid]
    if record.epoch < state.epoch:
        return ProducerFencedException  # cross-restart fence
    # -1 when nothing has been appended yet, so the first expected sequence is 0
    expected = state.last_seq.get(record.partition, -1) + 1
    if record.seq < expected:
        return DuplicateSequenceException  # within-session dedupe
    if record.seq > expected:
        return OutOfOrderSequenceException  # a gap: an earlier record is missing
    self.log.append(record)
    state.last_seq[record.partition] = record.seq
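The produce-path checks can be run as a self-contained simulation. This is a sketch, not broker code: `ToyBroker` and `Result` are hypothetical, records are reduced to plain values, and it assumes sequences restart at 0 when a producer arrives with a newer epoch.

```python
from enum import Enum


class Result(Enum):
    OK = "ok"
    DUPLICATE = "duplicate"
    OUT_OF_ORDER = "out_of_order"
    FENCED = "fenced"


class ToyBroker:
    """Hypothetical partition leader that applies the epoch and sequence checks."""

    def __init__(self):
        self.log = []
        self.epoch = {}     # pid -> highest epoch seen
        self.last_seq = {}  # (pid, partition) -> last appended sequence

    def handle_produce(self, pid, epoch, partition, seq, value):
        known = self.epoch.get(pid, -1)
        if epoch < known:
            return Result.FENCED  # older incarnation of this producer
        if epoch > known:
            # Assumption for this sketch: a newer epoch restarts sequences at 0.
            self.epoch[pid] = epoch
            self.last_seq = {
                k: v for k, v in self.last_seq.items() if k[0] != pid
            }
        expected = self.last_seq.get((pid, partition), -1) + 1
        if seq < expected:
            return Result.DUPLICATE  # retry of a record that already landed
        if seq > expected:
            return Result.OUT_OF_ORDER  # gap: an earlier record is missing
        self.log.append(value)
        self.last_seq[(pid, partition)] = seq
        return Result.OK
```

Sending the same `(pid, epoch, partition, seq)` twice appends the value once and reports a duplicate the second time, which is why a producer retry is safe in this model.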