cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
PratikK
New Contributor III

What is Schema Evolution?

Schema evolution is a feature that allows users to change a table's current schema to accommodate changing data structures. In this blog, we will explore how one can handle evolving schemas for delta tables in a Streaming DLT pipeline with Unity Catalog within the Databricks Lakehouse. 

About the data pipeline

We will leverage Delta Live Tables with Unity Catalog to demonstrate various scenarios. The examples are specific to Delta Live Tables.

The data pipeline we are using for this scenario implements the Medallion Architecture, which loads source data into a Bronze table and applies data quality rules, expectations, and any transformations in the Silver table.

Dataset

We select parquet as the source data format since it has data types encoded within the data file. Hence schema inference and evolution can be detected from the data file. For formats that don’t encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings (including nested fields in JSON files). The data types/schema can be provided during the read.

We will consider an initial sample batch of user data in parquet format with 5 fields ID(Integer), name(String), discount(Integer), op(String), and timestamp(String). Timestamp is considered as String for simplicity, it can also be defined as timestamp type 

Id, name, discount, op, timestamp
1, "Alice", 10, "Insert", "2023-10-17 01:02:30.048"
2, "Bob", 15, "Insert", "2023-10-17 01:02:30.048"​


Bronze Layer:

The data pipeline begins with incrementally loading source data with Databricks Auto Loader into a Bronze table. The Bronze table acts as the initial landing zone for incoming streaming data, where records are inserted in an append-only fashion. 

The below code reads from a cloud storage path as a streaming source using Autoloader. The Autoloader infers the schema from the data during the initial run. The inference behavior depends on the file types, summary of the behavior can be found here. We keep the default configuration for schema evolution within autoloader. In the default behavior, the stream fails and adds the new columns to the schema in the next run. Existing columns do not evolve data types 

#Bronze Layer
@dlt.table(
name=f"bronze_user"
)
def bronze_table():
  df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .option("cloudFiles.inferColumnTypes", True)
      .load(f"cloud-storage-path"))
  return df


Silver Layer:

In the silver layer, we read the data from the bronze layer and apply the relevant expectations. We will evolve this code for the different scenarios.

#Silver Layer
@dlt.table(
  name=f"silver_user"
)
@dlt.expect_or_fail("schema_update_rule", "_rescued_data is null")
def silver_table():
  df = dlt.read_stream('bronze_user')
  return df​


Scenarios

Adding new columns

In a DLT pipeline, adding new columns is automatically handled. If there is no explicit selection of columns, the additional column is added to the target table as part of the write transaction. 

Example Dataset

A new column age is added to the next batch of data

Id, name, discount, age, op, timestamp
3, "Charlie", 11.5, 25, "Insert", "2023-10-18 02:02:30.048"
4, "David", 14.5, 35, "Insert", "2023-10-19 01:02:30.048"​


Option: Automatic addition of new Columns

What happens when the above data is processed:

No change in the above code is required. Since we do not explicitly select the columns, DLT will automatically add the age column to the bronze and silver tables. The DLT pipeline will initially fail and automatically restart whenever there is an additive change to the schema 

Deleting columns

Column deletions are handled by the DLT pipeline without any change. The column will not be removed from the target table. For any new data arriving, the value for the deleted column will be set to Null.

Changing Data Types

Example Dataset

Here is a batch of data where the data type of discount has been changed from integer to double:

Id, name, discount, op, timestamp
3, "Charlie", 11.5, "Insert", "2023-10-18 02:02:30.048"
4, "David", 14.5, "Insert", "2023-10-19 01:02:30.048"​

The following change of data types is handled automatically for a Delta table within the DLT pipeline. Changing of data types from NullType -> any other type or upcasts from ByteType -> ShortType -> IntegerType

For all other data type changes, the incompatible data will be rescued into to the _rescued_data column. One of the below options can be used to handle the schema evolution. 

Option 1: Merge Schema and Overwrite Data in Silver Layer

What happens when the data type changes in the incoming data:

Bronze Layer:

If there is a change in the data type of an existing column, Auto Loader (bronze layer) will add the data for that particular column to the _rescued_data column. Example of how the entry will look with the _rescued_data column in the bronze table

