DLT, Automatic Schema Evolution and Type Widening
3 weeks ago
I'm attempting to run a DLT pipeline that uses automatic schema evolution against tables that have type widening enabled.
In the notebook I have a list of tables to create/update along with the schema for each of those tables. That list and Spark schema are fed into a load_snapshot_tables function, which looks like this:
def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):
    @dlt.table(
        name=table_name,
        comment=f"{source_system_name}_{source_schema_name}.{table_name}_Snapshot",
        table_properties={"delta.enableTypeWidening": "true"},
        cluster_by=["XXX_Snapshot_Date"]
    )
    def create_snapshot_table():
        snapshot_load_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.includeExistingFiles", True)
            .option("pathGlobFilter", "*.json.gz")
            .schema(spark_schema)
            .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
            .selectExpr(
                "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
                *select_expression,
                "_metadata.file_name AS XXX_File_Name",
                "_metadata AS XXX_File_Metadata"
            )
        )
        return snapshot_load_df
Everything works except type widening. New columns are added based on the schema I pass in. However, when a data type changes, the process fails with a casting/type error. Refreshing the tables resolves the errors, but I don't want to have to refresh the tables. I've referenced the Type Widening documentation page in my work/research. That page has a section titled Widening Types with Automatic Schema Evolution. I meet all of the requirements listed there, with the possible exception of the first bullet (The command uses INSERT or MERGE INTO). I would have assumed INSERT or MERGE INTO is somehow being used behind the scenes here.
I am using the Preview channel for the pipeline.
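For reference, my reading of the pattern that documentation section describes looks roughly like this outside of DLT (just a sketch; demo_target, demo_source, and the id join column are made-up names):

# Type widening is a table property on the target (already set via table_properties in my DLT code).
spark.sql("ALTER TABLE demo_target SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")

# Automatic schema evolution for MERGE is driven by this session conf.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# If demo_source has a column as BIGINT where demo_target declared it as INT,
# the MERGE is expected to widen the target column rather than fail on the cast.
spark.sql("""
    MERGE INTO demo_target t
    USING demo_source s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")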
So, two questions:
- What am I missing in my python code to make sure type widening is being honored?
- What would my python code look like if I had to convert it to force it to use INSERT or MERGE INTO?
Thanks in advance!
3 weeks ago
To make type widening work in your current setup, you can try the following modification:
- Add the mergeSchema option to your read operation:

snapshot_load_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.includeExistingFiles", True)
    .option("pathGlobFilter", "*.json.gz")
    .option("mergeSchema", "true")  # Add this line
    .schema(spark_schema)
    .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
    # ... rest of the code
)
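To confirm the property actually landed on the target table, and to check whether a column's type changed after a run, something like this can be run in a notebook (the three-part table name below is a placeholder for whatever your pipeline publishes):

# Inspect the table properties set by the pipeline.
spark.sql("SHOW TBLPROPERTIES main.my_schema.my_snapshot_table").show(truncate=False)

# Inspect the current column types on the target table.
spark.sql("DESCRIBE TABLE main.my_schema.my_snapshot_table").show(truncate=False)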
2 weeks ago - last edited 2 weeks ago
Thanks, Sidhant07, for the response. Unfortunately, the error I received (can't merge IntegerType to LongType) is the same with or without the option you suggested.
I would really like to stick with this approach rather than the INSERT INTO approach. Any other thoughts?
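As a stop-gap short of a full refresh, the docs also describe widening a column manually once delta.enableTypeWidening is set on the table; a sketch (the table and column names below are placeholders):

# Manually widen a single column on the target table.
# Requires delta.enableTypeWidening to already be set as a table property.
spark.sql("""
    ALTER TABLE main.my_schema.my_snapshot_table
    ALTER COLUMN some_int_column TYPE BIGINT
""")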
2 weeks ago
I've also bounced pretty much the same question off the Databricks Assistant to see if I'm missing anything, but its code recommendation matched what I already have coded (including the mergeSchema option).
So I'm still searching for a solution here. Any additional help would be appreciated.
a week ago
Sorry, @Sidhant07, I forgot to mention you in my responses.
Friday
Hi @MarkV,
Apologies for the delayed response!
Could you open a support ticket so that we can take a deeper look and investigate further? We need the complete error stack trace along with the code to debug this.
3 weeks ago
Alternatively, you can try writing to the table directly with a streaming append (the equivalent of INSERT INTO):
def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):
    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .schema(spark_schema)
        .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
        .selectExpr(
            "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
            *select_expression,
            "_metadata.file_name AS XXX_File_Name",
            "_metadata AS XXX_File_Metadata"
        )
    )

    # Note: outside DLT, writeStream also needs a checkpointLocation option.
    snapshot_load_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .option("delta.enableTypeWidening", "true") \
        .outputMode("append") \
        .queryName(f"insert_into_{table_name}") \
        .toTable(table_name)