Hi All, I am working on a DLT pipeline to create SCD2 tables for my bronze layer. My architecture has four layers: Raw, Bronze, Silver, and Gold. I ingest data directly into Raw, and then I build the history (SCD2) in Bronze.
My code:
import dlt
from pyspark.sql.functions import col

@dlt.view(name=source_name)
def source():
    # Read the raw table as a change data feed stream, renaming columns
    # according to alias_config.
    return (
        spark.readStream.format("delta")
        .option("readChangeData", "true")
        .option("startingVersion", 0)
        .option("mergeSchema", "true")
        .table(source_table_location)
        .select(
            *[col(source_col).alias(target_col)
              for source_col, target_col in alias_config.items()]
        )
    )

# Define the streaming target table for the SCD2 history
dlt.create_streaming_table(name=target_table_name)

dlt.apply_changes(
    target=target_table_name,
    source=source_name,
    keys=keys,
    sequence_by=col(last_updated_column),
    track_history_except_column_list=track_history_except_column_list,
    stored_as_scd_type="2",
    # Only treat rows as deletes when a delete-flag column is configured;
    # None disables delete handling (a bare False is not a valid condition).
    apply_as_deletes=(col(deleted_flag) == True) if deleted_flag else None,
)
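For reference, my understanding is that the SCD2 table produced by apply_changes gets __START_AT and __END_AT columns, so downstream (in Silver) I would read the current records roughly like this (a sketch only; the view name is just illustrative):

@dlt.view(name=f"{target_table_name}_current")
def current_records():
    # Open (current) SCD2 records have __END_AT = NULL.
    return dlt.read(target_table_name).where(col("__END_AT").isNull())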
When I run the pipeline, it fails with DELTA_SCHEMA_CHANGED_WITH_STARTING_OPTIONS and other similar errors about the schema having changed somehow. Is there a way to resolve this?
Also, even though the data arrives in the Delta table as a batch upload, it does have a last-modified date column available (which is what I use for sequence_by).
Can someone help?
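In case it helps narrow things down: the one workaround I have come across is giving the Delta streaming source a schema tracking location so it can follow schema changes across table versions instead of failing. A minimal sketch of what I would try, assuming the schemaTrackingLocation option is supported on my runtime (the option and the schema_tracking_path variable here are my assumptions, not something I have verified on this pipeline):

@dlt.view(name=source_name)
def source():
    return (
        spark.readStream.format("delta")
        .option("readChangeData", "true")
        .option("startingVersion", 0)
        # Assumed option: lets the Delta source track schema changes across
        # versions; needs a checkpoint-like path it can write to.
        .option("schemaTrackingLocation", schema_tracking_path)
        .table(source_table_location)
    )

Would that be the right approach here, or is there a better way to handle schema evolution with apply_changes?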