Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT, Automatic Schema Evolution and Type Widening

MarkV
New Contributor III

I'm attempting to run a DLT pipeline that uses automatic schema evolution against tables that have type widening enabled.

In this notebook, I have a list of tables to create/update along with the Spark schema for each table. The list and schemas are fed into a load_snapshot_tables function (an illustrative calling loop is sketched after the function below). That load_snapshot_tables function looks like this:

import dlt

def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):

    @dlt.table(
        name=table_name,
        comment=f"{source_system_name}_{source_schema_name}.{table_name}_Snapshot",
        table_properties={"delta.enableTypeWidening": "true"},
        cluster_by=["XXX_Snapshot_Date"]
    )
    def create_snapshot_table():

        snapshot_load_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.includeExistingFiles", True)
            .option("pathGlobFilter", "*.json.gz")
            .schema(spark_schema)
            .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
            .selectExpr(
                "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
                *select_expression,
                "_metadata.file_name AS XXX_File_Name",
                "_metadata AS XXX_File_Metadata"
            )
        )

        return (snapshot_load_df)
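
For context, the calling pattern that feeds the table list and schemas into this function looks roughly like this (the table names, schema, and select expressions below are just placeholders, not my real metadata):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder metadata -- the real notebook builds this list from configuration
tables_to_load = [
    {
        "source_system_name": "source_a",
        "source_schema_name": "dbo",
        "table_name": "customer",
        "spark_schema": StructType([
            StructField("customer_id", IntegerType(), True),
            StructField("customer_name", StringType(), True),
        ]),
        "select_expression": ["customer_id", "customer_name"],
    },
]

# One DLT table is declared per entry in the list
for t in tables_to_load:
    load_snapshot_tables(
        t["source_system_name"],
        t["source_schema_name"],
        t["table_name"],
        t["spark_schema"],
        t["select_expression"],
    )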

Everything works except type widening. New columns are added based on the schema I pass in, but when a data type changes, the process fails with a casting/type error. Refreshing the tables resolves the errors, but I don't want to have to refresh the tables. I've referenced the Type Widening documentation in my research; it has a section titled Widening Types with Automatic Schema Evolution. I meet all of the requirements listed there, with the possible exception of the first bullet (the command uses INSERT or MERGE INTO). I would have assumed that, behind the scenes, INSERT or MERGE INTO is somehow being used here.
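
For reference, my reading of that section is that the widening path it describes looks something like this outside of DLT (the target and source table names here are placeholders):

# Enable automatic schema evolution for MERGE in this session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# With delta.enableTypeWidening set on the target table, a MERGE like this
# should widen, for example, an IntegerType column to LongType instead of failing
spark.sql("""
    MERGE INTO target_table AS t
    USING source_updates AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")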

I am using the Preview channel for the pipeline.

So, two questions:

  1. What am I missing in my Python code to make sure type widening is honored?
  2. What would my Python code look like if I converted it to use INSERT or MERGE INTO?

Thanks in advance!

6 REPLIES

Sidhant07
Databricks Employee

To make type widening work in your current setup, you can try the following modifications:

  1. Add the mergeSchema option to your read operation:
    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .option("mergeSchema", "true")  # Add this line
        .schema(spark_schema)
        .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
        # ... rest of the code
    )
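
You can also confirm that the property actually landed on the published table (catalog, schema, and table names below are placeholders for your target):

# Verify that type widening is enabled on the target table
spark.sql("SHOW TBLPROPERTIES my_catalog.my_schema.customer").show(truncate=False)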
    

MarkV
New Contributor III

Thanks, Sidhant07, for the response. Unfortunately, with the option you suggested I receive the same error as before (can't merge IntegerType to LongType):

[Screenshot attachment: the error message from the pipeline run]

I would really like to stick with this approach rather than the INSERT INTO approach. Any other thoughts?

MarkV
New Contributor III

I've also run essentially the same question past the Databricks Assistant to see if I'm missing anything, but its code recommendation matched what I already have (including the mergeSchema option).

So, I'm still searching for a solution here. Any additional help would be appreciated.


MarkV
New Contributor III

Sorry, @Sidhant07, forgot to mention you in my responses.

Sidhant07
Databricks Employee

Hi @MarkV ,

Apologies for the delayed response!

Would it be possible for you to open a support ticket so that we can take a deeper look and investigate further?

We'd need the complete error stack trace along with the code to debug this.

Sidhant07
Databricks Employee

Alternatively, you can try bypassing the DLT decorator and appending to the target table directly (the streaming equivalent of INSERT INTO):

def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression):
    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .schema(spark_schema)
        .load(f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}")
        .selectExpr(
            "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
            *select_expression,
            "_metadata.file_name AS XXX_File_Name",
            "_metadata AS XXX_File_Metadata"
        )
    )

    snapshot_load_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .option("delta.enableTypeWidening", "true") \
        .outputMode("append") \
        .queryName(f"insert_into_{table_name}") \
        .toTable(table_name)
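
Two caveats with this sketch, assuming the target table is not managed by the DLT decorator: a plain Structured Streaming write also needs a checkpoint location (for example via .option("checkpointLocation", "<path>")), and delta.enableTypeWidening is a Delta table property rather than a writer option, so it may need to be set on the target table itself, for example:

# Table name below is a placeholder for the target table used above
spark.sql("ALTER TABLE my_catalog.my_schema.customer SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")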
