How to handle MERGE with Schema Evolution in Delta Lake
Hi everyone,
Schema evolution during MERGE is one of the trickiest parts of building robust Delta Lake pipelines. Databricks actually has a native SQL syntax for this — plus Python API options for programmatic pipelines. Here's a complete guide.
The Cleanest Way — Native SQL Syntax (Databricks DBR 15.2+)
Databricks added a dedicated WITH SCHEMA EVOLUTION clause directly in the MERGE statement. No session configs needed:
```sql
MERGE WITH SCHEMA EVOLUTION INTO target_table AS t
USING source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```
That's it. Any new columns in source_table are automatically added to target_table. Clean, readable, and production-safe.
Scenario 1 — New columns added to source
```sql
-- source_table now has a new column: loyalty_tier
MERGE WITH SCHEMA EVOLUTION INTO my_catalog.my_schema.customers AS t
USING staging.customers_updates AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET
    t.name = s.name,
    t.email = s.email,
    t.updated_at = s.updated_at,
    t.loyalty_tier = s.loyalty_tier  -- new column, auto-added to target
WHEN NOT MATCHED THEN
  INSERT *
```
After execution, loyalty_tier is permanently added to the target schema. Existing rows get NULL.
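To make the backfill behavior concrete, here is a toy model in plain Python (an illustration only, not Delta internals — the function and sample rows are made up for the sketch): a column that exists in the source but not the target is added to the target schema, and existing rows untouched by the merge get None (Delta's NULL) for it.

```python
# Toy model of MERGE ... WITH SCHEMA EVOLUTION (illustration, not Delta internals).
# Target is a dict of rows keyed by id; source is a list of row dicts.

def merge_with_evolution(target, source):
    target_cols = {c for row in target.values() for c in row}
    source_cols = {c for row in source for c in row}

    # Schema evolution: backfill brand-new source columns with None (NULL)
    for col in source_cols - target_cols:
        for row in target.values():
            row.setdefault(col, None)

    # WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *
    for row in source:
        target[row["id"]] = dict(row)
    return target

existing = {
    1: {"id": 1, "name": "Ada"},
    3: {"id": 3, "name": "Linus"},  # not present in the source batch
}
updates = [
    {"id": 1, "name": "Ada", "loyalty_tier": "gold"},
    {"id": 2, "name": "Grace", "loyalty_tier": "silver"},
]
result = merge_with_evolution(existing, updates)
print(result[3]["loyalty_tier"])  # None: untouched row backfilled with NULL
```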
Scenario 2 — Python API (when you need programmatic control)
For Python-based pipelines, enable autoMerge at the session level (newer Delta Lake releases also expose a per-merge withSchemaEvolution() method on the merge builder, which avoids the session-wide config):
```python
from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

target = DeltaTable.forName(spark, "my_catalog.my_schema.customers")

target.alias("t").merge(
    source_df.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
```
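The semantics of whenMatchedUpdateAll / whenNotMatchedInsertAll can be sketched in plain Python (a toy model for illustration — upsert_all and the sample rows are made up, not the Delta API): matched keys have their columns overwritten from the source row, unmatched keys are inserted as-is.

```python
# Toy model of whenMatchedUpdateAll / whenNotMatchedInsertAll (illustration only).

def upsert_all(target, source_rows, key="id"):
    for row in source_rows:
        if row[key] in target:
            target[row[key]].update(row)  # UPDATE SET *: overwrite from source
        else:
            target[row[key]] = dict(row)  # INSERT *: new row
    return target

existing = {1: {"id": 1, "name": "Ada", "email": "old@x.com"}}
updates = [
    {"id": 1, "email": "new@x.com"},  # matched: email overwritten
    {"id": 2, "name": "Grace"},       # not matched: inserted
]
state = upsert_all(existing, updates)
print(state[1]["email"])  # "new@x.com"
```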
Scenario 3 — Column type change (e.g., INT → BIGINT)
WITH SCHEMA EVOLUTION does NOT handle type changes. Cast explicitly in the source:
```sql
MERGE WITH SCHEMA EVOLUTION INTO my_catalog.my_schema.customers AS t
USING (
  SELECT
    id,
    name,
    CAST(order_count AS BIGINT) AS order_count  -- was INT in target
  FROM staging.customers_updates
) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```
⚠️ Type widening (INT → BIGINT, FLOAT → DOUBLE) is safe. Narrowing will still fail.
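If you want to guard casts programmatically before running the MERGE, a small allow-list check works. This is a hypothetical helper (the function name and the allow-list below are assumptions for illustration; check the cast matrix for your Spark/Delta version):

```python
# Hypothetical pre-flight check: is a cast a safe widening?
# The allow-list is illustrative, not an authoritative cast matrix.
SAFE_WIDENINGS = {
    ("smallint", "int"),
    ("smallint", "bigint"),
    ("int", "bigint"),
    ("float", "double"),
}

def is_safe_widening(from_type, to_type):
    return from_type == to_type or (from_type, to_type) in SAFE_WIDENINGS

print(is_safe_widening("int", "bigint"))  # True: widening, no data loss
print(is_safe_widening("bigint", "int"))  # False: narrowing can lose data
```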
Scenario 4 — Streaming pipeline with foreachBatch
```python
from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

target = DeltaTable.forName(spark, "my_catalog.my_schema.customers")

def merge_with_schema_evolution(batch_df, batch_id):
    target.alias("t").merge(
        batch_df.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

query = (
    spark.readStream
    .format("delta")
    .table("staging.customers_stream")
    .writeStream
    .foreachBatch(merge_with_schema_evolution)
    .option("checkpointLocation", "/checkpoints/customers_merge")
    .trigger(availableNow=True)
    .start()
)
```
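The foreachBatch pattern boils down to running the same merge once per micro-batch, so the final table state reflects the latest value per key across all batches. A toy simulation in plain Python (illustration only; merge_batch and the sample batches are made up for the sketch):

```python
# Toy simulation of foreachBatch + MERGE (illustration, not Spark).
# Each micro-batch is upserted into the target with the same merge logic.

def merge_batch(target, batch_rows, batch_id):
    for row in batch_rows:
        target[row["id"]] = {**target.get(row["id"], {}), **row}

target = {}
micro_batches = [
    [{"id": 1, "email": "a@x.com"}],
    [{"id": 1, "email": "a@y.com"}, {"id": 2, "email": "b@x.com"}],
]
for batch_id, batch in enumerate(micro_batches):
    merge_batch(target, batch, batch_id)

print(target[1]["email"])  # "a@y.com": the later batch wins
```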