Build Kafka (13 scenes)
Scene 5.5 · Log compaction — keep the last value per key
Compact-retention turns the log into a state store; tombstones propagate deletes.
Previously

Time-based retention drops old records. Compaction is the other retention policy: it keeps at least the latest value for each key for as long as that key has not been deleted, which turns the log into a state store that the rest of the ecosystem reads from.
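To make the two policies concrete, here is a small sketch of the topic-level settings that select one or the other. The config keys (cleanup.policy, retention.ms, delete.retention.ms, min.cleanable.dirty.ratio) are real Kafka topic configs and the values shown are their documented defaults; the dict names and the is_compacted helper are hypothetical, introduced only for illustration.

```python
# Illustrative topic settings; the dict names are hypothetical.
TIME_RETENTION_TOPIC_CONFIG = {
    "cleanup.policy": "delete",    # drop whole segments once they are old enough
    "retention.ms": "604800000",   # 7 days
}

COMPACTED_TOPIC_CONFIG = {
    "cleanup.policy": "compact",         # keep the latest value per key
    "delete.retention.ms": "86400000",   # tombstones stay readable for 24 h
    "min.cleanable.dirty.ratio": "0.5",  # dirty fraction that makes a log cleanable
}


def is_compacted(config):
    """Return True if the topic config enables compaction.

    cleanup.policy may list both policies ("compact,delete"), so split it.
    """
    policies = {p.strip() for p in config.get("cleanup.policy", "delete").split(",")}
    return "compact" in policies


print(is_compacted(COMPACTED_TOPIC_CONFIG))       # True
print(is_compacted(TIME_RETENTION_TOPIC_CONFIG))  # False
```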

Diagram
A topic in which the same key appears many times across the log. The compactor scans the log and keeps only the latest record per key, marking older ones for deletion; a tombstone (a record with a null value) explicitly deletes a key. The result on the right is a 'last-write-wins' state view that downstream Kafka Streams KTables and Kafka Connect sinks read directly.
[Diagram: Producer calls send(key, value) → topic "users", partition 0 (1 partition, compact-retention) → Consumer reads the current state]
A topic called "users" with one partition. The producer keeps writing the same keys (alice, bob, carol) with new values. Watch the compactor pass at the end — same key, only the last value survives.
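The 'last-write-wins' view a consumer ends up with can be sketched by replaying the records in offset order into a dictionary. This is a minimal illustration, not Kafka client code: the materialize function, the (key, value) tuple layout, and the sample values (including the tombstone for carol) are assumptions made for the example.

```python
def materialize(log):
    """Replay (key, value) records in offset order into a state dict.

    A value of None is treated as a tombstone and deletes the key.
    """
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone: forget the key
        else:
            state[key] = value    # later writes overwrite earlier ones
    return state


# Hypothetical contents of the "users" topic, oldest record first.
users_log = [
    ("alice", "v1"),
    ("bob", "v1"),
    ("alice", "v2"),
    ("carol", "v1"),
    ("bob", "v2"),
    ("alice", "v3"),
    ("carol", None),  # tombstone: carol is deleted
]

print(materialize(users_log))  # {'alice': 'v3', 'bob': 'v2'}
```

Compaction aims to preserve exactly this replayed state while shrinking the log: a consumer that reads the compacted topic from the beginning should arrive at the same dictionary.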
Implementation
LogCleaner.compact
the dedup-by-key pass that rewrites a dirty segment
def compact(segment):
    # pass 1: scan, remember the newest offset seen for each key
    latest_offset_for_key = {}
    for record in segment:
        latest_offset_for_key[record.key] = record.offset
    # pass 2: keep only the survivor of each key
    cleaned = []
    for record in segment:
        if shouldRetain(record, latest_offset_for_key):
            cleaned.append(record)
    # replace the dirty segment with the cleaned copy
    swapSegment(segment, cleaned)
Cleaner.shouldRetain
per-record decision: am I the survivor for this key?
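import time

DELETE_RETENTION_MS = 24 * 60 * 60 * 1000  # delete.retention.ms; 24 h is Kafka's default

def shouldRetain(record, latest_offset_for_key, now_ms=None):
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # wall clock in ms, like record.timestamp
    latest = latest_offset_for_key[record.key]
    # survivor = the record at the newest offset for its key
    if record.offset != latest:
        return False
    # a surviving tombstone (null value) lingers for
    # delete.retention.ms so downstream consumers can
    # observe the delete, then it is removed as well
    if record.value is None:
        return now_ms - record.timestamp < DELETE_RETENTION_MS
    return True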
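The two passes and the per-record decision can be exercised end to end on an in-memory list. This is a simplified sketch under stated assumptions: the Record dataclass, the snake_case function names, the explicit now_ms parameter, and the sample data are all hypothetical, and a real cleaner works on on-disk segments rather than Python lists.

```python
from dataclasses import dataclass
from typing import Optional

DELETE_RETENTION_MS = 24 * 60 * 60 * 1000  # assumed value for delete.retention.ms


@dataclass(frozen=True)
class Record:
    offset: int
    key: str
    value: Optional[str]  # None marks a tombstone
    timestamp: int        # milliseconds


def should_retain(record, latest_offset_for_key, now_ms):
    # Only the record at the newest offset for its key can survive.
    if record.offset != latest_offset_for_key[record.key]:
        return False
    # A surviving tombstone is kept only while it is younger than the window.
    if record.value is None:
        return now_ms - record.timestamp < DELETE_RETENTION_MS
    return True


def compact(segment, now_ms):
    # Pass 1: later records overwrite earlier entries, leaving the newest offset.
    latest_offset_for_key = {r.key: r.offset for r in segment}
    # Pass 2: emit only the survivors, preserving offset order.
    return [r for r in segment if should_retain(r, latest_offset_for_key, now_ms)]


segment = [
    Record(0, "alice", "a1", 1_000),
    Record(1, "bob", "b1", 2_000),
    Record(2, "alice", "a2", 3_000),
    Record(3, "bob", None, 4_000),  # tombstone for bob
    Record(4, "carol", "c1", 5_000),
]

cleaned = compact(segment, now_ms=10_000)
surviving_offsets = [r.offset for r in cleaned]
print(surviving_offsets)  # [2, 3, 4]
```

Note that surviving records keep their original offsets; compaction leaves gaps in the offset sequence rather than renumbering.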
Cleaner.dropTombstone
second pass: finally remove the tombstone (and the key)
DELETE_RETENTION_MS = 24 * 60 * 60 * 1000  # delete.retention.ms; 24 h is Kafka's default

def dropTombstone(record, now):
    # only tombstones (null value) are candidates for removal
    if record.value is not None:
        return False
    # delete.retention.ms gives slow consumers a chance
    # to observe the null and propagate the delete
    if now - record.timestamp < DELETE_RETENTION_MS:
        return False
    return True  # key vanishes from the log
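The timing of the tombstone's removal is easiest to see by evaluating the same check at different clock readings. The sketch below is an illustration only: drop_tombstone, its parameters, and the chosen timestamps are hypothetical, and it measures age from the record timestamp as the pseudocode above does.

```python
HOUR_MS = 60 * 60 * 1000
DELETE_RETENTION_MS = 24 * HOUR_MS  # assumed value for delete.retention.ms


def drop_tombstone(value, timestamp_ms, now_ms, retention_ms=DELETE_RETENTION_MS):
    """Return True once a tombstone has outlived the retention window."""
    if value is not None:
        return False  # live records are never dropped by this check
    return now_ms - timestamp_ms >= retention_ms


written_at = 0  # the tombstone was written at t = 0

checks = {
    "tombstone, 1 h later": drop_tombstone(None, written_at, 1 * HOUR_MS),
    "tombstone, 25 h later": drop_tombstone(None, written_at, 25 * HOUR_MS),
    "live record, 25 h later": drop_tombstone("v", written_at, 25 * HOUR_MS),
}

for label, dropped in checks.items():
    print(label, "->", dropped)
# tombstone, 1 h later -> False
# tombstone, 25 h later -> True
# live record, 25 h later -> False
```

The practical consequence is that a consumer that falls further behind than the retention window may never see the tombstone, so it can keep a stale value for a key that no longer exists in the log.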