Build Kafka (13 scenes)
Scene 08 · Exactly-once — three monotonic counters
PID, epoch, group-generation — three independent fences against zombies.
Previously
Retries can duplicate. Restarts can zombie. Rebalances can hand the same partition to a stale consumer. Exactly-once isn't one mechanism — it's three monotonic counters, each fencing a different failure mode.
Diagram
A producer with a Transactional ID, its current PID and epoch shown as badges. Records flow into a partition wrapped in a transaction; sendOffsetsToTransaction includes the consumer-group generation. Three fence checks light up at the broker — producer sequence numbers (dedupe retries), producer epoch (zombie producer fenced), and group generation (zombie task post-rebalance).
Watch one transaction at a time: the consumer reads a cell from Topic A, the producer writes a transformed cell to Topic B (uncommitted, dashed), the coordinator walks open → prepareCommit → committed, and the cell flips solid as the LSO snaps forward. The read_committed downstream below sees the batch appear atomically — that's what the LSO line is gating.
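The LSO gating described above can be sketched as a toy partition log. This is a minimal illustration, not broker code: `PartitionLog`, its fields, and the way aborted offsets are tracked are all hypothetical names chosen for the example. It assumes the LSO is the first offset of the earliest still-open transaction, or the log end when nothing is open.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy partition log (hypothetical): records plus open-transaction bookkeeping."""
    records: list = field(default_factory=list)                # (pid, value) per offset
    open_txn_first_offset: dict = field(default_factory=dict)  # pid -> first offset of its open txn
    aborted: set = field(default_factory=set)                  # offsets written by aborted txns

    def append(self, pid, value):
        offset = len(self.records)
        self.records.append((pid, value))
        # Remember where this producer's open transaction started.
        self.open_txn_first_offset.setdefault(pid, offset)
        return offset

    def last_stable_offset(self):
        # Assumed definition: earliest open transaction's first offset, else log end.
        if self.open_txn_first_offset:
            return min(self.open_txn_first_offset.values())
        return len(self.records)

    def commit(self, pid):
        # Closing the transaction lets the LSO move past its records.
        self.open_txn_first_offset.pop(pid, None)

    def abort(self, pid):
        first = self.open_txn_first_offset.pop(pid, None)
        if first is None:
            return
        # Mark every record this producer wrote in the open transaction as aborted.
        for off in range(first, len(self.records)):
            if self.records[off][0] == pid:
                self.aborted.add(off)

    def read_committed(self):
        # A read_committed reader sees nothing at or beyond the LSO, and skips aborted records.
        lso = self.last_stable_offset()
        return [v for off, (_, v) in enumerate(self.records[:lso]) if off not in self.aborted]


log = PartitionLog()
log.append(1, "a")
log.append(1, "b")
before_commit = log.read_committed()   # transaction still open, so nothing is visible
log.commit(1)
after_commit = log.read_committed()    # both records become visible together
```

The point of the sketch is that visibility changes in one step: the reader's view jumps from empty to the whole batch when the transaction closes, never to a partial batch.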
Implementation
Producer.send
(pid, seq) tags every record — dedupe within a session
def send(self, record):
    record.pid = self.pid                         # assigned by InitProducerId
    record.seq = self.next_seq[record.partition]
    self.next_seq[record.partition] += 1
    record.epoch = self.epoch                     # checked by broker
    resp = leader.produce(record)
    if resp == DuplicateSequenceException:
        return                                    # retry already landed
    if resp == ProducerFencedException:
        raise ProducerFencedException()           # zombie: give up
Coordinator.initProducerId
epoch bump — every restart fences the previous incarnation
def initProducerId(self, transactional_id):
    state = self.txn_log[transactional_id]
    if state.has_open_txn:
        abort_transaction(state)                  # roll back zombie's writes
    state.epoch += 1                              # monotonic, the fence
    state.pid = state.pid or new_pid()
    self.txn_log[transactional_id] = state
    return (state.pid, state.epoch)
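To see the epoch bump in action, here is a small runnable sketch of the same idea. `ToyTxnCoordinator`, `TxnState`, the starting PID of 1000, and the `aborted` list are assumptions made for the example; a real coordinator persists this state in the transaction log rather than in a dict.

```python
from dataclasses import dataclass
from itertools import count


@dataclass
class TxnState:
    pid: int
    epoch: int = -1          # first init bumps this to 0
    has_open_txn: bool = False


class ToyTxnCoordinator:
    """Hypothetical in-memory stand-in for the transaction coordinator."""

    def __init__(self):
        self.txn_log = {}            # transactional_id -> TxnState
        self._pids = count(1000)     # arbitrary starting PID for the example
        self.aborted = []            # (transactional_id, epoch) of rolled-back txns

    def init_producer_id(self, transactional_id):
        state = self.txn_log.get(transactional_id)
        if state is None:
            # First time this Transactional ID is seen: allocate a PID.
            state = TxnState(pid=next(self._pids))
            self.txn_log[transactional_id] = state
        if state.has_open_txn:
            # The previous incarnation left a transaction open: roll it back.
            self.aborted.append((transactional_id, state.epoch))
            state.has_open_txn = False
        state.epoch += 1             # the fence: older epochs are now stale
        return state.pid, state.epoch


coordinator = ToyTxnCoordinator()
first = coordinator.init_producer_id("app-1")
coordinator.txn_log["app-1"].has_open_txn = True   # simulate a crash mid-transaction
second = coordinator.init_producer_id("app-1")     # restart: same PID, higher epoch
```

The restarted producer keeps its PID but gets a strictly larger epoch, so anything the old incarnation sends afterwards carries a stale epoch.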
Producer.sendOffsetsToTransaction
the offset-commit path that carries the group generation
def sendOffsetsToTransaction(self, offsets, group_meta):
    # group_meta = (group_id, generation, member_id)
    coordinator.addOffsetsToTxn(
        self.transactional_id, self.epoch,
        group_meta.group_id,
    )
    coordinator.txnOffsetCommit(
        offsets,
        group_meta.generation,                    # KIP-447
        group_meta.member_id,
    )
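The receiving side of that generation check is not shown in this scene, so here is a hedged sketch of what it might look like. `ToyGroupCoordinator` and the two exception classes are hypothetical names; the assumption is that the coordinator rejects an offset commit whose member is no longer in the group or whose generation is not the current one.

```python
from dataclasses import dataclass, field


class IllegalGenerationError(Exception):
    """Hypothetical: commit carried a generation other than the current one."""


class UnknownMemberError(Exception):
    """Hypothetical: commit came from a member that is not in the group."""


@dataclass
class ToyGroupCoordinator:
    generation: int = 0
    members: set = field(default_factory=set)
    pending_offsets: dict = field(default_factory=dict)   # (topic, partition) -> offset

    def rebalance(self, members):
        # Every rebalance bumps the generation: the third monotonic counter.
        self.generation += 1
        self.members = set(members)
        return self.generation

    def txn_offset_commit(self, offsets, generation, member_id):
        if member_id not in self.members:
            raise UnknownMemberError(member_id)
        if generation != self.generation:
            raise IllegalGenerationError(
                f"commit from generation {generation}, group is at {self.generation}"
            )
        # Offsets stay pending until the surrounding transaction commits.
        self.pending_offsets.update(offsets)


group = ToyGroupCoordinator()
gen1 = group.rebalance({"m1", "m2"})
group.txn_offset_commit({("topic-a", 0): 5}, gen1, "m1")   # accepted: current generation
gen2 = group.rebalance({"m2"})                             # m1 is dropped, generation bumps
try:
    group.txn_offset_commit({("topic-a", 0): 9}, gen1, "m2")
    stale_result = "accepted"
except IllegalGenerationError:
    stale_result = "rejected"                              # stale generation is fenced
```

A task that was paused through the rebalance still holds `gen1`, so its offsets never reach the transaction even though its producer epoch may be perfectly valid.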
Broker.handleProduce
three independent checks — drop any one, a zombie leaks
def handleProduce(self, record):
    state = self.producer_state[record.pid]
    if record.epoch < state.epoch:
        return ProducerFencedException            # cross-restart fence
    expected = state.last_seq[record.partition] + 1
    if record.seq < expected:
        return DuplicateSequenceException         # within-session dedupe
    if record.seq > expected:
        return OutOfOrderSequenceException
    log.append(record)
    state.last_seq[record.partition] = record.seq
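The broker-side checks can be exercised end to end with a small simulation. This is a sketch under stated assumptions: `ToyBroker` and `ProducerState` are hypothetical, outcomes are plain strings instead of exceptions, an unseen PID is registered on first produce, and a newer epoch is assumed to start a fresh sequence space.

```python
from dataclasses import dataclass, field


@dataclass
class ProducerState:
    epoch: int
    last_seq: dict = field(default_factory=dict)   # partition -> last appended seq


class ToyBroker:
    """Hypothetical partition leader that applies the epoch and sequence fences."""

    def __init__(self):
        self.producer_state = {}   # pid -> ProducerState
        self.log = []              # appended (partition, value) pairs

    def handle_produce(self, pid, epoch, partition, seq, value):
        state = self.producer_state.setdefault(pid, ProducerState(epoch))
        if epoch < state.epoch:
            return "FENCED"                        # zombie from an older incarnation
        if epoch > state.epoch:
            # Assumption: a new incarnation restarts its sequence numbers at 0.
            state.epoch = epoch
            state.last_seq.clear()
        expected = state.last_seq.get(partition, -1) + 1
        if seq < expected:
            return "DUPLICATE"                     # retry of a record already appended
        if seq > expected:
            return "OUT_OF_ORDER"                  # gap: an earlier record is missing
        self.log.append((partition, value))
        state.last_seq[partition] = seq
        return "OK"


broker = ToyBroker()
outcomes = [
    broker.handle_produce(pid=7, epoch=0, partition=0, seq=0, value="x"),  # first write
    broker.handle_produce(pid=7, epoch=0, partition=0, seq=0, value="x"),  # network retry
    broker.handle_produce(pid=7, epoch=0, partition=0, seq=2, value="z"),  # skipped seq 1
    broker.handle_produce(pid=7, epoch=1, partition=0, seq=0, value="y"),  # restarted producer
    broker.handle_produce(pid=7, epoch=0, partition=0, seq=1, value="w"),  # old incarnation
]
```

Only the first write and the restarted producer's write reach the log; the retry, the gap, and the zombie are each turned away by a different check.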