Schema evolution is a feature that allows users to change a table's current schema to accommodate changing data structures. In this blog, we will explore how one can handle evolving schemas for delta tables in a Streaming DLT pipeline with Unity Catalog within the Databricks Lakehouse.
We will leverage Delta Live Tables (DLT) with Unity Catalog to demonstrate the various scenarios; the examples are specific to Delta Live Tables.
The data pipeline we use for this scenario implements the Medallion Architecture: source data is loaded into a Bronze table, and data quality rules, expectations, and any transformations are applied in the Silver table.
We select Parquet as the source data format because it encodes data types within the data file, so schema inference and evolution can be detected directly from the file. For formats that don't encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings (including nested fields in JSON files); the data types/schema can instead be provided during the read.
We will consider an initial sample batch of user data in Parquet format with five fields: id (Integer), name (String), discount (Integer), op (String), and timestamp (String). The timestamp is treated as a String for simplicity; it could also be defined as a timestamp type.
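As an illustration, a batch like this could be generated with PySpark from a notebook and written to the landing path that Auto Loader will read from. The volume path and the sample values below are hypothetical.

```python
# Illustrative only: simulate the initial batch landing in cloud storage.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("discount", IntegerType()),
    StructField("op", StringType()),
    StructField("timestamp", StringType()),
])

batch = spark.createDataFrame(
    [(1, "Alice", 10, "INSERT", "2024-01-01 10:00:00"),
     (2, "Bob", 15, "INSERT", "2024-01-01 10:05:00")],
    schema,
)
batch.write.mode("append").parquet("/Volumes/main/default/landing/users/")
```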
The data pipeline begins with incrementally loading source data with Databricks Auto Loader into a Bronze table. The Bronze table acts as the initial landing zone for incoming streaming data, where records are inserted in an append-only fashion.
The code below reads from a cloud storage path as a streaming source using Auto Loader. Auto Loader infers the schema from the data during the initial run; the inference behavior depends on the file type, and a summary of the behavior can be found in the Auto Loader documentation. We keep the default configuration for schema evolution within Auto Loader: when a new column appears, the stream fails and the new columns are added to the schema on the next run. Existing columns do not evolve their data types.
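A minimal sketch of such a bronze table definition, assuming the files land in the hypothetical volume path used above:

```python
import dlt

# Hypothetical landing path for the incoming Parquet files.
SOURCE_PATH = "/Volumes/main/default/landing/users/"

@dlt.table(name="bronze_user", comment="Raw user data ingested incrementally with Auto Loader")
def bronze_user():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # Default behavior: fail on new columns, then add them to the schema on restart.
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load(SOURCE_PATH)
    )
```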
In the silver layer, we read the data from the bronze layer and apply the relevant expectations. We will evolve this code for the different scenarios.
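A minimal sketch of the silver table, with an illustrative expectation on the id column:

```python
import dlt

@dlt.table(name="silver_user", comment="Cleaned user data")
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")  # illustrative expectation
def silver_user():
    return dlt.read_stream("bronze_user")
```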
In a DLT pipeline, adding new columns is automatically handled. If there is no explicit selection of columns, the additional column is added to the target table as part of the write transaction.
A new column, age, is added to the next batch of data:
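A hypothetical batch with the new column might be simulated like this:

```python
# Illustrative only: the next batch carries an additional "age" column.
batch = spark.createDataFrame(
    [(3, "Carol", 20, "INSERT", "2024-01-02 09:00:00", 42)],
    "id INT, name STRING, discount INT, op STRING, timestamp STRING, age INT",
)
batch.write.mode("append").parquet("/Volumes/main/default/landing/users/")
```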
What happens when the above data is processed:
No change to the above code is required. Since we do not explicitly select columns, DLT will automatically add the age column to the bronze and silver tables. The DLT pipeline will initially fail and then automatically restart whenever there is an additive change to the schema.
Column deletions are handled by the DLT pipeline without any code change. The column is not removed from the target table; for any new data arriving, the value of the deleted column is set to null.
Here is a batch of data where the data type of discount has been changed from integer to double:
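A hypothetical batch with the changed type might be simulated like this:

```python
# Illustrative only: "discount" now arrives as a double instead of an integer.
batch = spark.createDataFrame(
    [(4, "Dave", 12.5, "INSERT", "2024-01-03 08:00:00", 35)],
    "id INT, name STRING, discount DOUBLE, op STRING, timestamp STRING, age INT",
)
batch.write.mode("append").parquet("/Volumes/main/default/landing/users/")
```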
The following data type changes are handled automatically for a Delta table within the DLT pipeline: NullType -> any other type, and upcasts from ByteType -> ShortType -> IntegerType.
For all other data type changes, the incompatible data is rescued into the _rescued_data column, and one of the options below can be used to handle the schema evolution.
What happens when the data type changes in the incoming data:
Bronze Layer:
If there is a change in the data type of an existing column, Auto Loader (bronze layer) adds the data for that column to the _rescued_data column. Here is an example of how such an entry looks in the bronze table:
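Illustratively, the rescued entries can be inspected from a notebook; the values shown in the comments are hypothetical.

```python
# Illustrative only: inspect rescued records in the bronze table.
spark.read.table("bronze_user").where("_rescued_data IS NOT NULL").show(truncate=False)
# A rescued entry looks roughly like:
#   discount = null
#   _rescued_data = '{"discount": "12.5", "_file_path": "..."}'
```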
Therefore, we need to extract the data from the _rescued_data column, cast it to the new data type, and merge it back into the existing column. We can do that with the following method, which is called in the silver layer code below:
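A minimal sketch of such a helper (the function name is illustrative):

```python
from pyspark.sql.functions import coalesce, col, get_json_object

def resolve_rescued_discount(df):
    """Extract the rescued 'discount' value from the _rescued_data JSON string,
    cast it to double, and merge it with the existing column."""
    rescued = get_json_object(col("_rescued_data"), "$.discount").cast("double")
    return df.withColumn("discount", coalesce(col("discount").cast("double"), rescued))
```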
Silver Layer:
The pipeline will fail in the silver layer since it cannot merge the data types. To resolve the error, we do a Full Refresh of the silver layer in the DLT pipeline. The Full Refresh will reload the data from the bronze layer to the silver layer with the changed data type.
Before the reload, we need to change the data type in the target schema within the code. We change the data type of the discount column to Double.
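A sketch of the updated silver table, assuming it now declares an explicit target schema with discount as a double and calls the helper defined above:

```python
import dlt

@dlt.table(
    name="silver_user",
    comment="Cleaned user data with discount as double",
    schema="id INT, name STRING, discount DOUBLE, op STRING, timestamp STRING, age INT",
)
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
def silver_user():
    df = resolve_rescued_discount(dlt.read_stream("bronze_user"))
    return df.drop("_rescued_data")
```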
Once the full refresh is complete, the table is updated with the changed data type, and subsequent runs append data to the table using the new type.
| Pros | Cons |
| --- | --- |
| No need to reload the bronze layer table | Table in silver needs to be completely reloaded |
| Minimal code changes | Data from the _rescued_data column needs to be processed |
To avoid reloading the silver layer table, we can create an additional table every time a column's data type changes and create a view on top that unions all the tables.
What happens when the data type changes in the incoming data:
Bronze Layer:
Once there is an incompatible schema change, all the data for the affected column(s) is added to the _rescued_data column. For optimal performance, we should avoid permanently routing data through the _rescued_data column. Hence, if possible, the new data should be written to a new location, and we should create a new bronze table for the change.
The updated code for the bronze layer is as follows:
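A sketch of the new bronze table, assuming the files with the changed type land in a separate, hypothetical path:

```python
import dlt

# Hypothetical new landing path for files that carry discount as a double.
SOURCE_PATH_V1 = "/Volumes/main/default/landing/users_v1/"

@dlt.table(name="bronze_user_v1", comment="Raw user data with discount as double")
def bronze_user_v1():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(SOURCE_PATH_V1)
    )
```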
Silver Layer:
The silver layer will fail whenever there is an incompatible data change for an existing column. We add another version of the table with the new schema, which contains the changed data type, and create a view that unions both tables.
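A sketch of this pattern using dlt.create_streaming_table and dlt.append_flow; the table and flow names follow the description below:

```python
import dlt
from pyspark.sql.functions import col, get_json_object

dlt.create_streaming_table(name="silver_user_v1")

@dlt.append_flow(target="silver_user_v1", name="bronze_read_table_old")
def bronze_read_table_old():
    # Rows from bronze_user whose double discount values were rescued after the type change.
    return (
        dlt.read_stream("bronze_user")
        .where(col("_rescued_data").isNotNull())
        .withColumn("discount", get_json_object(col("_rescued_data"), "$.discount").cast("double"))
    )

@dlt.append_flow(target="silver_user_v1", name="bronze_read_table_new")
def bronze_read_table_new():
    # New data where discount is already a double.
    return dlt.read_stream("bronze_user_v1")
```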
In the above scenario, we create a new streaming table, silver_user_v1, and use DLT append flows to write streaming data from two sources into it. The bronze_read_table_old flow processes the data from the bronze_user table where values have been rescued into the _rescued_data column. New data, where discount is of type double, arrives in bronze_user_v1 and is read and written to the target by the bronze_read_table_new flow.
We create a view that unions both tables:
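A sketch of such a union, here as a pipeline-scoped view:

```python
import dlt
from pyspark.sql.functions import col

@dlt.view(name="user_unified")
def user_unified():
    # Cast the old integer discount so the two schemas line up before the union.
    old = dlt.read("silver_user").withColumn("discount", col("discount").cast("double"))
    new = dlt.read("silver_user_v1")
    return old.unionByName(new, allowMissingColumns=True)
```

Note that a dlt.view is only visible within the pipeline; to expose the union to end users, a standard Unity Catalog view over the two tables could be created instead.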
| Pros | Cons |
| --- | --- |
| No reload of data | New table needs to be created for every incompatible data type change |
| | Code change required |
What happens when the data type changes in the incoming data:
With Auto Loader, this works only for file types like CSV and JSON, where data type information for the columns is not present in the file. For file types like Parquet, even though a schema with the changed data type is specified, records with a different data type for the same column are added to the _rescued_data column.
For example, with parquet files, if the discount column's data type is changed from Integer to Double, the records where the discount column has an Integer type will be moved to the _rescued_data column. A full refresh of the entire pipeline will load the data into the corresponding columns with the changed data type.
Bronze layer:
We specify the data types for columns using schema hints instead of inferring the schema.
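A sketch of the bronze table using cloudFiles.schemaHints to declare discount as a double (the landing path is hypothetical):

```python
import dlt

@dlt.table(name="bronze_user", comment="Raw user data with discount declared as double")
def bronze_user():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # Declare the changed type up front rather than relying on inference.
        .option("cloudFiles.schemaHints", "discount DOUBLE")
        .load("/Volumes/main/default/landing/users/")
    )
```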
Silver layer:
No changes in the silver layer.
| Pros | Cons |
| --- | --- |
| No special handling of the _rescued_data column | All the raw data should be stored from the beginning since it will be re-read from the source |
| Minimal code changes | Reload of tables in both bronze and silver layer |
Here is a batch of data where the name column has been renamed to full_name:
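A hypothetical batch with the renamed column might be simulated like this:

```python
# Illustrative only: the source column "name" now arrives as "full_name".
batch = spark.createDataFrame(
    [(5, "Eve Adams", 8, "INSERT", "2024-01-04 11:00:00")],
    "id INT, full_name STRING, discount INT, op STRING, timestamp STRING",
)
batch.write.mode("append").parquet("/Volumes/main/default/landing/users/")
```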
Let's consider an example where the name column is renamed to full_name in the source database table. The new files landing in the cloud storage will have the renamed column name. Column renaming is not allowed for Streaming tables created by DLT in Unity Catalog. We can handle the column renaming by merging the old column with the renamed column.
Bronze Layer:
The pipeline is not aware of the rename, so full_name is treated as a new column. We do not make any changes in the bronze layer, and full_name is added as a new column in the bronze table.
Silver Layer:
To avoid a full refresh of the silver layer, we make the following change after the column is renamed at the source: we coalesce the new column with the old column in the silver layer.
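A sketch of the adjusted silver table, keeping the old column name and folding full_name into it:

```python
import dlt
from pyspark.sql.functions import coalesce, col

@dlt.table(name="silver_user", comment="Cleaned user data; renamed column folded into the old one")
def silver_user():
    df = dlt.read_stream("bronze_user")
    # Keep the original column name; fold the renamed column into it.
    return df.withColumn("name", coalesce(col("name"), col("full_name"))).drop("full_name")
```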
We then create a view on top that renames the column (name) to the new column name (full_name) for the end user:
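One way to do this is with a Unity Catalog view created outside the pipeline; the catalog, schema, and view names below are hypothetical.

```python
# Runs outside the DLT pipeline; catalog and schema names are hypothetical.
spark.sql("""
    CREATE OR REPLACE VIEW main.default.user_view AS
    SELECT id, name AS full_name, discount, op, timestamp
    FROM main.default.silver_user
""")
```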
Keeping the column name the same as the old name may be beneficial in cases where there is an apply_changes after this step, for example, when the renamed column is used to track a unique row.
| Pros | Cons |
| --- | --- |
| No data reload | Need to create an additional view |
To rename the column in the silver layer table itself, we can instead execute a full refresh with a coalesce of the old column and the new column:
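A sketch of the silver table after this change:

```python
import dlt
from pyspark.sql.functions import coalesce, col

@dlt.table(name="silver_user", comment="Cleaned user data with the renamed column")
def silver_user():
    df = dlt.read_stream("bronze_user")
    # After the full refresh, expose the renamed column directly.
    return df.withColumn("full_name", coalesce(col("full_name"), col("name"))).drop("name")
```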
| Pros | Cons |
| --- | --- |
| No need to create an additional view | Reload of data in the silver layer |
Handling schema evolution is vital to ensuring the reliability, scalability, and longevity of a streaming pipeline.
By understanding the challenges presented in these scenarios and applying the strategies discussed, you can keep your streaming pipelines adaptable and resilient to schema changes.