Data Engineering

Best Practices for Managing Schema Changes and Metadata Lineage in Delta Tables

geckopher
Visitor

Hello Databricks Community,

We are working with Airflow DAGs to trigger Databricks jobs that use Delta tables for performing upsert operations via a MERGE statement. The job was initially designed to perform a merge upsert with predefined Delta tables. However, we recently introduced a new column to the schema, and this has caused issues as the previous Delta tables don't have this new column, resulting in upsert failures.

One solution we considered is to perform a schema overwrite just before the job run to add the new column with a NULL default, but we are concerned that this would repeat the schema change on every future job run, even when the column is already present, adding performance overhead and making it unclear when schema changes were actually necessary. Another approach would be to change the code after the first job run and remove the modification, since the Delta table's metadata is established on that first run and subsequent runs would succeed, but then we lose the lineage of the schema changes that have happened.
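
For reference, the merge in the job looks roughly like the snippet below; paths, the join key, and column names are simplified placeholders rather than our actual pipeline code.

from delta.tables import DeltaTable

# Simplified placeholder version of the job's merge (not the real pipeline code).
# `spark` is the ambient SparkSession of the Databricks job.
target = DeltaTable.forPath(spark, "/path/to/delta-table")
source = spark.read.format("delta").load("/path/to/source-data")

(
    target.alias("t")
    .merge(source.alias("s"), "s.key = t.key")
    .whenMatchedUpdate(set={"value": "s.value", "new_column": "s.new_column"})
    .whenNotMatchedInsert(values={"key": "s.key", "value": "s.value", "new_column": "s.new_column"})
    .execute()
)
# Older target tables created before new_column existed reject this merge,
# because the update/insert clauses reference a column missing from their schema.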

Given this scenario, we believe this issue is akin to handling migrations in software engineering and managing table creation or modification in a robust way. We are looking for best practices or recommendations for:

  1. Tracking Schema Changes: How can we manage schema evolution and ensure metadata lineage tracking for Delta tables in a consistent way?

  2. Efficient Schema Updates: What are the best practices for modifying the schema without unnecessarily repeating operations? How can we avoid repeating schema additions once they are no longer needed?

  3. Table Creation and Management: Should we treat this as part of a database migration effort and ensure proper versioning of the Delta table schema? If so, what tools or strategies are recommended to track these changes effectively?

Any advice, strategies, or examples from the community would be greatly appreciated!

Thank you!

1 REPLY

hari-prasad
Valued Contributor

Hi @geckopher ,
To address your concerns about managing schema evolution, tracking metadata lineage, and efficiently updating schemas in Delta tables, here are some best practices and strategies:

Tracking Schema Changes

Delta Table History: Utilize Delta Lake's built-in history feature to track changes to your Delta tables. You can use the DESCRIBE HISTORY command to view the history of operations performed on a Delta table.

DESCRIBE HISTORY delta.`/path/to/delta-table`
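
If you want this as queryable lineage rather than ad hoc inspection, you can read the same history from PySpark and keep only schema-changing operations. This is only a sketch; the operation names recorded in the history depend on how the change was made (for example ADD COLUMNS for an ALTER TABLE, or MERGE when schema evolution added the column), so adjust the filter to the operations you care about.

from delta.tables import DeltaTable

# Sketch: filter the Delta table history down to schema-related operations.
history = DeltaTable.forPath(spark, "/path/to/delta-table").history()

schema_changes = (
    history
    .filter(history.operation.isin("ADD COLUMNS", "CHANGE COLUMN", "REPLACE COLUMNS"))
    .select("version", "timestamp", "operation", "operationParameters")
)
schema_changes.show(truncate=False)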

Efficient Schema Updates

1. Schema Evolution: Use Delta Lake's schema evolution capabilities to handle schema changes automatically. For MERGE operations, enable automatic schema evolution with the spark.databricks.delta.schema.autoMerge.enabled configuration so that new source columns are added to the target table during the merge (the mergeSchema option applies to DataFrame append/overwrite writes, not to the merge builder).

Note: The overwriteSchema write option is used when you want to completely overwrite the schema of the target table with the schema of the source data.

# Enable automatic schema evolution for MERGE operations in this session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

(
    deltaTable
    .alias("t")
    .merge(
        sourceDF.alias("s"),
        "s.key = t.key"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
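
If you are on a newer Delta Lake release (3.1 or later, or a Databricks Runtime that ships it), the merge builder also offers withSchemaEvolution(), which scopes schema evolution to a single MERGE instead of the whole session; verify it is available in your runtime before relying on it.

# Per-statement schema evolution (Delta Lake 3.1+); check your runtime supports it.
(
    deltaTable.alias("t")
    .merge(sourceDF.alias("s"), "s.key = t.key")
    .withSchemaEvolution()
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)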

2. Conditional Schema Updates: Before performing a schema update, check if the new column already exists. This can be done programmatically by inspecting the schema of the Delta table.

from delta.tables import DeltaTable

delta_table_path = "/path/to/delta-table"
deltaTable = DeltaTable.forPath(spark, delta_table_path)
schema = deltaTable.toDF().schema

# Add the column only when it is not already part of the table schema
if "new_column" not in schema.fieldNames():
    spark.sql(f"ALTER TABLE delta.`{delta_table_path}` ADD COLUMNS (new_column STRING)")
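
To keep the Airflow-triggered job idempotent, you can fold that check into a small helper that runs at the start of every job run; ensure_column below is just an illustrative name, not a Delta Lake API.

from delta.tables import DeltaTable

def ensure_column(spark, table_path, column_name, column_type="STRING"):
    # Hypothetical helper: add the column only if the Delta table does not have it yet,
    # so calling it on every run is a cheap no-op after the first time.
    schema = DeltaTable.forPath(spark, table_path).toDF().schema
    if column_name not in schema.fieldNames():
        spark.sql(
            f"ALTER TABLE delta.`{table_path}` ADD COLUMNS ({column_name} {column_type})"
        )

# Run before the MERGE in every job; only the first run actually alters the table.
ensure_column(spark, "/path/to/delta-table", "new_column")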

Example Code

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SchemaManagement").getOrCreate()

# Path to Delta table
delta_table_path = "/path/to/delta-table"

# Check if the new column exists
deltaTable = DeltaTable.forPath(spark, delta_table_path)
schema = deltaTable.toDF().schema

if "new_column" not in schema.fieldNames():
    # Apply schema changes
    spark.sql(f"""
    ALTER TABLE delta.`{delta_table_path}`
    ADD COLUMNS (new_column STRING)
    """)

# Perform the MERGE operation (the target schema now contains new_column)
sourceDF = spark.read.format("delta").load("/path/to/source-data")
(
    deltaTable.alias("t")
    .merge(sourceDF.alias("s"), "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Track changes
history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
display(history)

By following these best practices and strategies, you can manage schema evolution, track metadata lineage, and efficiently update schemas in Delta tables without unnecessary repetition or performance overhead.
 
Refer to this link for more on schema evolution: Delta Lake Schema Evolution | Delta Lake
 
Regards,
Hari Prasad
