Build a CDC pipeline (Debezium + outbox) (12 scenes)
Scene 03 · The DB already has a log
Postgres WAL, MySQL binlog — the database already keeps a durable, ordered log of every change for replication. CDC reads this log instead of the tables.
Previously
Polling cannot be the answer — it loses deletes and collapses fast flips — so we need a source of truth that records every change in order, ideally one the database already keeps. It does: open the hood and look at the log strip.
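Here is a minimal Python sketch that makes both polling failures concrete. The `ToyTable` class and its `write`/`poll` methods are hypothetical. `poll` stands in for the usual `WHERE updated_at > :watermark` query, and `log` records what an ordered change log would hold.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Hypothetical table: id -> (value, updated_at), plus a change log."""
    rows: dict = field(default_factory=dict)
    log: list = field(default_factory=list)   # one entry per write, in order
    clock: int = 0

    def write(self, op, key, value=None):
        self.clock += 1
        self.log.append((self.clock, op, key, value))
        if op == "delete":
            del self.rows[key]
        else:
            self.rows[key] = (value, self.clock)

    def poll(self, watermark):
        """What `SELECT ... WHERE updated_at > :watermark` would return."""
        return sorted((k, v) for k, (v, ts) in self.rows.items() if ts > watermark)


t = ToyTable()
t.write("insert", 1, "A")
t.write("insert", 2, "X")
watermark = t.clock            # a poll happens here
t.write("update", 1, "B")      # fast flip A -> B -> A between two polls
t.write("update", 1, "A")
t.write("delete", 2)           # a deleted row leaves nothing to select
print(t.poll(watermark))       # [(1, 'A')]
print(t.log[2:])               # [(3, 'update', 1, 'B'), (4, 'update', 1, 'A'), (5, 'delete', 2, None)]
```

The poller sees row 1 with its original value and never sees row 2 at all. The log still holds all three writes in order.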
Diagram
A database node split in two. The TABLES region on top holds the live rows. The append-only LOG STRIP at the bottom — labeled **WAL (Write-Ahead Log)** in Postgres, **binlog** in MySQL — is the durable, ordered record of every row change, used for crash recovery and replication. A replica on the right fetches from this strip; **CDC (Change Data Capture)** — any technique that turns DB writes into a stream of change events — reads it the same way.
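The following minimal sketch shows that a replica and a CDC reader consume the same records and differ only in what they do with them. `LogRecord`, `replica_apply`, and `to_cdc_event` are hypothetical names. The event dict is loosely modeled on Debezium's change-event envelope (`op` of `c`/`u`/`d`, `before`, `after`, `source`), but it does not reproduce its exact schema.

```python
from typing import NamedTuple, Optional


class LogRecord(NamedTuple):
    lsn: int
    op: str                  # "INSERT" | "UPDATE" | "DELETE"
    key: int
    before: Optional[dict]   # row image before the change (None for INSERT)
    after: Optional[dict]    # row image after the change (None for DELETE)


def replica_apply(rows, rec):
    """A replica re-applies the change to its own copy of the table."""
    if rec.op == "DELETE":
        rows.pop(rec.key, None)
    else:
        rows[rec.key] = rec.after


OP_CODES = {"INSERT": "c", "UPDATE": "u", "DELETE": "d"}


def to_cdc_event(rec):
    """CDC reads the same record but emits it as an event instead of applying it."""
    return {
        "op": OP_CODES[rec.op],
        "before": rec.before,
        "after": rec.after,
        "source": {"lsn": rec.lsn},
    }


log = [
    LogRecord(1, "INSERT", 7, None, {"status": "new"}),
    LogRecord(2, "UPDATE", 7, {"status": "new"}, {"status": "paid"}),
    LogRecord(3, "DELETE", 7, {"status": "paid"}, None),
]
replica, events = {}, []
for rec in log:              # both consumers walk the log in LSN order
    replica_apply(replica, rec)
    events.append(to_cdc_event(rec))
print(replica)                     # {}
print([e["op"] for e in events])   # ['c', 'u', 'd']
```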
Every replicated database keeps a durable, ordered record of every change. In Postgres it's called the WAL; in MySQL it's the binlog. Watch one INSERT, one UPDATE, one DELETE land in the tables region — and watch exactly one record per change get appended to the log strip below.
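The producer side of the same idea can be sketched as a hypothetical in-memory `ToyDatabase`. Each INSERT, UPDATE, or DELETE appends exactly one record carrying before and after images and a monotonically increasing LSN, and only then mutates the table. Real engines may emit several physical records per row change (index entries, full-page images, commit records). This sketch keeps the one-record-per-change model the scene animates.

```python
import itertools


class ToyDatabase:
    """Hypothetical in-memory DB: a tables region plus an append-only log."""

    def __init__(self):
        self.table = {}                 # key -> row (dict)
        self.log = []                   # append-only list of change records
        self._lsn = itertools.count(1)  # monotonic log sequence numbers

    def _append(self, op, key, before, after):
        self.log.append({"lsn": next(self._lsn), "op": op, "key": key,
                         "before": before, "after": after})

    def insert(self, key, row):
        self._append("INSERT", key, None, row)
        self.table[key] = row

    def update(self, key, changes):
        before = self.table[key]
        after = {**before, **changes}   # new dict; the logged pre-image stays intact
        self._append("UPDATE", key, before, after)
        self.table[key] = after

    def delete(self, key):
        before = self.table[key]
        self._append("DELETE", key, before, None)
        del self.table[key]


db = ToyDatabase()
db.insert(1, {"status": "new"})
db.update(1, {"status": "paid"})
db.delete(1)
print(db.table)                               # {}
print([(r["lsn"], r["op"]) for r in db.log])  # [(1, 'INSERT'), (2, 'UPDATE'), (3, 'DELETE')]
```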
Implementation
Database.applyDml
write-ahead: log the change first, then mutate the table
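def applyDml(stmt):
    xid = txn.current_id()
    before = stmt.read_pre_image()
    after = stmt.compute_post_image()
    # 1. Append to the WAL (Postgres) / binlog (MySQL): durable, ordered,
    #    monotonic position (an LSN in Postgres; file+offset or GTID in MySQL).
    #    Simplification: in MySQL the strict write-ahead role belongs to
    #    InnoDB's redo log, and the binlog is written at commit.
    log.append(WalRecord(
        xid, stmt.op, before, after,
    ))
    # Simplified: engines only require the log to be flushed before a dirty
    # page is written back to disk (and at commit), not before it is dirtied.
    log.fsync()
    # 2. Only now mutate the heap / index pages.
    table.apply(stmt.op, before, after)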
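Logging first matters for this reason: if the process dies after the fsync but before the table is touched, recovery can redo the change from the log alone. The following is a minimal sketch, assuming a hypothetical `WalTable` that keeps its table in a dict and its log in a JSON-lines file. `crash_before_table` is an illustrative flag that simulates dying between the two steps. Forcing an fsync on every write is the simplest and slowest policy; real engines batch flushes (group commit).

```python
import json
import os
import tempfile


class WalTable:
    """Hypothetical toy: a dict 'table' protected by a JSON-lines log file."""

    def __init__(self, path):
        self.path = path
        self.table = {}
        self.next_lsn = 1

    def apply_dml(self, op, key, after=None, crash_before_table=False):
        rec = {"lsn": self.next_lsn, "op": op, "key": key,
               "before": self.table.get(key), "after": after}
        # 1. Append the record and force it to stable storage.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(rec) + "\n")
            f.flush()
            os.fsync(f.fileno())
        self.next_lsn += 1
        if crash_before_table:
            return                      # simulate dying right after the fsync
        # 2. Only now touch the table.
        self._redo(rec)

    def _redo(self, rec):
        if rec["op"] == "DELETE":
            self.table.pop(rec["key"], None)
        else:
            self.table[rec["key"]] = rec["after"]

    @classmethod
    def recover(cls, path):
        """Rebuild the table purely by replaying the durable log."""
        t = cls(path)
        with open(path, encoding="utf-8") as f:
            for line in f:
                rec = json.loads(line)
                t._redo(rec)
                t.next_lsn = rec["lsn"] + 1
        return t


path = os.path.join(tempfile.mkdtemp(), "wal.log")
db = WalTable(path)
db.apply_dml("INSERT", "k1", {"qty": 1})
db.apply_dml("UPDATE", "k1", {"qty": 2}, crash_before_table=True)
print(db.table)                      # {'k1': {'qty': 1}}  (update never reached the table)
print(WalTable.recover(path).table)  # {'k1': {'qty': 2}}  (log replay redoes it)
```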
Replica.fetchAndApply
the replica's standard fetch loop — tails the same log strip
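def fetchAndApply(self):
    while True:  # loop forever; reconnect if the stream drops
        # Receive: walreceiver (PG) / I/O thread (MySQL).
        # Apply: startup process (PG) / SQL applier thread (MySQL).
        records = primary.stream_from(
            startLsn=self.confirmed_lsn,
        )
        for rec in records:
            self.log.append(rec)  # local copy: pg_wal (PG) / relay log (MySQL)
            self.apply(rec.op, rec.before, rec.after)
            self.confirmed_lsn = rec.lsn
            # stream_from never returns, so the ack must sit inside this loop
            # (real replicas send these status updates periodically).
            primary.ack(self.confirmed_lsn)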
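The following runnable sketch shows the resume behavior the loop relies on. Because the replica records `confirmed_lsn` as it applies, an interrupted pass picks up where it stopped, without re-applying or skipping records. `Primary`, `Replica`, `stream_after`, and the `limit` parameter are hypothetical. The sketch assumes streaming resumes strictly after the confirmed LSN.

```python
from dataclasses import dataclass, field


@dataclass
class Primary:
    """Hypothetical primary: an in-memory log plus each replica's last acked LSN."""
    log: list = field(default_factory=list)   # (lsn, op, key, value) tuples
    acks: dict = field(default_factory=dict)

    def stream_after(self, lsn):
        return [rec for rec in self.log if rec[0] > lsn]

    def ack(self, replica_id, lsn):
        self.acks[replica_id] = lsn


@dataclass
class Replica:
    name: str
    confirmed_lsn: int = 0
    rows: dict = field(default_factory=dict)
    applied: list = field(default_factory=list)

    def fetch_and_apply(self, primary, limit=None):
        """One pass of the fetch loop; `limit` simulates an interruption mid-stream."""
        for i, (lsn, op, key, value) in enumerate(primary.stream_after(self.confirmed_lsn)):
            if limit is not None and i == limit:
                return
            if op == "DELETE":
                self.rows.pop(key, None)
            else:
                self.rows[key] = value
            self.applied.append(lsn)
            self.confirmed_lsn = lsn
            primary.ack(self.name, lsn)


primary = Primary(log=[(1, "INSERT", "a", 1), (2, "UPDATE", "a", 2), (3, "DELETE", "a", None)])
r = Replica("r1")
r.fetch_and_apply(primary, limit=2)     # stops after applying LSNs 1 and 2
r.fetch_and_apply(primary)              # resumes strictly after LSN 2
print(r.applied, r.rows, primary.acks)  # [1, 2, 3] {} {'r1': 3}
```

Acknowledging every record is the simplest choice here. It keeps the primary's view of the replica's progress exact, at the cost of one message per change.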
primary.stream_from(startLsn)
the protocol replicas use — and CDC will use it the same way
# Postgres: START_REPLICATION SLOT <name> LOGICAL <lsn>  (CDC; physical
#           replicas use START_REPLICATION [SLOT <name>] PHYSICAL <lsn>)
# MySQL:    COM_BINLOG_DUMP_GTID <server_id> <gtid_set>
def stream_from(startLsn):
    cursor = log.open_at(startLsn)
    while True:
        rec = cursor.next()  # blocks until new WAL is appended
        yield rec            # one change; logical decoding and the binlog
                             # deliver changes in commit order
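The tailing protocol depends on `cursor.next()` blocking until new WAL is appended. One way to sketch that in Python uses a condition variable: writers notify on append, and readers wait until the record they need exists. The `Log` class and its methods are hypothetical. Here, LSNs are simply 1-based positions, `stream_from` includes `start_lsn` itself, and the `timeout` parameter exists only so a test can end the stream.

```python
import threading


class Log:
    """Hypothetical append-only log whose readers block until new records arrive."""

    def __init__(self):
        self._records = []                  # index i holds LSN i + 1
        self._cond = threading.Condition()

    def append(self, rec):
        with self._cond:
            self._records.append(rec)
            self._cond.notify_all()         # wake every blocked reader
            return len(self._records)       # the new record's LSN

    def stream_from(self, start_lsn, timeout=None):
        """Yield (lsn, rec) for every record with lsn >= start_lsn, in order."""
        next_lsn = start_lsn
        while True:
            with self._cond:
                ok = self._cond.wait_for(lambda: len(self._records) >= next_lsn, timeout)
                if not ok:
                    return                  # timed out: end the stream
                rec = self._records[next_lsn - 1]
            yield next_lsn, rec             # lock released before yielding
            next_lsn += 1


log = Log()
log.append("INSERT a")
received = []


def reader():
    for lsn, rec in log.stream_from(1, timeout=1.0):
        received.append((lsn, rec))
        if lsn == 3:
            break


t = threading.Thread(target=reader)
t.start()
log.append("UPDATE a")
log.append("DELETE a")
t.join()
print(received)  # [(1, 'INSERT a'), (2, 'UPDATE a'), (3, 'DELETE a')]
```

The lock is released before each `yield`, so a slow consumer never blocks writers.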