Build Kafka (13 scenes)
Scene 4.5 · Cluster, controller, and metadata
One active controller per cluster; KRaft made the metadata itself a Raft log.
Previously
ISR membership, leader election, and topic config all need a single source of truth. Exactly one node at a time is the active controller, and KRaft made the metadata itself a replicated log, so Kafka no longer depends on an external ZooKeeper ensemble.
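"One at a time" has to be enforced, because an old controller can pause (for example during a long GC) and wake up after a successor has taken over. ZooKeeper-mode Kafka fenced such zombie controllers with a controller epoch, and in KRaft the Raft leader epoch of the controller quorum gives the same guarantee. The sketch below shows only the broker-side check, assuming every new controller gets a strictly larger epoch; `BrokerControlState`, `handle_leader_update`, and `StaleControllerError` are hypothetical names, not Kafka APIs.

```python
class StaleControllerError(Exception):
    """A request came from a controller that has already been replaced."""


class BrokerControlState:
    """Broker-side record of the newest controller epoch it has seen (hypothetical)."""

    def __init__(self):
        self.controller_epoch = -1  # no controller seen yet
        self.leaders = {}           # partition -> leader broker id

    def handle_leader_update(self, controller_epoch, partition, leader):
        # Every newly elected controller gets a strictly larger epoch, so an
        # older epoch can only come from a zombie controller: reject it.
        if controller_epoch < self.controller_epoch:
            raise StaleControllerError(
                f"epoch {controller_epoch} < current {self.controller_epoch}"
            )
        self.controller_epoch = controller_epoch
        self.leaders[partition] = leader


broker = BrokerControlState()
broker.handle_leader_update(controller_epoch=7, partition="orders-0", leader=2)
try:
    # A paused old controller wakes up and tries to undo the newer decision.
    broker.handle_leader_update(controller_epoch=6, partition="orders-0", leader=1)
except StaleControllerError as err:
    print("rejected:", err)  # rejected: epoch 6 < current 7
print(broker.leaders)        # {'orders-0': 2}
```

The check is deliberately one comparison: brokers never need to know *who* the controller is, only that a request's epoch is not older than the newest one they have accepted.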
Diagram
A small cluster of brokers with leadership badges showing which one is the active controller. Partition replicas are spread across the brokers; the controller tracks the metadata (which replica leads which partition, who is in the ISR) and, in KRaft mode, replicates that metadata as a Raft log to the other controllers.
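The metadata in the diagram can be pictured as a table keyed by partition that is updated by applying change records in log order. A minimal sketch, assuming a heavily simplified record shape: `PartitionState`, `MetadataImage`, and `apply_partition_change` are hypothetical names, real Kafka metadata records carry more fields, and the rule "only leader changes bump the epoch" is a simplification made for this sketch.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """One row of a simplified, hypothetical metadata image."""
    topic: str
    partition: int
    replicas: list[int]   # assignment order; first entry is the preferred leader
    isr: list[int]        # in-sync replicas
    leader: int           # broker id of the leader, -1 if offline
    leader_epoch: int = 0


@dataclass
class MetadataImage:
    partitions: dict[tuple[str, int], PartitionState] = field(default_factory=dict)
    last_applied_offset: int = -1  # position reached in the metadata log

    def apply_partition_change(self, offset, topic, partition, leader, isr):
        """Apply one simplified partition-change record read at `offset`."""
        if offset <= self.last_applied_offset:
            return  # already applied; skip replays of older records
        p = self.partitions[(topic, partition)]
        if leader != p.leader:
            p.leader = leader
            p.leader_epoch += 1  # in this sketch, only leader changes bump the epoch
        p.isr = list(isr)
        self.last_applied_offset = offset

    def leader_for(self, topic, partition):
        return self.partitions[(topic, partition)].leader


image = MetadataImage()
image.partitions[("orders", 0)] = PartitionState("orders", 0, [1, 2, 3], [1, 2, 3], leader=1)
image.apply_partition_change(42, "orders", 0, leader=2, isr=[2, 3])
print(image.leader_for("orders", 0), image.partitions[("orders", 0)].leader_epoch)  # 2 1
```

Because every node applies the same records in the same order, every node ends up with the same table; that is the whole point of keeping the metadata as a log.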
Why this matters: a Kafka cluster is many brokers, but *somebody* has to decide which broker leads which partition, who is in the in-sync replica set (ISR), and which topics and configs exist at all. That somebody is the controller: a ROLE that one node holds at a time. The top-right badge shows that Broker 1 currently holds the controller role. The producer writes a few records to the partition leader (the data plane); the controller (the control plane) is not on that write path and only decides which broker the producer should be writing to.
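The split between the two planes can be sketched in a few lines: the producer sends each record to whichever broker its cached metadata names as leader, and the controller never touches the records. When leadership moves, the old leader rejects the write, and the producer refreshes its metadata and retries. This is a minimal sketch, not the real client: `Broker`, `Producer`, `NotLeaderError`, and the `fetch_metadata` callback are hypothetical stand-ins (`NotLeaderError` imitates Kafka's `NOT_LEADER_OR_FOLLOWER` error).

```python
class NotLeaderError(Exception):
    """Stands in for Kafka's NOT_LEADER_OR_FOLLOWER error code."""


class Broker:
    def __init__(self, broker_id, led_partitions):
        self.broker_id = broker_id
        self.led_partitions = set(led_partitions)  # decided by the control plane
        self.logs = {}                             # partition -> list of records

    def produce(self, partition, record):
        # Data plane: only the current partition leader accepts writes.
        if partition not in self.led_partitions:
            raise NotLeaderError(partition)
        log = self.logs.setdefault(partition, [])
        log.append(record)
        return len(log) - 1  # offset of the appended record


class Producer:
    def __init__(self, brokers, fetch_metadata):
        self.brokers = brokers                # broker id -> Broker
        self.fetch_metadata = fetch_metadata  # returns {partition: leader id}
        self.leader_cache = fetch_metadata()

    def send(self, partition, record, retries=1):
        for _ in range(retries + 1):
            leader = self.leader_cache[partition]
            try:
                return leader, self.brokers[leader].produce(partition, record)
            except NotLeaderError:
                # Cached metadata is stale: refresh it, then retry.
                self.leader_cache = self.fetch_metadata()
        raise NotLeaderError(partition)


leaders = {"orders-0": 1}  # what the metadata says (written by the controller)
brokers = {1: Broker(1, ["orders-0"]), 2: Broker(2, [])}
producer = Producer(brokers, lambda: dict(leaders))
print(producer.send("orders-0", b"a"))  # (1, 0)

# The control plane moves leadership to broker 2; the producer's cache is stale.
brokers[1].led_partitions.discard("orders-0")
brokers[2].led_partitions.add("orders-0")
leaders["orders-0"] = 2
print(producer.send("orders-0", b"b"))  # (2, 0)
```

The controller shows up only as the thing that edits `leaders` and `led_partitions`; the records themselves flow producer-to-leader without it.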
The big panel at the bottom shows, side by side, the two ways Kafka has stored its metadata. KRaft (left card) keeps it INSIDE the Kafka cluster as a Raft log replicated across 3 controllers: one system, one protocol. ZooKeeper (right card) kept it OUTSIDE the cluster in a 3-to-5-node ZK ensemble that you had to operate separately: two systems, two protocols. KIP-500 (2020) proposed replacing ZK with KRaft for exactly the reasons listed on the cards.
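Both designs rely on majority quorums: the KRaft controller quorum and the ZK ensemble each commit a write only once a majority of members acknowledge it. That is why the cards quote sizes like 3 and 5, and the arithmetic is short enough to write out. The function names are illustrative.

```python
def majority(voters: int) -> int:
    """Smallest number of voters that makes up a majority."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """How many voters can be down while the rest still form a majority."""
    return voters - majority(voters)


for n in (1, 3, 4, 5):
    print(f"voters={n} majority={majority(n)} tolerated_failures={tolerated_failures(n)}")
```

Three voters survive one failure and five survive two, while a fourth voter raises the majority to 3 without surviving any more failures than three voters do. Odd sizes buy the most fault tolerance per node.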
Implementation
Controller.onBrokerFail
the active controller reacts when a broker stops heartbeating
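```python
def onBrokerFail(brokerId):
    # Fence the dead broker first so electLeader() can never pick it.
    liveBrokers.discard(brokerId)
    # Partitions it led need a new leader; partitions where it was only a
    # follower still need it removed from the ISR.
    affected = [
        p for p in partitions
        if p.leader == brokerId or brokerId in p.isr
    ]
    for p in affected:
        newIsr = [r for r in p.isr if r != brokerId]
        newLeader, newEpoch = p.leader, p.epoch
        if p.leader == brokerId:
            newLeader = electLeader(p)  # may be NO_LEADER
            newEpoch = p.epoch + 1      # leadership changed
            if newLeader != NO_LEADER and newLeader not in newIsr:
                newIsr = [newLeader]    # unclean pick: ISR restarts at the new leader
        record = PartitionChangeRecord(
            partition=p.id,
            leader=newLeader,
            isr=newIsr,
            leaderEpoch=newEpoch,
        )
        metadataLog.append(record)
    # brokers fetch the new records and update their metadata caches
```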
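The failover logic above can be turned into a self-contained, runnable sketch on plain in-memory objects. It is not Kafka's controller code: the `Partition` and `PartitionChangeRecord` shapes are simplified and hypothetical, the live-broker set is passed in instead of being global state, only clean election is modeled, and the function returns the records instead of appending them to a real metadata log.

```python
from dataclasses import dataclass

NO_LEADER = -1  # leader id used when a partition has no leader


@dataclass
class Partition:
    id: str
    replicas: list[int]  # assignment order, preferred replica first
    isr: list[int]
    leader: int
    epoch: int


@dataclass(frozen=True)
class PartitionChangeRecord:  # simplified; real records carry more fields
    partition: str
    leader: int
    isr: tuple[int, ...]
    leader_epoch: int


def on_broker_fail(broker_id, partitions, live_brokers):
    """Return the change records a controller would append (sketch)."""
    live = set(live_brokers) - {broker_id}  # fence the failed broker first
    records = []
    for p in partitions:
        if p.leader != broker_id and broker_id not in p.isr:
            continue  # this failure does not touch the partition
        new_isr = [r for r in p.isr if r != broker_id]
        leader, epoch = p.leader, p.epoch
        if p.leader == broker_id:
            # Clean election only: first live ISR member in assignment order.
            leader = next(
                (r for r in p.replicas if r in new_isr and r in live), NO_LEADER
            )
            epoch += 1  # leadership changed, so the epoch moves forward
        records.append(PartitionChangeRecord(p.id, leader, tuple(new_isr), epoch))
    return records


partitions = [
    Partition("orders-0", [1, 2, 3], [1, 2, 3], leader=1, epoch=4),
    Partition("orders-1", [2, 3, 1], [2, 3, 1], leader=2, epoch=7),
    Partition("audit-0", [2, 3], [2, 3], leader=2, epoch=0),
]
for record in on_broker_fail(1, partitions, live_brokers={1, 2, 3}):
    print(record)
```

Killing broker 1 moves `orders-0` to broker 2 at epoch 5, leaves `orders-1` with leader 2 but drops broker 1 from its ISR, and produces no record at all for `audit-0`, which never involved broker 1.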
Controller.electLeader
pick a new leader from replicas still in the ISR
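```python
def electLeader(p):
    # Walk the assignment in order (preferred replica first) and take the
    # first replica that is both alive and in the ISR.
    for replicaId in p.replicas:
        if replicaId in p.isr and replicaId in liveBrokers:
            return replicaId
    # No live ISR member: only out-of-sync replicas (if any) are left.
    if config["unclean.leader.election.enable"]:  # topic-level setting
        for replicaId in p.replicas:
            if replicaId in liveBrokers:
                return replicaId  # may lose committed records
    return NO_LEADER  # partition goes offline
```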
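The unclean branch is a durability trade-off, not just a fallback. A rough way to see it: treat the high watermark as the boundary of committed records (roughly, the lowest log end offset across the ISR). An out-of-sync replica that becomes leader is missing everything between its own log end offset and that boundary, and those committed records are gone once the other replicas truncate to match it. The sketch models only that arithmetic; `elect_leader` and `committed_records_lost` are hypothetical names, and log end offsets are treated as "next offset to write".

```python
def elect_leader(replicas, isr, live, unclean_enabled):
    """Return (leader, clean), or (None, False) if the partition goes offline."""
    for r in replicas:  # assignment order: preferred replica first
        if r in isr and r in live:
            return r, True
    if unclean_enabled:
        for r in replicas:
            if r in live:
                return r, False  # out-of-sync replica: may lack committed records
    return None, False


def committed_records_lost(leader, log_end_offsets, high_watermark):
    """Committed records (offsets below the high watermark) the new leader lacks."""
    return max(0, high_watermark - log_end_offsets[leader])


# Replica 3 fell out of the ISR at offset 80; the ISR committed up to offset 100.
log_end_offsets = {1: 100, 2: 100, 3: 80}
isr = [1, 2]
live = {3}  # both ISR members are down

print(elect_leader([1, 2, 3], isr, live, unclean_enabled=False))  # (None, False)
leader, clean = elect_leader([1, 2, 3], isr, live, unclean_enabled=True)
print(leader, clean, committed_records_lost(leader, log_end_offsets, 100))  # 3 False 20
```

With unclean election disabled the partition stays offline until an ISR member returns; with it enabled the partition comes back immediately, minus 20 records that producers were told were committed.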
MetadataLog.append
KRaft: metadata changes are a Raft log on the controller quorum
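```python
def append(record):  # record is a MetadataRecord
    if mode == "kraft":
        # __cluster_metadata: a Raft log replicated across 3-5 controller voters
        offset = raft.appendToQuorum(record)
        raft.waitForCommit(offset)  # committed once a majority of voters has it
        # No push needed: every broker tails __cluster_metadata via the
        # fetch protocol and applies committed records to its cache.
    else:  # zookeeper
        zk.write(path, record.payload)  # ZooKeeper fires any watches itself
        # In ZK mode the controller pushed the change to every broker
        # (LeaderAndIsr / UpdateMetadata requests).
        broadcastToBrokerCaches(record)
```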
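What `waitForCommit` waits for can be computed directly: a record in `__cluster_metadata` counts as committed once a majority of the controller voters have it. A minimal sketch of that rule, assuming log end offsets are exclusive (the next offset to write) and ignoring Raft's extra requirement that a leader only advances the commit point based on entries from its own epoch. `quorum_high_watermark` and `is_committed` are hypothetical names.

```python
def quorum_high_watermark(voter_end_offsets):
    """Offset up to which (exclusive) a majority of voters hold the log.

    Sort the voters' log end offsets from highest to lowest: the value at
    position majority-1 is reached or exceeded by at least `majority` voters.
    """
    ends = sorted(voter_end_offsets.values(), reverse=True)
    majority = len(ends) // 2 + 1
    return ends[majority - 1]


def is_committed(offset, voter_end_offsets):
    """The condition a waitForCommit-style call would be waiting on."""
    return offset < quorum_high_watermark(voter_end_offsets)


# Three controller voters: the leader has appended up to 120, one follower
# up to 118, and a lagging follower up to 95.
voters = {1: 120, 2: 118, 3: 95}
print(quorum_high_watermark(voters))                         # 118
print(is_committed(117, voters), is_committed(119, voters))  # True False
```

The lagging follower at 95 does not hold anything back: two of three voters already have everything below 118, so those records are safe even if any single controller dies.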