Build a CDC pipeline (Debezium + outbox) (12 scenes)
Scene 07 · Schema evolution leaks the table
Raw CDC inherits the source DDL: an ALTER TABLE ... RENAME COLUMN silently breaks downstream consumers unless a Schema Registry enforces a compatibility mode that rejects the change at registration.
Previously
Now the pipeline can faithfully replay history and stream live changes, but it emits raw row deltas carrying whatever column names the DBA picked, so the next ALTER TABLE leaks straight into every consumer. We need something that catches incompatible changes BEFORE the bytes leave the producer.
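To see why the break is silent rather than loud, here is a minimal sketch of a consumer that reads the row image by column name. It assumes a simplified event dict holding only `op` and `after` (real Debezium envelopes carry more fields, such as `before` and `source`); the `customers` table, the `email`/`email_address` columns, and `handle_change` are hypothetical.

```python
# Hypothetical consumer that reads the "after" row image by column name.
def handle_change(event: dict) -> str:
    after = event["after"]
    # .get() hides the problem: a renamed column just looks like a NULL value
    email = after.get("email")
    return "no email on file" if email is None else f"send to {email}"


# Row delta emitted before the DBA runs:
#   ALTER TABLE customers RENAME COLUMN email TO email_address;
before_rename = {"op": "u", "after": {"id": 1, "email": "a@example.com"}}
# Same row after the rename: the payload's column name follows the DDL
after_rename = {"op": "u", "after": {"id": 1, "email_address": "a@example.com"}}

print(handle_change(before_rename))  # send to a@example.com
print(handle_change(after_rename))   # no email on file
```

Nothing raises and nothing is logged; the consumer simply starts treating every customer as having no email.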
Diagram
A connector tails the database WAL and registers each change event's schema with a Schema Registry before the event lands on the Kafka topic.
- **Schema registry**: a separate service that stores every version of the change-event schema and gates registration based on a compatibility rule.
- **Schema evolution**: handling DDL changes (ADD/RENAME/DROP COLUMN) without breaking downstream consumers.
- **Change event**: one Kafka record per row change, shaped by the source table's columns.
- **Connector**: the process that decodes the WAL into change events.
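"Shaped by the source table's columns" can be made concrete with a small sketch that derives an Avro record schema (in its JSON/dict form) from a column list. The `row_schema` helper, its `(name, sql_type, nullable)` column tuples, and the `SQL_TO_AVRO` map are hypothetical; a real connector's type mapping and schema naming are far richer. The point is that a RENAME COLUMN surfaces only as a different field name.

```python
# Hypothetical, deliberately tiny SQL-to-Avro type map.
SQL_TO_AVRO = {"int": "int", "bigint": "long", "text": "string", "boolean": "boolean"}


def row_schema(table: str, columns: list[tuple[str, str, bool]]) -> dict:
    """Build an Avro record schema whose fields mirror the table's columns, in order."""
    fields = []
    for name, sql_type, nullable in columns:
        avro_type = SQL_TO_AVRO[sql_type]
        if nullable:
            # Nullable columns become a union with null, defaulting to null
            fields.append({"name": name, "type": ["null", avro_type], "default": None})
        else:
            fields.append({"name": name, "type": avro_type})
    return {"type": "record", "name": table, "fields": fields}


v1 = row_schema("customers", [("id", "int", False), ("email", "text", False)])
v2 = row_schema("customers", [("id", "int", False), ("email_address", "text", False)])

print([f["name"] for f in v1["fields"]])  # ['id', 'email']
print([f["name"] for f in v2["fields"]])  # ['id', 'email_address']
```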
The connector is streaming change events, and the schema registry holds schema v1. Without a separate place to store schema versions, every consumer would have to redeploy on every DDL change. The schema registry is that place, and the compatibility rules it enforces whenever a new version arrives are what make schema evolution safe. Watch what happens when the DBA adds a new column.
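The storage half of the registry can be sketched as two maps: subject to version list, and schema id to schema. This `VersionStore` is a hypothetical in-memory stand-in with no compatibility checks; the `customers-value` subject name and the choice to reuse an id when an identical schema is re-registered are assumptions of the sketch. The usage plays out the DBA adding a nullable `phone` column.

```python
from dataclasses import dataclass, field


@dataclass
class VersionStore:
    """Hypothetical in-memory stand-in for registry storage; performs no compatibility checks."""
    by_subject: dict[str, list[int]] = field(default_factory=dict)  # subject -> schema ids, oldest first
    by_id: dict[int, dict] = field(default_factory=dict)            # schema id -> schema
    next_id: int = 1

    def add(self, subject: str, schema: dict) -> int:
        ids = self.by_subject.setdefault(subject, [])
        for schema_id in ids:
            if self.by_id[schema_id] == schema:
                return schema_id  # identical schema: reuse its id, no new version
        schema_id = self.next_id
        self.next_id += 1
        self.by_id[schema_id] = schema
        ids.append(schema_id)
        return schema_id

    def latest(self, subject: str) -> dict:
        return self.by_id[self.by_subject[subject][-1]]

    def lookup(self, schema_id: int) -> dict:
        # What a consumer does with the id it finds in each message
        return self.by_id[schema_id]


v1 = {"type": "record", "name": "customers",
      "fields": [{"name": "id", "type": "int"}, {"name": "email", "type": "string"}]}
# The DBA adds a nullable column; it maps to an optional field defaulting to null
v2 = {"type": "record", "name": "customers",
      "fields": v1["fields"] + [{"name": "phone", "type": ["null", "string"], "default": None}]}

store = VersionStore()
id1 = store.add("customers-value", v1)
id2 = store.add("customers-value", v2)
id1_again = store.add("customers-value", v1)
print(id1, id2, id1_again)                  # 1 2 1
print(store.by_subject["customers-value"])  # [1, 2]
```

Consumers never need the schema in advance: they resolve it from the id carried in each message.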
Implementation
SchemaRegistry.register
Gate a candidate schema by compatibility mode.
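```python
def register(subject, new_schema, mode):
    history = versions.setdefault(subject, [])
    if not history or mode == NONE:
        # first version of a subject, or checks disabled: nothing to compare against
        return accept(new_schema)
    old = history[-1]  # compare against the latest registered version only
    if mode == BACKWARD:
        # consumers upgraded to new_schema must still read data written with old
        ok = can_read(reader=new_schema, data_of=old)
    elif mode == FORWARD:
        # consumers still on old must be able to read data written with new_schema
        ok = can_read(reader=old, data_of=new_schema)
    elif mode == FULL:
        ok = (can_read(reader=new_schema, data_of=old) and
              can_read(reader=old, data_of=new_schema))
    else:
        raise ValueError(f"unknown compatibility mode: {mode}")
    return accept(new_schema) if ok else reject()
```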
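The pseudocode leaves `can_read` abstract. Below is a runnable sketch for flat records only, assuming schemas are Avro-style dicts and following a simplified version of Avro's resolution rules: each reader field must match a writer field by name or alias with the same type (no type promotion here), or else carry a default; writer-only fields are ignored. `Mode`, `can_read`, `versions`, and this `register` are illustrative, not a registry's API.

```python
from enum import Enum


class Mode(Enum):
    NONE = "NONE"
    BACKWARD = "BACKWARD"
    FORWARD = "FORWARD"
    FULL = "FULL"


def can_read(reader: dict, writer: dict) -> bool:
    """Simplified Avro resolution for flat records: can `reader` decode data written with `writer`?"""
    writer_types = {f["name"]: f["type"] for f in writer["fields"]}
    for f in reader["fields"]:
        # A reader field matches a writer field by its own name or any of its aliases
        names = [f["name"], *f.get("aliases", [])]
        match = next((n for n in names if n in writer_types), None)
        if match is not None:
            if writer_types[match] != f["type"]:  # no type promotion in this sketch
                return False
        elif "default" not in f:
            return False  # field absent from the data and no default to fill in
    return True  # writer-only fields are simply skipped


versions: dict[str, list[dict]] = {}


def register(subject: str, new_schema: dict, mode: Mode) -> bool:
    history = versions.setdefault(subject, [])
    if history and mode is not Mode.NONE:
        old = history[-1]
        if mode is Mode.BACKWARD:
            ok = can_read(reader=new_schema, writer=old)
        elif mode is Mode.FORWARD:
            ok = can_read(reader=old, writer=new_schema)
        else:  # Mode.FULL
            ok = can_read(new_schema, old) and can_read(old, new_schema)
        if not ok:
            return False  # rejected: nothing is stored
    history.append(new_schema)
    return True


v1 = {"type": "record", "name": "customers",
      "fields": [{"name": "id", "type": "int"}, {"name": "email", "type": "string"}]}
renamed = {"type": "record", "name": "customers",
           "fields": [{"name": "id", "type": "int"}, {"name": "email_address", "type": "string"}]}

first = register("customers-value", v1, Mode.BACKWARD)
second = register("customers-value", renamed, Mode.BACKWARD)
print(first, second)  # True False
```

Under BACKWARD the rename is rejected because the new reader field `email_address` has neither a match in v1 data nor a default, which is the check that stops the leak at registration time.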
Connector.publishWithSchema
Register first, then embed the schema-id in the message.
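```python
import struct

def publish_with_schema(event, topic):
    subject = f"{topic}-value"  # assumed subject naming: one value subject per topic
    schema = avro_schema_of(event)
    verdict = registry.register(subject, schema, MODE)
    if verdict == REJECTED:
        # stop before any bytes leave the producer
        raise RuntimeError(f"cannot emit: schema incompatible with {subject}")
    schema_id = verdict.id
    # header: magic byte 0, then the schema id as a 4-byte big-endian int
    payload = struct.pack(">bI", 0, schema_id) + avro_encode(event)
    kafka.produce(topic=topic, value=payload)
    # consumer.deserialize() reads the id from the header and looks up the schema
```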
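The header the connector prepends is small enough to show byte for byte. This sketch assumes the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema id); the Avro body is treated as opaque bytes, and `frame`/`unframe` are hypothetical names.

```python
import struct

MAGIC_BYTE = 0  # format marker, followed by a 4-byte big-endian schema id


def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Producer side: prefix an already Avro-encoded body with the magic byte and schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_body


def unframe(payload: bytes) -> tuple[int, bytes]:
    """Consumer side: split the 5-byte header off and return (schema_id, avro_body)."""
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, payload[5:]


payload = frame(42, b"body")  # body bytes are a placeholder, not real Avro data
print(payload[:5].hex())      # 000000002a
print(unframe(payload))       # (42, b'body')
```

Only the id travels with each record, so every message stays small no matter how large the schema grows.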
renameColumn
Avro rename = remove + add; aliases bridge the gap.
```python
def rename_column(subject, current, old_name, new_name, aliases_enabled):
    # Avro has no rename: drop the old field, then add the new one
    new_schema = drop_field(current, old_name)
    if aliases_enabled:
        new_schema = add_field(new_schema, new_name, aliases=[old_name])
    else:
        new_schema = add_field(new_schema, new_name)
    # Under BACKWARD the NEW schema is the reader of data written with the old one,
    # so it looks for new_name in that data:
    #   with alias    -> falls back to aliases[], finds old_name -> accept
    #   without alias -> field missing; reject unless new_name has a default
    return registry.register(subject, new_schema, MODE)
```
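Aliases do their work when data is read: the Avro specification applies aliases from the reader's schema, which is why the BACKWARD case resolves. The sketch below projects a record written under v1 onto a reader schema by trying the field name, then its aliases, then its default. `resolve` and the schemas are hypothetical, and the sketch handles flat records only.

```python
_MISSING = object()  # sentinel: distinguishes "no value found" from a real None


def resolve(record: dict, reader: dict) -> dict:
    """Project a record written with an older schema onto `reader`, Avro-style (flat records only)."""
    out = {}
    for f in reader["fields"]:
        value = _MISSING
        for name in [f["name"], *f.get("aliases", [])]:
            if name in record:
                value = record[name]
                break
        if value is _MISSING:
            if "default" not in f:
                raise KeyError(f"no value or default for field {f['name']!r}")
            value = f["default"]
        out[f["name"]] = value
    return out


reader_with_alias = {"type": "record", "name": "customers", "fields": [
    {"name": "id", "type": "int"},
    {"name": "email_address", "type": "string", "aliases": ["email"]},
]}
reader_no_alias = {"type": "record", "name": "customers", "fields": [
    {"name": "id", "type": "int"},
    {"name": "email_address", "type": "string"},
]}
old_row = {"id": 1, "email": "a@example.com"}  # written under v1

print(resolve(old_row, reader_with_alias))  # {'id': 1, 'email_address': 'a@example.com'}
try:
    resolve(old_row, reader_no_alias)
    outcome = "resolved"
except KeyError:
    outcome = "missing field"
print(outcome)  # missing field
```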