I have a DLT pipeline with bronze -> silver -> gold -> platinum layers. For the platinum layer I need to include a table that is joined to the gold layer and supports upserts within the DLT pipeline. This table is managed externally via the Databricks API: anytime a change is made in our UI, existing rows are updated or new rows are added. I need the DLT pipeline to pick up those changes, and I read that apply_changes provides this functionality.
I implemented the apply_changes logic and can see that upserts are supported, but when I make an update to the table, I get an error saying: "Detected a data update in the source table at version 1. This is currently not supported. If you'd like to ignore updates, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory".
What am I doing wrong?
import dlt
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType

# Source view streaming from the externally managed table
@dlt.view
def rules():
    return spark.readStream.table("table")

rule_groups_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("rules", ArrayType(StringType()), True),
    StructField("lastModifiedDate", TimestampType(), True)
])

# Target streaming table that receives the upserts
dlt.create_streaming_table(
    name="rule_groups",
    comment="Target table for rule group updates",
    schema=rule_groups_schema
)

# Upsert keyed on id, keeping only the latest row per key (SCD type 1)
dlt.apply_changes(
    target="rule_groups",
    source="rules",
    keys=["id"],
    sequence_by=col("lastModifiedDate"),
    stored_as_scd_type=1
)
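
For reference, the error message suggests the 'skipChangeCommits' option. My understanding (untested sketch, assuming the option belongs on the streaming read that feeds the source view) is that it would look like the snippet below, but as I read it, that would simply skip the commits containing updates rather than apply them to the target, which isn't what I want:

# Sketch only: this follows the error message's suggestion, not a confirmed fix
@dlt.view
def rules():
    # 'skipChangeCommits' makes the stream ignore commits that update or delete
    # existing rows, so only newly appended rows would reach apply_changes
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("table")
    )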