Id, name, discount, op, timestamp, _rescued_data
3, "Charlie",, "Insert", "2023-10-18 02:02:30.048", {"discount":11.5,"_file_path":"dbfs:/data_file_path/part-00001.c000.snappy.parquet"}
4, "David",, "Insert", "2023-10-19 01:02:30.048", {"discount":14.5,"_file_path":"dbfs:/data_file_path/part-00001.c000.snappy.parquet"}

Therefore, we need to extract the data from the _rescued_data column, cast it to the new data type, and merge it into the existing column. We can do that by the following method which is called in the below silver layer:

def process_rescue_data(df, target_schema: StructType):
    df = df.withColumn("_rescued_data_modified", from_json(col("_rescued_data"), MapType(StringType(), StringType())))
    for field in target_schema.fields:
        data_type = field.dataType
        column_name = field.name
        # Check if "_rescue_data" is not null and if the key exists
        key_condition = expr(f"_rescued_data_modified IS NOT NULL AND map_contains_key(_rescued_data_modified, '{column_name}')")
        # Extract the rescued value for this column, if it exists, and cast it to the target data type
        rescued_value = when(key_condition,                
col("_rescued_data_modified").getItem(column_name).cast(data_type)).otherwise(col(column_name).cast(data_type))
        # Update the DataFrame with the merged column
        df = df.withColumn(column_name, rescued_value)
        df = df.withColumn(column_name, col(column_name).cast(data_type))
    df = df.drop('_rescued_data_modified')

    # Setting the _rescued_data to null after processing since we use the column to check qualit expectation for schema update
    df = df.withColumn('_rescued_data', lit(None).cast(StringType()))
    return df​


Silver Layer:

The pipeline will fail in the silver layer since it cannot merge the data types. To resolve the error, we do a Full Refresh of the silver layer in the DLT pipeline. The Full Refresh will reload the data from the bronze layer to the silver layer with the changed data type. 

Before the reload, we need to change the data type in the target schema within the code. We change the data type of the discount column to Double.

updated_datatypes = StructType([
  StructField("discount", DoubleType(), True)
])
@dlt.table(
    name=f"silver_user"
)
@dlt.expect("schema_update_rule", "_rescued_data is null")
def silver_table():
    df = (dlt.read_stream('bronze_user'))
    df = process_rescue_data(df, updated_datatypes)
    return df

Once the full refresh is complete, the table will be updated with the changed data type. The next run will append data to the table with the changed data type

Pros

Cons

No need to reload the bronze layer table

Table in silver needs to be completely reloaded

Minimal code changes

Data from the _rescued_data column needs to be processed


Options 2: Without Data Overwrite

To avoid reloading the silver layer table, we can create an additional table every time a column's data type changes and create a view on top that unions all the tables. 

What happens when the data type changes in the incoming data:

Bronze Layer:

Once there is an incompatible schema change, all the data for the particular column/s would be added to the _rescued_data column. For optimal performance, we should avoid permanently adding the data to the _rescued_data column. Hence, if possible, the new data should be added to a new location, and we should create a new bronze table for the change.

The updated code for the bronze layer shall be as follows:

#Bronze Layer
@dlt.table(
name=f"bronze_user"
)
def bronze_table():
  df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .option("cloudFiles.inferColumnTypes", True)
      .load(f"cloud-storage-path"))
  return df

@dlt.table(
name=f"bronze_user_v1"
)
def bronze_table():
  df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .option("cloudFiles.inferColumnTypes", True)
      .load(f"cloud-storage–updated-path"))
  return df​

Silver Layer:

The silver layer will fail whenever there is an incompatible data change for an existing column. We add another version of the table with the new schema, which contains the changed data type, and create a view that unions both tables. 

#Silver Layer
@dlt.table(
    name=f"silver_user"
)
@dlt.expect_or_fail("schema_update_rule", "_rescued_data IS NULL")
def silver_table():
  df = (dlt.read_stream('bronze_user').filter("_rescued_data IS NULL"))
  return df

dlt.create_streaming_table("silver_user_v1", {"schema_update_rule": "_rescued_data IS NULL"})

updated_data_types = StructType([
  StructField("discount", DoubleType(), True)
])
@dlt.append_flow(
  target=f"silver_user_v1"
)
def bronze_read_table_old():
  df = (dlt.read_stream('bronze_user').filter("_rescued_data IS NOT NULL"))
  df = process_rescue_data(df, updated_data_types)
  return df

