Best Practices for Managing Schema Changes and Metadata Lineage in Delta Tables
01-09-2025 06:24 PM
Hello Databricks Community,
We are working with Airflow DAGs to trigger Databricks jobs that use Delta tables for performing upsert operations via a MERGE statement. The job was initially designed to perform a merge upsert with predefined Delta tables. However, we recently introduced a new column to the schema, and this has caused issues as the previous Delta tables don't have this new column, resulting in upsert failures.
One solution we considered is to perform a schema overwrite just before the job run to add the new column with a NULL default. However, we are concerned that this approach would introduce unnecessary schema changes on every future job run, even when the new column is already present, which could add performance overhead and obscure when schema changes were actually necessary. Another approach would be to change the code after the first job run to remove the modification, since the metadata is established in Delta on that first run and subsequent runs would be fine, but then we would lose the lineage of the schema changes that have happened.
Given this scenario, we believe this issue is akin to handling migrations in software engineering and managing table creation or modification in a robust way. We are looking for best practices or recommendations for:
Tracking Schema Changes: How can we manage schema evolution and ensure metadata lineage tracking for Delta tables in a consistent way?
Efficient Schema Updates: What are the best practices for modifying the schema without unnecessarily repeating operations? How can we avoid repeating schema additions once they are no longer needed?
Table Creation and Management: Should we treat this as part of a database migration effort and ensure proper versioning of the Delta table schema? If so, what tools or strategies are recommended to track these changes effectively?
Any advice, strategies, or examples from the community would be greatly appreciated!
Thank you!
- Labels:
  - Delta Lake
  - Spark
  - Workflows
01-09-2025 11:47 PM - edited 01-10-2025 12:14 AM
Hi @geckopher ,
To address your concerns about managing schema evolution, tracking metadata lineage, and efficiently updating schemas in Delta tables, here are some best practices and strategies:
Tracking Schema Changes
Delta Table History: Utilize Delta Lake's built-in history feature to track changes to your Delta tables. You can use the DESCRIBE HISTORY command to view the history of operations performed on a Delta table.
DESCRIBE HISTORY delta.`/path/to/delta-table`
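If you prefer to inspect this from PySpark, here is a minimal sketch that pulls the table history and keeps only schema-changing commits; the path is a placeholder and the exact operation names to filter on may vary by Delta Lake version:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

# history() returns a DataFrame of the table's commit log (newest first)
history_df = deltaTable.history()

# Keep only commits that changed the schema (e.g. ALTER TABLE ... ADD COLUMNS)
schema_changes = history_df.filter(
    history_df.operation.isin("ADD COLUMNS", "CHANGE COLUMN", "REPLACE COLUMNS")
)
schema_changes.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)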
Efficient Schema Updates
1. Schema Evolution: Use Delta Lake's schema evolution capabilities to handle schema changes automatically. For MERGE operations, automatic schema evolution is enabled with the spark.databricks.delta.schema.autoMerge.enabled configuration (the mergeSchema option applies to append/overwrite writes, not to the merge builder); newer Delta Lake releases also expose a withSchemaEvolution() method on the merge builder. With this enabled, Delta Lake adds new source columns to the target table as part of the merge.
Note: The overwriteSchema option, by contrast, applies to write operations in overwrite mode when you want to completely replace the target table's schema with the schema of the source data; a short write-mode sketch follows the merge example below.
# Enable automatic schema evolution for MERGE (session-level setting)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

(
    deltaTable.alias("t")
    .merge(sourceDF.alias("s"), "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
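For completeness, here is a minimal sketch of how overwriteSchema is typically used on a batch write in overwrite mode (the path and the sourceDF DataFrame are placeholders):

# Completely replace the target table's data and schema with sourceDF's
(
    sourceDF.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/path/to/delta-table")
)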
2. Conditional Schema Updates: Before performing a schema update, check if the new column already exists. This can be done programmatically by inspecting the schema of the Delta table.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
schema = deltaTable.toDF().schema

if "new_column" not in schema.fieldNames():
    # Add the missing column; existing rows will read it as NULL
    spark.sql("""
        ALTER TABLE delta.`/path/to/delta-table`
        ADD COLUMNS (new_column STRING)
    """)
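To keep this idempotent across Airflow-triggered runs, you could fold the check into a small helper that every job calls before the merge. The ensure_column name and signature below are just an illustrative sketch, not a Databricks API:

from delta.tables import DeltaTable

def ensure_column(spark, table_path, column_name, column_type="STRING"):
    """Add column_name to the Delta table at table_path only if it is missing."""
    delta_table = DeltaTable.forPath(spark, table_path)
    if column_name not in delta_table.toDF().schema.fieldNames():
        spark.sql(
            f"ALTER TABLE delta.`{table_path}` ADD COLUMNS ({column_name} {column_type})"
        )
        return True   # schema was changed in this run
    return False      # column already present; safe to call on every run

# Called at the start of every job run; it is a no-op once the column exists
ensure_column(spark, "/path/to/delta-table", "new_column")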
Example Code
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SchemaManagement").getOrCreate()

# Path to Delta table
delta_table_path = "/path/to/delta-table"

# Check whether the new column exists and add it only when it is missing
deltaTable = DeltaTable.forPath(spark, delta_table_path)
schema = deltaTable.toDF().schema

if "new_column" not in schema.fieldNames():
    # Apply the schema change exactly once
    spark.sql(f"""
        ALTER TABLE delta.`{delta_table_path}`
        ADD COLUMNS (new_column STRING)
    """)

# Enable automatic schema evolution for MERGE
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Perform the MERGE operation
sourceDF = spark.read.format("delta").load("/path/to/source-data")
(
    deltaTable.alias("t")
    .merge(sourceDF.alias("s"), "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Track changes
history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
display(history)
Regards,
Hari Prasad

