Build a CDC pipeline (Debezium + outbox) (12 scenes)
Scene 04 · Debezium tails the log
Debezium is a connector that registers as a replica, decodes each WAL/binlog record into a structured change event, and emits one Kafka message per row change.
Previously

The DB already has a durable, ordered log of every change — so the next move is a process that tails it and turns each record into a downstream event.

Diagram
Left: the database from scene 3 — a tables region on top, the **WAL** (Postgres) or **binlog** (MySQL) append-only strip below. Center: the Debezium **connector** — a process that registers as a replication subscriber and reads the log over the streaming replication protocol (not SQL). Right inside the connector: the JSON **change event** envelope {op, before, after, source}. Far right: the Kafka topic strip, one message per row change.
[Diagram, initial state]
DB · Postgres, tables: id=42 │ status=pending │ amount=$120; id=73 │ status=shipped │ amount=$48; id=08 │ status=pending │ amount=$305
WAL → streaming replication protocol → Debezium connector (stopped, waiting for first WAL record) → Kafka topic (no events yet)
We need a process that tails the WAL and emits one event per row change. Debezium is the most widely used open-source implementation of this pattern and runs as a Kafka Connect connector. Watch: an UPDATE lands in the WAL, the connector decodes it, and the topic gets exactly one message.
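For orientation, a connector like this is usually registered by sending a JSON config to the Kafka Connect REST API (`POST /connectors`). The sketch below only builds that request body and does not send it. The property names follow the Debezium PostgreSQL connector (2.x naming, where `topic.prefix` is used); the hostname, credentials, database name, slot name, and table list are placeholder assumptions, not values from this pipeline.

```python
import json


def build_connector_request(name, slot_name, tables):
    """Build the JSON body for registering a Debezium Postgres connector."""
    config = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",                   # logical decoding plugin
        "slot.name": slot_name,                      # replication slot to read from
        "database.hostname": "db.example.internal",  # placeholder
        "database.port": "5432",
        "database.user": "debezium",                 # placeholder
        "database.password": "change-me",            # placeholder
        "database.dbname": "shop",                   # placeholder
        "topic.prefix": "shop",                      # placeholder topic namespace
        "table.include.list": ",".join(tables),
    }
    return {"name": name, "config": config}


request_body = build_connector_request("orders-cdc", "orders_slot", ["public.orders"])
print(json.dumps(request_body, indent=2))
```

With these placeholder values, change events for `public.orders` would be expected on a topic named after the prefix, schema, and table.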
Implementation
KafkaConnect.subscribeAsReplica
boot path: register as a streaming-replication subscriber
def start(connectorConfig):
    # replication-mode connection: speaks the replication protocol, not SQL
    conn = postgres.connect(replication='database')
    conn.create_replication_slot(
        name = connectorConfig.slot_name,
        plugin = 'pgoutput',  # logical decoding
    )
    # resume from the last position this connector confirmed
    stream = conn.start_replication(
        slot = connectorConfig.slot_name,
        start_lsn = offsets.last_confirmed_lsn(),
    )
    for rec in stream:
        Connector.onWalRecord(rec)
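The boot path resumes from `offsets.last_confirmed_lsn()`, so it helps to see what an LSN is. Postgres prints an LSN as two hexadecimal numbers separated by a slash (high and low 32 bits of a 64-bit log position). The sketch below converts between the two forms and keeps the highest confirmed position. `OffsetStore` is a hypothetical in-memory stand-in for illustration; a real connector persists offsets durably.

```python
def parse_lsn(text):
    """Convert a textual LSN such as '16/B374D848' to a 64-bit integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def format_lsn(value):
    """Convert a 64-bit log position back to the 'hi/lo' hex form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


class OffsetStore:
    """Hypothetical in-memory offset store (illustration only)."""

    def __init__(self):
        self._confirmed = 0

    def confirm(self, lsn_text):
        # Positions only move forward; a stale confirmation is ignored.
        self._confirmed = max(self._confirmed, parse_lsn(lsn_text))

    def last_confirmed_lsn(self):
        return format_lsn(self._confirmed)


offsets = OffsetStore()
offsets.confirm("0/16B3748")
offsets.confirm("0/16B3700")  # older position, does not move the offset back
resume_from = offsets.last_confirmed_lsn()
print(resume_from)
```

Comparing LSNs as integers rather than strings matters because the hex text is not zero-padded, so string comparison would order positions incorrectly.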
Connector.onWalRecord
main loop: decode one WAL record, emit one Kafka message
def onWalRecord(rec):
    if connector.mode == 'select':
        # polling path: a deleted row is simply absent from the result,
        # so SELECT-based capture never sees deletes
        rows = db.query('SELECT * FROM ' + rec.table)
        return emitRowDiffs(rows)
    # streaming-replication path: decode the WAL tuple
    before, after = decodeTuples(rec)
    op = {'INSERT': 'c', 'UPDATE': 'u', 'DELETE': 'd'}[rec.kind]
    envelope = buildEnvelope(rec, op, before, after)
    kafka.produce(
        topic = topicFor(rec.table),
        key = pk(after or before),  # deletes have no 'after', so key from 'before'
        value = envelope,
    )
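To make the main loop concrete, here is a minimal runnable sketch of the streaming path against an in-memory producer. Everything in it is an assumption for illustration: records are plain dicts that are already decoded, the primary key column is `id`, and `InMemoryProducer` and `topic_for` are hypothetical stand-ins, not Kafka or Debezium APIs.

```python
OP_CODES = {"INSERT": "c", "UPDATE": "u", "DELETE": "d"}


class InMemoryProducer:
    """Hypothetical stand-in for a Kafka producer; records what was sent."""

    def __init__(self):
        self.messages = []

    def produce(self, topic, key, value):
        self.messages.append((topic, key, value))


def topic_for(table):
    # Hypothetical naming scheme: one topic per table.
    return f"cdc.{table}"


def on_wal_record(rec, producer):
    """Emit exactly one message for one decoded row change."""
    before, after = rec.get("before"), rec.get("after")
    # A delete has no 'after' image, so the key comes from 'before'.
    row = after if after is not None else before
    producer.produce(
        topic=topic_for(rec["table"]),
        key=row["id"],
        value={"op": OP_CODES[rec["kind"]], "before": before, "after": after},
    )


producer = InMemoryProducer()
on_wal_record(
    {"kind": "UPDATE", "table": "orders",
     "before": {"id": 42, "status": "pending"},
     "after": {"id": 42, "status": "shipped"}},
    producer,
)
on_wal_record(
    {"kind": "DELETE", "table": "orders",
     "before": {"id": 42, "status": "shipped"}, "after": None},
    producer,
)
for topic, key, value in producer.messages:
    print(topic, key, value["op"])
```

Both messages carry the same key, which is what lets a keyed topic keep all changes to one row in order on a single partition.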
buildEnvelope
construct the {op, before, after, source} change event
def buildEnvelope(rec, op, before, after):
    return {
        'op': op,          # 'c' | 'u' | 'd'
        'before': before,  # null on insert
        'after': after,    # null on delete
        'source': {
            'table': rec.table,
            'lsn': rec.lsn,
            'ts_ms': rec.commit_ts_ms,
        },
    }
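On the consuming side, the envelope carries enough information to rebuild table state. The sketch below replays a short list of events into a dict keyed by primary key. The event values, including the LSNs and timestamps, are made-up sample data, and the `id` key column is an assumption.

```python
def apply_event(view, event):
    """Apply one {op, before, after, source} change event to an in-memory view."""
    if event["op"] == "d":
        # Deletes carry only the 'before' image.
        view.pop(event["before"]["id"], None)
    else:
        # Creates and updates carry the new row state in 'after'.
        row = event["after"]
        view[row["id"]] = row
    return view


def make_event(op, before, after, lsn):
    # Sample 'source' block; table name and timestamp are illustrative.
    source = {"table": "orders", "lsn": lsn, "ts_ms": 1700000000000}
    return {"op": op, "before": before, "after": after, "source": source}


events = [
    make_event("c", None, {"id": 42, "status": "pending"}, "0/16B3748"),
    make_event("u", {"id": 42, "status": "pending"},
               {"id": 42, "status": "shipped"}, "0/16B3800"),
    make_event("c", None, {"id": 73, "status": "pending"}, "0/16B3900"),
    make_event("d", {"id": 73, "status": "pending"}, None, "0/16B3A00"),
]

view = {}
for event in events:
    apply_event(view, event)
print(view)
```

Because deletes arrive as explicit events with a `before` image, the view drops row 73, which a SELECT-based poller could not have signalled.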