Build a CDC pipeline (Debezium + outbox) (12 scenes)
Scene 04 · Debezium tails the log
Debezium is a connector that registers as a replica, decodes each WAL/binlog record into a structured change event, and emits one Kafka message per row change.
Previously
The DB already has a durable, ordered log of every change — so the next move is a process that tails it and turns each record into a downstream event.
Diagram
Left: the database from scene 3, with a tables region on top and the append-only **WAL** (Postgres) or **binlog** (MySQL) strip below. Center: the Debezium **connector**, a process that registers as a replication subscriber and reads the log over the database's replication protocol rather than through SQL queries. Inside the connector, on its right side: the JSON **change event** envelope {op, before, after, source}. Far right: the Kafka topic strip, with one message per row change.
We need a process that tails the WAL and emits one event per row change. Debezium is the canonical open-source implementation; it runs as a Kafka Connect source connector. Watch: an UPDATE lands in the WAL, the connector decodes it, and the topic gets one message.
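To make the UPDATE walkthrough concrete, here is a minimal in-memory sketch of "one log record in, one topic message out". It assumes a hypothetical `WalRecord` type that already carries decoded before/after rows and an `id` primary-key column; it is not Debezium's API. Real Debezium topic names also carry a prefix and schema, while here the topic is simply the table name.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical decoded WAL record; not a Postgres or Debezium type.
@dataclass
class WalRecord:
    lsn: int
    table: str
    kind: str                # 'INSERT' | 'UPDATE' | 'DELETE'
    before: Optional[dict]
    after: Optional[dict]

OPS = {'INSERT': 'c', 'UPDATE': 'u', 'DELETE': 'd'}

def tail(wal, topics):
    """Turn each row-change record into exactly one message on its table's topic."""
    for rec in wal:
        # deletes have no 'after' image, so the key comes from 'before'
        row = rec.after if rec.after is not None else rec.before
        topics.setdefault(rec.table, []).append({
            'key': row['id'],  # assumes an 'id' primary-key column
            'value': {
                'op': OPS[rec.kind],
                'before': rec.before,
                'after': rec.after,
                'source': {'table': rec.table, 'lsn': rec.lsn},
            },
        })

wal = [
    WalRecord(1, 'orders', 'INSERT', None, {'id': 7, 'status': 'new'}),
    WalRecord(2, 'orders', 'UPDATE', {'id': 7, 'status': 'new'}, {'id': 7, 'status': 'paid'}),
]
topics = {}
tail(wal, topics)
print([m['value']['op'] for m in topics['orders']])  # ['c', 'u']
```

The UPDATE yields a single message whose value holds both row images, so a consumer can see what changed without querying the source database.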
Implementation
KafkaConnect.subscribeAsReplica
boot path: register as a streaming-replication subscriber
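def start(connectorConfig):
    # 'replication=database' opens a logical-replication connection (libpq option)
    conn = postgres.connect(replication='database')
    # the slot survives restarts, so create it only on first boot
    # (slot_exists is a hypothetical helper)
    if not conn.slot_exists(connectorConfig.slot_name):
        conn.create_replication_slot(
            name=connectorConfig.slot_name,
            plugin='pgoutput',  # built-in logical decoding plugin; streams tables in a PUBLICATION
        )
    stream = conn.start_replication(
        slot=connectorConfig.slot_name,
        start_lsn=offsets.last_confirmed_lsn(),  # resume where the previous run stopped
    )
    for rec in stream:
        Connector.onWalRecord(rec)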
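Why `start_lsn` comes from stored offsets: the sketch below models, under simplifying assumptions, a replication slot that keeps every record after its confirmed LSN and a consumer that produces first, then saves its offset, then acknowledges the LSN. `ReplicationSlot`, `run`, and `crash_at_lsn` are hypothetical names for illustration only, and this toy slot replays from whichever is later, the requested LSN or the confirmed one.

```python
class ReplicationSlot:
    """Hypothetical in-memory slot: retains WAL after the confirmed LSN."""

    def __init__(self, wal):
        self.wal = wal            # list of (lsn, payload), ascending by lsn
        self.confirmed_lsn = 0    # last LSN the client acknowledged

    def stream(self, start_lsn):
        # replay every record after the later of the two positions
        floor = max(start_lsn, self.confirmed_lsn)
        return [(lsn, p) for lsn, p in self.wal if lsn > floor]

    def confirm(self, lsn):
        self.confirmed_lsn = max(self.confirmed_lsn, lsn)

def run(slot, offsets, produced, crash_at_lsn=None):
    """Resume from the stored offset: produce, then save offset, then ack."""
    for lsn, payload in slot.stream(offsets.get('lsn', 0)):
        produced.append(payload)   # 1. write to Kafka
        if lsn == crash_at_lsn:
            return                 # simulated crash before the offset is saved
        offsets['lsn'] = lsn       # 2. save offset
        slot.confirm(lsn)          # 3. ack the LSN back to the server

slot = ReplicationSlot([(10, 'a'), (20, 'b'), (30, 'c')])
offsets, produced = {}, []
run(slot, offsets, produced, crash_at_lsn=20)  # crashes after producing 'b'
run(slot, offsets, produced)                   # restart resumes after LSN 10
print(produced)  # ['a', 'b', 'b', 'c']
```

Nothing is lost across the crash, but 'b' is delivered twice: resuming from the last saved offset gives at-least-once delivery, so downstream consumers should tolerate replays.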
Connector.onWalRecord
main loop: decode one WAL record, emit one Kafka message
def onWalRecord(rec):
    if connector.mode == 'select':
        # contrast path: a SELECT only sees rows that still exist,
        # so a DELETE leaves nothing to query and is never emitted
        rows = db.query('SELECT * FROM ' + rec.table)
        return emitRowDiffs(rows)
    # streaming-replication path: decode the WAL tuple
    before, after = decodeTuples(rec)
    op = {INSERT: 'c', UPDATE: 'u', DELETE: 'd'}[rec.kind]
    envelope = buildEnvelope(rec, op, before, after)
    kafka.produce(
        topic=topicFor(rec.table),
        # deletes have no 'after', so fall back to 'before'; a stable key keeps
        # every change to one row on one partition, in order
        key=pk(after or before),
        value=envelope,
    )
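The `select` branch exists to show what log-based capture buys. Here is a minimal sketch of query-based polling, assuming a hypothetical `updated_at` column used as a watermark (a common polling pattern, not something Debezium does): an UPDATE is picked up, but the deleted row simply vanishes from the result set.

```python
def poll(table, watermark):
    """Hypothetical query-based CDC: rows whose updated_at is past the watermark."""
    changed = [row for row in table.values() if row['updated_at'] > watermark]
    new_watermark = max([watermark] + [r['updated_at'] for r in changed])
    return changed, new_watermark

table = {1: {'id': 1, 'updated_at': 1}, 2: {'id': 2, 'updated_at': 1}}
changes, wm = poll(table, 0)        # first poll sees both rows

table[1]['updated_at'] = 2          # UPDATE row 1
del table[2]                        # DELETE row 2: no row is left to select
changes, wm = poll(table, wm)
print([r['id'] for r in changes])   # [1]; the delete of id 2 never shows up
```

Polling also collapses multiple updates between two polls into the latest state, whereas the WAL records each change separately.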
buildEnvelope
construct the {op, before, after, source} change event
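def buildEnvelope(rec, op, before, after):
    return {
        'op': op,          # 'c' | 'u' | 'd'
        'before': before,  # null on insert; Postgres fills it fully on UPDATE only with REPLICA IDENTITY FULL
        'after': after,    # null on delete
        'source': {        # where in the log this change came from
            'table': rec.table,
            'lsn': rec.lsn,
            'ts_ms': rec.commit_ts_ms,  # commit time, epoch milliseconds
        },
    }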
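On the consuming side, the `op` field is what a downstream service dispatches on. A minimal sketch, assuming a hypothetical in-memory replica keyed by primary key: it also handles `'r'`, the op Debezium uses for rows read during an initial snapshot, and skips tombstones (null-value messages Debezium emits after deletes by default so compacted topics can drop the key).

```python
def apply(replica, key, envelope):
    """Hypothetical consumer: replay one change event into a dict keyed by PK."""
    if envelope is None:
        return                        # tombstone: nothing to apply
    op = envelope['op']
    if op in ('c', 'u', 'r'):         # insert, update, snapshot read -> upsert
        replica[key] = envelope['after']
    elif op == 'd':
        replica.pop(key, None)        # delete; tolerates a replayed delete
    else:
        raise ValueError(f'unknown op {op!r}')

replica = {}
events = [
    (7, {'op': 'c', 'before': None, 'after': {'id': 7, 'status': 'new'}, 'source': {}}),
    (7, {'op': 'u', 'before': {'id': 7, 'status': 'new'},
         'after': {'id': 7, 'status': 'paid'}, 'source': {}}),
    (8, {'op': 'c', 'before': None, 'after': {'id': 8, 'status': 'new'}, 'source': {}}),
    (8, {'op': 'd', 'before': {'id': 8, 'status': 'new'}, 'after': None, 'source': {}}),
    (8, None),                        # tombstone following the delete
]
for key, env in events:
    apply(replica, key, env)
print(replica)  # {7: {'id': 7, 'status': 'paid'}}
```

Upserting on `after` and deleting by key are both idempotent, so replaying an event after a connector restart leaves the replica unchanged.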