@dlt.append_flow(
  target=f"silver_user_v1"
)
def bronze_read_table_new():
  df = (dlt.read_stream('bronze_user_v1'))
  return df

In the above scenario, we create a new streaming table silver_user_v1. We use DLT append flows to add stream data from 2 sources into this table. The bronze_read_table_old flow will process the data from bronze_user table where the data has been rescued in the _rescued_data column. New data where discount is of type double will arrive in the bronze_user_v1 which will be read and written by the bronze_read_table_new flow.

We create a view that unions both the tables:

CREATE OR REPLACE VIEW silver_user as
Select id, name, cast(discount as double) as discount, op, timestamp from silver_user_v1
Union
Select id, name, discount, op, timestamp from silver_user_v2​


PratikK_0-1717095671257.png

 

Pros

Cons

No reload of data

New table needs to be created for every incompatible data type change

 

Code change required


Option 3: Overwrite Data in Bronze and Silver Layer

What happens when the data type changes in the incoming data:

With the Auto Loader, this works only for file types like CSV and JSON where data type information for the columns is not present in the file. For file types like parquet, even though the schema with the changed data type is specified, the records with different data types for the same column will be added to the _rescued_data column. 

For example, with parquet files, if the discount column's data type is changed from Integer to Double, the records where the discount column has an Integer type will be moved to the _rescued_data column. A full refresh of the entire pipeline will load the data into the corresponding columns with the changed data type.

Bronze layer:

We specify the data types for columns using schema hints instead of inferring the schema.

#Bronze Layer
@dlt.table(
name=f"bronze_user"
)
def bronze_table():
  df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaHints", "discount double")
      .load(f"cloud-storage-path"))
  return df

Silver layer:

No changes in the silver layer.

Pros

Cons

No special handling of the _rescued_data column

All the raw data should be stored from the beginning since it will be re-read from the source

Minimal code changes

Reload of tables in both bronze and silver layer


Renaming of Columns

Example Dataset

The batch of data where the name column is renamed to full_name.

Id, full_name, discount, op, timestamp
6, "Charlie J", 11.5, "Insert", "2023-10-20 02:02:30.048"
7, "David Stoinis", 14.5, "Insert", "2023-10-21 01:02:30.048"​

Let's consider an example where the name column is renamed to full_name in the source database table. The new files landing in the cloud storage will have the renamed column name. Column renaming is not allowed for Streaming tables created by DLT in Unity Catalog. We can handle the column renaming by merging the old column with the renamed column. 

What happens when the column is renamed in the incoming data?

Bronze Layer:

The pipeline is not aware of the renaming of the column and hence it will be considered as a new column. We do not make any changes in the bronze layer and the column is added as a new column in the bronze table. 

Option 1: Without Data Overwrite in Silver Layer

Silver Layer:

To avoid the full refresh of the silver layer, we need to make the following changes after the column is renamed at the source. We can coalesce the new column in the silver layer with the old column.

.table(
  name=f"silver_user"
)
def silver_table():
    df = (dlt.read_stream('bronze_user'))
    df = df.withColumn('name', coalesce('full_name', 'name'))
    return df

We create a view on top to rename the column(name) to the new column name(full_name) for the end user.

CREATE OR REPLACE VIEW silver_user_view as
Select id, name as full_name, cast(discount as double) as discount, op, timestamp from silver_user

Keeping the column name the same as the old name may be beneficial in cases where there is an apply_changes after this step, for example, when the renamed column is used to track a unique row.

Pros

Cons

No Data Reload

Need to create an additional view


Option 2: Data Overwrite in the Silver Layer

To rename a column in the silver layer table, we can execute a full refresh with a coalesce of the old column and the new column.

.table(
  name=f"silver_user"
)
def silver_table():
    df = (dlt.read_stream('bronze_user'))
    df = df.withColumn('full_name', coalesce('full_name', 'name'))
    return df​

 

Pros

Cons

No need to create an additional view

Reload of data in the silver layer


Conclusion

Handling schema evolution is vital to ensuring the reliability, scalability, and longevity of a streaming pipeline.

By understanding the challenges presented in various scenarios and implementing the strategies discussed, you can ensure the adaptability and reliability of your streaming pipelines against schema changes.