01-29-2025 12:16 PM
I'm attempting to run a DLT pipeline that uses automatic schema evolution against tables that have type widening enabled.
In this notebook, I have a list of tables to create/update along with the Spark schema for each table. That list and schema are fed into a load_snapshot_tables function, which looks like this:
import dlt

def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):
    # Define a streaming DLT table with type widening enabled as a table property.
    @dlt.table(
        name=table_name,
        comment=f"{source_system_name}_{source_schema_name}.{table_name}_Snapshot",
        table_properties={"delta.enableTypeWidening": "true"},
        cluster_by=["XXX_Snapshot_Date"]
    )
    def create_snapshot_table():
        snapshot_load_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.includeExistingFiles", True)
            .option("pathGlobFilter", "*.json.gz")
            .schema(spark_schema)
            .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
            .selectExpr(
                # Derive the snapshot date from the timestamp embedded in the file name.
                "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
                *select_expression,
                "_metadata.file_name AS XXX_File_Name",
                "_metadata AS XXX_File_Metadata"
            )
        )
        return snapshot_load_df
Everything works except type widening. New columns are added based on the schema I pass in, but when a data type changes, the pipeline fails with a casting/type error. A full refresh of the tables resolves the error, but I don't want to have to refresh the tables. I've referenced the Type Widening documentation in my research. That page has a section titled "Widening Types with Automatic Schema Evolution," and I meet all of the requirements listed there, with the possible exception of the first bullet (the command uses INSERT or MERGE INTO). I would have assumed that, behind the scenes, INSERT or MERGE INTO is somehow being used here.
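To illustrate what I understand that first bullet to mean, here's a minimal sketch of an explicit MERGE INTO where automatic schema evolution plus type widening would apply. The catalog, table, and column names are hypothetical, not my pipeline code, and I'm assuming schema evolution for MERGE is enabled via the session config:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical source whose "amount" column is BIGINT while the target column is INT.
source_df = spark.createDataFrame([(1, 9_000_000_000)], "id INT, amount BIGINT")
source_df.createOrReplaceTempView("source_updates")

# Enable automatic schema evolution for MERGE at the session level.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# With delta.enableTypeWidening = true on the target table, this MERGE should widen
# the target's amount column from INT to BIGINT instead of failing on the cast.
spark.sql("""
    MERGE INTO my_catalog.my_schema.my_target AS t
    USING source_updates AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

My question is whether the DLT flow above ever translates into an operation like this, or whether type widening simply doesn't apply to it.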
I am using the Preview channel for the pipeline.
So, two questions:
Thanks in advance!
01-29-2025 10:59 PM
To make type widening work in your current setup, you can try the following modifications:
Add the mergeSchema option to your read operation:

snapshot_load_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.includeExistingFiles", True)
    .option("pathGlobFilter", "*.json.gz")
    .option("mergeSchema", "true")  # Add this line
    .schema(spark_schema)
    .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
    # ... rest of the code
)
02-05-2025 01:13 PM - edited 02-05-2025 01:18 PM
Thanks, Sidhant07, for the response. Unfortunately, with the mergeSchema option added I get the same error I received without it: can't merge IntegerType to LongType.
I would really like to stick with this approach rather than the INSERT INTO approach. Any other thoughts?
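For now, the only stopgap I can see short of a full refresh is manually widening the affected columns before the pipeline runs, per the manual widening section of that same doc. A sketch of what I mean, with placeholder table and column names:

# Sketch of manually widening a column ahead of the pipeline run
# (placeholder table and column names; assumes delta.enableTypeWidening is already set on the table).
spark.sql("""
    ALTER TABLE my_catalog.my_schema.my_snapshot_table
    ALTER COLUMN some_int_column TYPE BIGINT
""")

But that defeats the purpose of automatic schema evolution, so I'm hoping to avoid it.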
02-07-2025 05:20 AM
I've also run essentially the same question past the Databricks Assistant to see if I'm missing anything, but its code recommendation matched what I already have (including the mergeSchema option).
So, I'm still searching for a solution here. Any additional help would be appreciated.
02-07-2025 11:51 AM
Sorry, @Sidhant07, forgot to mention you in my responses.
a month ago
Hi @MarkV,
Apologies for the delayed response!
Is it possible for you to open a support ticket so that we can take a deeper look and investigate this further? We need the complete error stack trace along with the code to debug it further.
01-29-2025 11:00 PM
Alternatively, you can try using the INSERT INTO statement directly:
def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):
    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .schema(spark_schema)
        .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
        .selectExpr(
            "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
            *select_expression,
            "_metadata.file_name AS XXX_File_Name",
            "_metadata AS XXX_File_Metadata"
        )
    )

    snapshot_load_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .option("delta.enableTypeWidening", "true") \
        .outputMode("append") \
        .queryName(f"insert_into_{table_name}") \
        .toTable(f"{table_name}")
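Also worth double-checking: delta.enableTypeWidening is documented as a Delta table property, so it may need to be set on the target table itself rather than (or in addition to) being passed as a writer option. For example, something along these lines (table name taken from the function parameter above):

# Ensure the target table has type widening enabled as a table property.
spark.sql(f"""
    ALTER TABLE {table_name}
    SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
""")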