Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
How to handle MERGE with Schema Evolution in Delta Lake

DushendRaghavan
New Contributor

Hi everyone,
Schema evolution during MERGE is one of the trickiest parts of building robust Delta Lake pipelines. Databricks actually has a native SQL syntax for this — plus Python API options for programmatic pipelines. Here's a complete guide.

The Cleanest Way: Native SQL Syntax (Databricks DBR 15.2+)
Databricks added a dedicated WITH SCHEMA EVOLUTION clause directly in the MERGE statement. No session configs needed:
MERGE WITH SCHEMA EVOLUTION INTO target_table AS t
USING source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
That's it. Any new columns in source_table are automatically added to target_table. Clean, readable, and production-safe.
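If you want to see the mechanic in isolation before pointing it at a real table, here is a minimal, self-contained sketch you can run from a notebook on a runtime that supports the clause. The demo_target and demo_source names are throwaway examples, not from the original post:

# Hypothetical throwaway tables, just to demonstrate the clause end to end
spark.sql("CREATE OR REPLACE TABLE demo_target (id INT, name STRING) USING DELTA")
spark.sql("INSERT INTO demo_target VALUES (1, 'Ada'), (2, 'Grace')")

spark.sql("CREATE OR REPLACE TABLE demo_source (id INT, name STRING, loyalty_tier STRING) USING DELTA")
spark.sql("INSERT INTO demo_source VALUES (2, 'Grace', 'gold'), (3, 'Linus', 'silver')")

spark.sql("""
    MERGE WITH SCHEMA EVOLUTION INTO demo_target AS t
    USING demo_source AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

spark.table("demo_target").printSchema()  # loyalty_tier now appears in the target schema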

Scenario 1 — New columns added to source
-- source_table now has a new column: loyalty_tier
MERGE WITH SCHEMA EVOLUTION INTO my_catalog.my_schema.customers AS t
USING staging.customers_updates AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET
    t.name = s.name,
    t.email = s.email,
    t.updated_at = s.updated_at,
    t.loyalty_tier = s.loyalty_tier -- new column, auto-added to target
WHEN NOT MATCHED THEN
  INSERT *
After execution, loyalty_tier is permanently part of the target schema. Rows that the MERGE did not update are backfilled with NULL for the new column.
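A quick sanity check after the merge, assuming the same customers table as above:

# Count how many rows were left at NULL for the newly added column
spark.sql("""
    SELECT COUNT(*) AS rows_without_tier
    FROM my_catalog.my_schema.customers
    WHERE loyalty_tier IS NULL
""").show()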

Scenario 2 — Python API (when you need programmatic control)
For Python-based pipelines, enable autoMerge at session level:
from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

target = DeltaTable.forName(spark, "my_catalog.my_schema.customers")

target.alias("t").merge(
source_df.alias("s"),
"t.id = s.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
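If you are on a recent runtime (Delta Lake 3.2+ / DBR 15.2+), the merge builder also exposes a withSchemaEvolution() method, which scopes evolution to that single statement instead of flipping a session-wide config. A sketch using the same target and source_df as above:

from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "my_catalog.my_schema.customers")

(
    target.alias("t")
    .merge(source_df.alias("s"), "t.id = s.id")
    .withSchemaEvolution()   # per-statement schema evolution, no session config needed
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)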

Scenario 3 — Column type change (e.g., INT → BIGINT)
WITH SCHEMA EVOLUTION does NOT handle type changes. Cast explicitly in the source:
MERGE WITH SCHEMA EVOLUTION INTO my_catalog.my_schema.customers AS t
USING (
  SELECT
    id,
    name,
    CAST(order_count AS BIGINT) AS order_count -- was INT in target
  FROM staging.customers_updates
) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

⚠️ Type widening (INT → BIGINT, FLOAT → DOUBLE) is safe. Narrowing will still fail.
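The same pattern in the Python API is just an explicit cast on the source DataFrame before the merge. A sketch mirroring the SQL above, assuming the target and source_df from Scenario 2:

from pyspark.sql import functions as F

# Widen the column in the source before merging, mirroring the CAST in the SQL version
casted_source = source_df.withColumn("order_count", F.col("order_count").cast("bigint"))

target.alias("t").merge(
    casted_source.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()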


Scenario 4 — Streaming pipeline with foreachBatch
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

target = DeltaTable.forName(spark, "my_catalog.my_schema.customers")

def merge_with_schema_evolution(batch_df, batch_id):
    target.alias("t").merge(
        batch_df.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

query = (
    spark.readStream
    .format("delta")
    .table("staging.customers_stream")
    .writeStream
    .foreachBatch(merge_with_schema_evolution)
    .option("checkpointLocation", "/checkpoints/customers_merge")
    .trigger(availableNow=True)
    .start()
)
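With trigger(availableNow=True) the stream processes everything currently available and then stops, so in a scheduled job you would typically block until that run finishes:

query.awaitTermination()  # returns once the availableNow run has processed all available data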

1 REPLY

nayan_wylde
Esteemed Contributor II

Great post. I would also consider the following points:

Guardrails: schema evolution is powerful, but it can also accidentally add garbage columns if upstream sends unexpected fields.
Recommendation: validate/allowlist schema changes in higher environments before promoting to prod (especially for critical tables).
Observability: log schema diffs when evolution occurs, e.g., compare batch_df.schema with the table schema (see the sketch below).

This increases trust and helps teams adopt it confidently.
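On the observability point, a minimal sketch of logging a schema diff from inside the foreachBatch function in Scenario 4. The helper name and table name are illustrative, and spark/target are the same objects used in that scenario:

def log_schema_diff(batch_df, table_name):
    # Columns arriving in the batch that the target table does not have yet
    target_cols = {f.name for f in spark.table(table_name).schema.fields}
    batch_cols = {f.name for f in batch_df.schema.fields}
    new_cols = batch_cols - target_cols
    if new_cols:
        print(f"Schema evolution pending for {table_name}: new columns {sorted(new_cols)}")

def merge_with_schema_evolution(batch_df, batch_id):
    log_schema_diff(batch_df, "my_catalog.my_schema.customers")
    target.alias("t").merge(
        batch_df.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()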