Setup
Bronze source table (external to DLT, CDF & type widening enabled):
# Source table properties:
# delta.enableChangeDataFeed: "true"
# delta.enableDeletionVectors: "true"
# delta.enableTypeWidening: "true"
# delta.minReaderVersion: "3"
# delta.minWriterVersion: "7"
# Type widening applied:
spark.sql("ALTER TABLE cat_dev.poc.source_data ALTER COLUMN id TYPE BIGINT")
DLT Pipeline notebook:
import dlt
from pyspark.sql import functions as F
CATALOG = spark.conf.get("catalog", "test_catalog")
SOURCE_SCHEMA = "poc_schema_evolution"
SOURCE_TABLE = "source_data"
@dlt.view
def bronze_source():
    """Read the source table as a change data feed stream."""
    return (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table(f"{CATALOG}.{SOURCE_SCHEMA}.{SOURCE_TABLE}")
    )
dlt.create_streaming_table(
    name="silver_person",
    comment="Silver person table - POC for schema evolution",
    table_properties={
        "delta.minReaderVersion": "3",
        "delta.minWriterVersion": "7",
        "pipelines.typeWideningAvailable": "true",
        "pipelines.enableTypeWidening": "true",
        "delta.columnMapping.mode": "name",
        "delta.enableDeletionVectors": "true",
        "delta.enableChangeDataFeed": "true",
        "delta.autoOptimize.autoCompact": "true",
        "delta.autoOptimize.optimizeWrite": "true",
        "pipelines.reset.allowed": "false",
    },
)
dlt.apply_changes(
    target="silver_person",
    source="bronze_source",
    keys=["id"],
    sequence_by=F.col("load_dts"),
    stored_as_scd_type=1,
    except_column_list=["_change_type", "_commit_version", "_commit_timestamp"],
)
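For context, the CDF reader adds change-metadata columns on top of the source schema; these are exactly what except_column_list strips before the merge into silver_person. A quick batch read outside the pipeline shows them (startingVersion 0 is an assumption about how much history is still retained):

cdf = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # assumption: version 0 is still within the CDF retention window
    .table("cat_dev.poc.source_data")
)
cdf.printSchema()  # source columns plus _change_type, _commit_version, _commit_timestamp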
Pipeline configuration (databricks.yml):
configuration:
  catalog: ${var.catalog}
  "spark.databricks.delta.streaming.skipChangeCommits": "true"
  "pipelines.enableTypeWidening": "true"
  "spark.databricks.delta.streaming.allowSourceColumnTypeChange": "true"
Verified Table Properties
Silver table (after creation):
delta.autoOptimize.autoCompact: "true"
delta.autoOptimize.optimizeWrite: "true"
delta.columnMapping.mode: "name"
delta.enableChangeDataFeed: "true"
delta.enableDeletionVectors: "true"
pipelines.enableTypeWidening: "true"
pipelines.typeWideningAvailable: "true"
pipelines.reset.allowed: "false"
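These can be confirmed after a pipeline run with SHOW TBLPROPERTIES / DESCRIBE DETAIL (the fully qualified name is an assumption about where the pipeline publishes the table):

spark.sql("SHOW TBLPROPERTIES cat_dev.poc.silver_person").show(truncate=False)
spark.sql("DESCRIBE DETAIL cat_dev.poc.silver_person") \
    .select("minReaderVersion", "minWriterVersion").show()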
Error
After several successful runs with an INT id column, I widened the source table's id column from INT to BIGINT. The DLT pipeline now fails with:
[CANNOT_UPDATE_TABLE_SCHEMA] Failed to merge the current and new schemas for table silver_person.
To proceed with this schema change, you can trigger a full refresh of this table.
[DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'id' and 'id'
[DELTA_MERGE_INCOMPATIBLE_DATATYPE] Failed to merge incompatible data types IntegerType and LongType
Full refresh also fails with the same error (even with pipelines.reset.allowed temporarily set to true).
I also tried setting schemaTrackingLocation on the readStream inside the DLT view, but that fails too: the location must be under the DLT pipeline's checkpoint directory.
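Roughly what that attempt looked like (the tracking path is a placeholder; DLT rejects it because the location has to live under the pipeline's own managed checkpoint):

@dlt.view
def bronze_source():
    return (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .option("schemaTrackingLocation", "/Volumes/cat_dev/poc/chk/bronze_source")  # placeholder path
        .table(f"{CATALOG}.{SOURCE_SCHEMA}.{SOURCE_TABLE}")
    )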
Questions
Is there a way to handle type widening (INT→BIGINT) in DLT streaming tables with apply_changes without dropping the target table?
Does type widening with schema evolution work for DLT streaming tables, or is it only supported for batch/merge operations?
Is schemaTrackingLocation supported inside a @dlt.view, or only in regular Structured Streaming outside DLT?
What is the recommended pattern for handling upstream type changes in production DLT pipelines?