Build Kafka (13 scenes)
Scene 07 · Rebalance — stop-the-world vs. cooperative
Eager revokes every partition from every consumer; cooperative-sticky revokes only the lanes that move.
Previously

Followers and leaders are sorted. Now the OTHER source of churn: consumers come and go, and the group has to reassign partitions. With the eager protocol a rebalance is a 14-minute outage; with cooperative it is a brief 1-minute gap.

Diagram
A consumer group with several consumers and a topic with multiple partitions. When a consumer joins or leaves, the group coordinator triggers a rebalance: 'eager' protocol revokes EVERY partition from EVERY consumer (red strip across all lanes) before reassigning; 'cooperative-sticky' (KIP-429) only revokes the partitions that actually need to move. The wait-time bar shows the difference — minutes vs seconds.
Producer: steady traf…
Group coordinator: protocol = eager
Partitions (6): P0 → C0, P1 → C1, P2 → C2, P3 → C3, P4 → C0, P5 → C1
Consumer group A (4 consumers; dynamic membership, restart triggers rebalance): C0 owns P0, P4; C1 owns P1, P5; C2 owns P2; C3 owns P3
Four consumers split six partitions. Watch the lanes — they're all green (active). At tick 8, C0 exceeds max.poll.interval.ms; the group enters PreparingRebalance and every lane goes dark for a few ticks. Watch what happens to the lanes that AREN'T moving.
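To make "the lanes that aren't moving" concrete, here is a minimal sketch that replays the diagram's layout after C0 drops out and counts which partitions each protocol takes away from the surviving consumers. The helpers `sticky_assign` and `revoked` are hypothetical names invented for this illustration; the toy assignor is a simplification, not Kafka's actual sticky assignor.

```python
def sticky_assign(current, members):
    """Toy sticky assignor (illustrative only): live owners keep their
    partitions, orphans go to the least-loaded member, then loads are evened."""
    plan = {m: [] for m in members}
    orphans = []
    for p in sorted(current):
        owner = current[p]
        (plan[owner] if owner in plan else orphans).append(p)

    def load(m):
        return (len(plan[m]), m)  # tie-break on name for determinism

    for p in orphans:
        plan[min(members, key=load)].append(p)
    while True:
        hi, lo = max(members, key=load), min(members, key=load)
        if len(plan[hi]) - len(plan[lo]) <= 1:
            break
        plan[lo].append(plan[hi].pop())
    return plan


def revoked(current, members, plan, protocol):
    """Partitions taken away from consumers that are still alive."""
    live = {p: o for p, o in current.items() if o in members}
    if protocol == "eager":
        return sorted(live)  # everyone drops everything
    new_owner = {p: m for m, ps in plan.items() for p in ps}
    return sorted(p for p, o in live.items() if new_owner[p] != o)


# Layout from the diagram; C0 has just left the group.
current = {"P0": "C0", "P1": "C1", "P2": "C2",
           "P3": "C3", "P4": "C0", "P5": "C1"}
survivors = ["C1", "C2", "C3"]

plan = sticky_assign(current, survivors)
eager_revoked = revoked(current, survivors, plan, "eager")
coop_revoked = revoked(current, survivors, plan, "cooperative")
print("eager revokes:", eager_revoked)
print("cooperative revokes:", coop_revoked)
```

In this toy run the eager protocol pauses P1, P2, P3 and P5 even though none of them changes owner, while the cooperative path revokes nothing from the survivors: only C0's orphaned P0 and P4 need a new home.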
Implementation
Coordinator.onJoinGroup # eager
every member revokes everything before reassign
async def onJoinGroup(member):
    group.state = PreparingRebalance
    # signal EVERY member to drop EVERY partition
    for m in group.members:
        m.send(RevokeAll)
        await m.onPartitionsRevoked_done
    # group is idle here: nobody owns anything
    plan = assignor.assign(group.members,
                           group.subscribed_topics)
    group.generation += 1
    for m, parts in plan.items():
        m.send(SyncGroup(parts))
    group.state = Stable
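For contrast with the eager handler above, the cooperative path can be sketched as two rounds: in the first, a partition that must change hands is only revoked from its current owner; in the second, a follow-up rebalance gives it to the new owner. The function `cooperative_rounds` and its data shapes are assumptions made for this illustration, not the coordinator's real interface, and the target plan is written by hand rather than computed by an assignor.

```python
def cooperative_rounds(owned, target):
    """Split a move from `owned` to `target` into two cooperative rounds.

    Both arguments map member -> set of partitions. Members missing from
    `target` are treated as gone, so their partitions are free to hand out.
    """
    live_owner = {p: m for m, ps in owned.items() if m in target for p in ps}
    # Round 1: a member may hold a partition only if no OTHER live member
    # still owns it. Partitions in transit are left unassigned for now.
    round1 = {m: {p for p in ps if live_owner.get(p, m) == m}
              for m, ps in target.items()}
    # Each member gives up only what it owns but is not meant to keep.
    revoked = {m: owned.get(m, set()) - ps for m, ps in target.items()}
    # Round 2: the follow-up rebalance hands freed partitions to new owners.
    round2 = {m: set(ps) for m, ps in target.items()}
    return round1, revoked, round2


# Hypothetical scenario: C0 rejoins a group of three and should take over P4.
owned = {"C1": {"P1", "P5"}, "C2": {"P0", "P2"}, "C3": {"P3", "P4"}}
target = {"C0": {"P4"}, "C1": {"P1", "P5"},
          "C2": {"P0", "P2"}, "C3": {"P3"}}

round1, revoked, round2 = cooperative_rounds(owned, target)
for name, state in (("round 1", round1), ("revoked", revoked), ("round 2", round2)):
    print(name, {m: sorted(ps) for m, ps in state.items()})
```

Only P4 is ever without an owner here, and only between the two rounds; the other five lanes keep their consumer throughout, which is the behavior the diagram contrasts with the red strip of the eager protocol.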
Consumer.onPartitionsRevoked
user callback — why every revoke costs wall-clock
def onPartitionsRevoked(partitions):
    t0 = now()  # start timing the revoke
    # everything below runs while the lane is BLACK
    for p in partitions:
        consumer.commitSync(offsets[p])
        stateStore[p].flush()
        stateStore[p].close()
        # Streams: local RocksDB rebuilt on the next assignment
    metrics.record('revoke.latency', now() - t0)
    # only after this returns does Coordinator proceed
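Because the callback above does a commit, a flush and a close per partition, the stall grows with the number of partitions each member has to give up. The sketch below turns that into rough arithmetic. The per-step costs are made-up placeholder numbers, `revoke_cost_ms` and `group_stall_ms` are hypothetical helpers, and the model assumes members run their callbacks concurrently so the group waits for the slowest one.

```python
def revoke_cost_ms(partitions, commit_ms, flush_ms, close_ms):
    """Wall-clock cost of one member's revoke callback under this toy model."""
    return len(partitions) * (commit_ms + flush_ms + close_ms)


def group_stall_ms(revoked_by_member, **costs):
    """The group waits for its slowest member (assumes concurrent callbacks)."""
    return max((revoke_cost_ms(ps, **costs)
                for ps in revoked_by_member.values()), default=0)


# Placeholder costs in milliseconds; real values depend on the workload.
costs = dict(commit_ms=50, flush_ms=500, close_ms=200)

# Hypothetical case: a fourth consumer joins and only P4 has to move.
eager = {"C1": ["P1", "P5"], "C2": ["P0", "P2"], "C3": ["P3", "P4"]}
cooperative = {"C1": [], "C2": [], "C3": ["P4"]}

eager_stall = group_stall_ms(eager, **costs)
coop_stall = group_stall_ms(cooperative, **costs)
print("eager stall (ms):", eager_stall)
print("cooperative stall (ms):", coop_stall)
```

The raw stall difference understates the effect: under eager every lane is dark for the whole stall, while under cooperative only the moved lane pays it and the rest keep processing.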