Build Kafka (13 scenes)
Scene 2.5 · Offsets, retention, and where bookmarks live
Reading and committing advance two separate offsets; retention, not consumers, ages records out.
Previously

Reads don't delete, and each consumer carries its own bookmark. So who does delete records, and where does that bookmark physically live? The broker's retention policy handles the first; an internal Kafka topic holds the second.

Diagram
The same partition log, with two extra layers visible: a separate `__consumer_offsets` topic where each consumer's commit position is itself written as a record, and a retention bar at the bottom that ages out the oldest cells on a clock or size budget. Read and commit are drawn as two independent arrows from each consumer.
[Diagram: a Producer calls send(record), which only appends. Topic `events` has 3 partitions; partition 0 is shown zoomed in. Panel "Read vs commit": `record = consumer.poll()` advances the read offset, then `process(record)`, then `consumer.commit()` writes to `__consumer_offsets`; on restart the consumer resumes from the committed offset. `__consumer_offsets` is labeled as the internal Kafka topic where the bookmark physically lives. Initial consumer state: read=0, committed=0.]
Watch the read and committed offsets: they start in lockstep, but they don't have to stay that way. Retention crawls in from the left; records stay until retention ages them out, whether or not anyone has read them.
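To make the gap between the two offsets concrete, here is a minimal in-memory sketch. `ToyConsumer` and its methods are hypothetical names chosen for illustration, not the real Kafka client API, and the "log" is just a Python list. It assumes that only the committed offset survives a restart.

```python
class ToyConsumer:
    """Hypothetical in-memory consumer; not the real Kafka client API."""

    def __init__(self, log, committed=0):
        self.log = log              # the partition: a plain list of records
        self.committed = committed  # durable bookmark (survives restarts)
        self.read = committed       # in-memory cursor (lost on restart)

    def poll(self):
        """Return the next record, or None at the end of the log."""
        if self.read >= len(self.log):
            return None
        record = self.log[self.read]
        self.read += 1              # reading never touches `committed`
        return record

    def commit(self):
        self.committed = self.read  # persist the bookmark


log = ["a", "b", "c", "d"]
consumer = ToyConsumer(log)

first = consumer.poll()    # "a"
consumer.commit()          # committed = 1
second = consumer.poll()   # "b"
third = consumer.poll()    # "c"; read = 3, committed is still 1

# Simulate a crash: only the committed offset survives.
restarted = ToyConsumer(log, committed=consumer.committed)
replayed = restarted.poll()  # "b" again
```

The restarted consumer sees "b" a second time because it was read but never committed. The log itself is unchanged by any of this: all four records are still there.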
Implementation
Consumer.poll
advances the in-memory cursor; auto-commit piggybacks here
def poll(self):
    resp = broker.fetch(
        topic, partition,
        fromOffset=self.read,
    )
    for record in resp.records:
        # in-memory only: the next offset to fetch
        self.read = record.offset + 1
        yield record
    if config["enable.auto.commit"]:
        # at most once every auto.commit.interval.ms
        self.commitOffset(self.read)
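The "every auto.commit.interval.ms" comment hides a small piece of timing logic. One way to sketch it, assuming a hypothetical `AutoCommitter` helper that `poll` would call with the current time (the real client's internals are more involved), is a deadline check that commits at most once per interval:

```python
class AutoCommitter:
    """Hypothetical helper: commits at most once per interval."""

    def __init__(self, interval_ms, commit_fn, now_ms=0):
        self.interval_ms = interval_ms   # stands in for auto.commit.interval.ms
        self.commit_fn = commit_fn       # called with the offset to persist
        self.next_deadline = now_ms + interval_ms

    def maybe_commit(self, offset, now_ms):
        """Called from poll(); returns True if a commit was issued."""
        if now_ms < self.next_deadline:
            return False
        self.commit_fn(offset)
        self.next_deadline = now_ms + self.interval_ms
        return True


commits = []
auto = AutoCommitter(interval_ms=5000, commit_fn=commits.append)

results = [
    auto.maybe_commit(offset=10, now_ms=1000),   # too early
    auto.maybe_commit(offset=25, now_ms=5000),   # deadline reached
    auto.maybe_commit(offset=40, now_ms=7000),   # too early again
    auto.maybe_commit(offset=55, now_ms=10000),  # next deadline reached
]
```

Only offsets 25 and 55 are persisted here. Anything read between commits exists only in the in-memory cursor, which is why a crash between deadlines leads to re-reading.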
Consumer.commitOffset
persists the bookmark — a write into __consumer_offsets
def commitOffset(self, offset):
    # __consumer_offsets is an internal, compacted Kafka topic;
    # the key is (group, topic, partition).
    record = OffsetCommit(
        group=self.group,
        topic=topic,
        part=partition,
        offset=offset,
    )
    coordinator.append(record)
    self.committed = offset
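Because each commit is just a record keyed by (group, topic, partition), the current bookmark for any consumer can be recovered by replaying the commit records and keeping the last value per key. The sketch below illustrates that idea with a plain list standing in for `__consumer_offsets`; the `OffsetCommit` tuple and `latest_commits` function are illustrative names, and the group names are made up.

```python
from typing import NamedTuple


class OffsetCommit(NamedTuple):
    group: str
    topic: str
    partition: int
    offset: int


def latest_commits(offsets_log):
    """Replay the commit records in order; the last write per key wins."""
    table = {}
    for rec in offsets_log:
        table[(rec.group, rec.topic, rec.partition)] = rec.offset
    return table


offsets_log = [
    OffsetCommit("billing", "events", 0, 3),
    OffsetCommit("audit", "events", 0, 1),
    OffsetCommit("billing", "events", 0, 7),   # supersedes offset 3
    OffsetCommit("billing", "events", 1, 2),
]

table = latest_commits(offsets_log)
resume_at = table[("billing", "events", 0)]
```

Two groups reading the same partition keep independent bookmarks, since the group is part of the key. Older commits for the same key carry no extra information once a newer one exists.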
LogCleaner.maybeRoll
broker-side retention — ages segments out by time or size
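# runs periodically per partition
def maybeRoll():
    # oldest first; the active (last) segment is never deleted
    for seg in list(segments[:-1]):
        tooOld = age(seg) > config["log.retention.ms"]
        limit = config["log.retention.bytes"]  # negative = no size limit
        tooLarge = limit >= 0 and totalBytes() > limit
        if tooOld or tooLarge:
            delete(seg)  # every offset in seg is gone
    if active.size >= config["log.segment.bytes"]:
        roll()  # start a new segment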
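A runnable sketch of the same retention rule, under stated assumptions: `Segment` and `apply_retention` are hypothetical names, a segment's age is measured from its newest record, and the size check follows the simplified "delete while over budget" rule from the pseudocode above. Real brokers differ in the details.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int
    size_bytes: int
    last_append_ms: int   # timestamp of the newest record in the segment


def apply_retention(segments, now_ms, retention_ms, retention_bytes):
    """Return the segments that survive; `segments` is ordered oldest first.

    The last segment is treated as active and is never deleted.
    A negative retention_bytes means "no size limit".
    """
    kept = list(segments)
    while len(kept) > 1:
        oldest = kept[0]
        too_old = now_ms - oldest.last_append_ms > retention_ms
        total = sum(s.size_bytes for s in kept)
        too_large = retention_bytes >= 0 and total > retention_bytes
        if not (too_old or too_large):
            break
        kept.pop(0)   # every offset in this segment is now gone
    return kept


segments = [
    Segment(base_offset=0, size_bytes=100, last_append_ms=1_000),
    Segment(base_offset=50, size_bytes=100, last_append_ms=8_000),
    Segment(base_offset=100, size_bytes=100, last_append_ms=9_500),
    Segment(base_offset=150, size_bytes=40, last_append_ms=10_000),  # active
]

kept = apply_retention(segments, now_ms=10_000,
                       retention_ms=5_000, retention_bytes=200)
log_start_offset = kept[0].base_offset
```

In this run the first segment goes because it is too old, and the second goes because the remaining log is still over the size budget. Neither decision consults any consumer's read or committed offset, which is the point of the scene: deletion is driven by the clock and the byte budget alone.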