Build a CDC pipeline (Debezium + outbox) (12 scenes)
Scene 07 · Schema evolution leaks the table
Raw CDC inherits the source DDL — an ALTER TABLE RENAME COLUMN silently breaks downstream unless a Schema Registry enforces a compatibility mode that rejects the change at registration.
Previously

Now the pipeline can faithfully replay history and stream live changes — but it is emitting raw row deltas with whatever column names the DBA picked, which means the next ALTER TABLE leaks straight into every consumer. So we need something that catches incompatible changes BEFORE the bytes leave the producer.

Diagram
A connector tails the database WAL and sends each change event through a Schema Registry before it lands on the Kafka topic.
**Schema registry**: a separate service that stores every version of the change-event schema and gates registration based on a compatibility rule.
**Schema evolution**: handling DDL changes (ADD/RENAME/DROP COLUMN) without breaking downstream consumers.
**Change event**: one Kafka record per row change, shaped by the source table's columns.
**Connector**: the process that decodes the WAL into change events.
[Interactive diagram. A DDL palette (ADD, RENAME, DROP) acts on the source DB table orders (id, user_id, amount, currency…). Debezium emits change events and registers their schema with the Schema Registry, which holds schema v1 and offers the compatibility modes BACKWARD, FORWARD, FULL, and NONE (BACKWARD: the new schema can read old data). Payloads on the Kafka topic are tagged v1. The old consumer (v1) and the new consumer (v2) both deserialize v1 payloads cleanly while the registry idles, waiting for DDL.]
The connector is streaming change events; the schema registry holds schema v1. Without a separate place to store schema versions, every consumer would have to redeploy on every DDL change. The component that does that storage is the schema registry. The rules it enforces are its compatibility modes, and managing schema changes under those rules is called schema evolution. Watch what happens when the DBA adds a new column.
Implementation
SchemaRegistry.register
Gate a candidate schema by compatibility mode.
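def register(subject, new_schema, mode):
    history = versions.setdefault(subject, [])
    if not history or mode == NONE:
        # first version of the subject, or compatibility checks disabled
        return accept(new_schema)
    old = history[-1]  # latest registered version
    if mode == BACKWARD:
        # the new schema must read data written with the old one
        ok = can_read(reader=new_schema, data_of=old)
    elif mode == FORWARD:
        # the old schema must read data written with the new one
        ok = can_read(reader=old, data_of=new_schema)
    elif mode == FULL:
        ok = (can_read(reader=new_schema, data_of=old) and
              can_read(reader=old, data_of=new_schema))
    else:
        raise ValueError(f"unknown compatibility mode: {mode!r}")
    return accept(new_schema) if ok else reject()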
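The `can_read` helper is left undefined in the listing above. What follows is a minimal sketch of one way it could work, under loud assumptions: schemas are Avro-shaped Python dicts, only top-level record fields are compared, and type promotion, nested records, and aliases are ignored. The `coupon` column and the `record` helper are hypothetical names introduced for illustration.

```python
# Simplified stand-in for can_read(); NOT a full Avro schema-resolution check.
# Assumption: a reader can decode a writer's data if every reader field was
# either written by the writer or carries a default value.

def can_read(reader, data_of):
    """Return True if `reader` can decode records written with `data_of`."""
    written = {f["name"] for f in data_of["fields"]}
    for field in reader["fields"]:
        if field["name"] not in written and "default" not in field:
            return False  # reader needs a value the writer never wrote
    return True


def is_compatible(old, new, mode):
    """Apply the same mode rules as register(), returning a plain bool."""
    if mode == "NONE":
        return True
    if mode == "BACKWARD":
        return can_read(reader=new, data_of=old)
    if mode == "FORWARD":
        return can_read(reader=old, data_of=new)
    if mode == "FULL":
        return can_read(new, old) and can_read(old, new)
    raise ValueError(f"unknown compatibility mode: {mode}")


def record(*fields):
    """Hypothetical helper: wrap field definitions in an Avro-style record."""
    return {"type": "record", "name": "Order", "fields": list(fields)}


V1 = record({"name": "id", "type": "long"},
            {"name": "amount", "type": "long"})

# ADD COLUMN coupon, nullable with a default
V2_ADD_DEFAULT = record(*V1["fields"],
                        {"name": "coupon", "type": ["null", "string"],
                         "default": None})

# ADD COLUMN coupon, required, no default
V2_ADD_REQUIRED = record(*V1["fields"], {"name": "coupon", "type": "string"})

# DROP COLUMN amount
V2_DROP = record({"name": "id", "type": "long"})

if __name__ == "__main__":
    for label, candidate in [("add with default", V2_ADD_DEFAULT),
                             ("add without default", V2_ADD_REQUIRED),
                             ("drop amount", V2_DROP)]:
        verdicts = {m: is_compatible(V1, candidate, m)
                    for m in ("BACKWARD", "FORWARD", "FULL")}
        print(label, verdicts)
```

Under this simplified rule, adding a column with a default passes every mode, adding a required column passes only FORWARD, and dropping a column without a default passes only BACKWARD. That asymmetry is why the choice of mode depends on whether consumers or producers are upgraded first.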
Connector.publishWithSchema
Register first, then embed the schema-id in the message.
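def publish_with_schema(event, subject):
    schema = avro_schema_of(event)
    verdict = registry.register(subject, schema, MODE)
    if verdict == REJECTED:
        # halt() is expected to stop the connector, not return
        halt('cannot emit: incompatible schema')
    schema_id = verdict.id
    # wire format: 1 magic byte, 4-byte big-endian schema id, Avro body
    # (magic_byte is assumed to be the bytes value b'\x00')
    payload = magic_byte + schema_id.to_bytes(4, 'big') + avro_encode(event)
    # simplification: real subjects are usually named '<topic>-value'
    kafka.produce(topic=subject, value=payload)
    # consumer.deserialize() looks up schema by id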
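The payload framing in the listing can be made concrete with a small sketch. It assumes the Confluent wire format (a zero magic byte, then the schema id as a 4-byte big-endian integer, then the Avro-encoded body). The function names `frame` and `unframe` are hypothetical, and the Avro body is treated as opaque bytes.

```python
import struct

MAGIC_BYTE = 0      # marks a payload as schema-registry framed
HEADER = ">bI"      # big-endian: 1-byte magic + 4-byte unsigned schema id
HEADER_SIZE = struct.calcsize(HEADER)  # 5 bytes, no padding with ">"


def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an already-encoded Avro body with the magic byte and schema id."""
    return struct.pack(HEADER, MAGIC_BYTE, schema_id) + avro_body


def unframe(payload: bytes) -> tuple[int, bytes]:
    """Split a framed payload into (schema_id, avro_body)."""
    if len(payload) < HEADER_SIZE:
        raise ValueError("payload shorter than the 5-byte header")
    magic, schema_id = struct.unpack(HEADER, payload[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[HEADER_SIZE:]


if __name__ == "__main__":
    framed = frame(42, b"\x02\x0e")
    print(framed.hex())     # header bytes followed by the body
    print(unframe(framed))  # schema id and body recovered
```

A consumer would call something like `unframe` first, fetch the writer schema for the returned id from the registry, and only then decode the body. The id in the header is what lets old and new payloads coexist on one topic.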
renameColumn
Avro rename = remove + add; aliases bridge the gap.
def rename_column(old_name, new_name, aliases_enabled):
    # `current`, `subject`, and MODE come from the enclosing connector state
    new_schema = drop_field(current, old_name)
    if aliases_enabled:
        new_schema = add_field(new_schema, new_name,
                               aliases=[old_name])
    else:
        new_schema = add_field(new_schema, new_name)
    # under BACKWARD, the new schema is the reader; it looks for new_name
    # in data that was written with old_name:
    #   with alias -> old_name matches via aliases[] -> accept
    #   no alias   -> new_name has no value and no default -> reject
    return registry.register(subject, new_schema, MODE)
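To show what the alias buys on the consumer side, here is a minimal sketch of reader-side field resolution. It is a simplification: records are plain Python dicts rather than Avro binary, and only top-level fields are handled. The function `resolve` and the rename from `amount` to `total_amount` are hypothetical, chosen for illustration.

```python
def resolve(record, reader_schema):
    """Project a record written under an older schema onto the reader's fields.

    For each reader field, try its own name first, then each alias, then
    fall back to the field's default. Fail if none of those yields a value.
    """
    out = {}
    for field in reader_schema["fields"]:
        candidates = [field["name"], *field.get("aliases", [])]
        for name in candidates:
            if name in record:
                out[field["name"]] = record[name]
                break
        else:
            # no matching written field: a default is the only way out
            if "default" not in field:
                raise KeyError(f"no value for field {field['name']!r}")
            out[field["name"]] = field["default"]
    return out


# A row written before the rename (old column name: amount)
OLD_ROW = {"id": 7, "amount": 1250}

READER_WITH_ALIAS = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "total_amount", "type": "long", "aliases": ["amount"]},
]}

READER_NO_ALIAS = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "total_amount", "type": "long"},
]}

if __name__ == "__main__":
    print(resolve(OLD_ROW, READER_WITH_ALIAS))
    try:
        resolve(OLD_ROW, READER_NO_ALIAS)
    except KeyError as exc:
        print("unreadable without alias:", exc)
```

The alias lives on the reader's schema, which is why it helps the BACKWARD direction only. A consumer still on the old schema that receives data written with `total_amount` has no alias to fall back on, so the same rename would fail a FORWARD or FULL check unless the old field had a default.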