DLT apply_changes not accepting upsert

rimaissa
New Contributor III

I have a DLT pipeline with the layers bronze -> silver -> gold -> platinum. For platinum, I need to join the gold layer to a table that receives upserts. This table is managed externally via the Databricks API: whenever a change is made in our UI, existing rows are updated or new rows are added. I need to support that in the DLT pipeline, and I read that apply_changes provides this functionality.

I implemented the apply_changes logic and can see that upserts are supported, but when I update a row in the source table, I get this error: "Detected a data update in the source table at version 1. This is currently not supported. If you'd like to ignore updates, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory".

What am I doing wrong?

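import dlt
from pyspark.sql.functions import col
from pyspark.sql.types import (
  ArrayType, StringType, StructField, StructType, TimestampType
)

# Streaming view over the externally managed source table.
@dlt.view
def rules():
  return spark.readStream.table("table")

rule_groups_schema = StructType([
  StructField("id", StringType(), True),
  StructField("name", StringType(), True),
  StructField("rules", ArrayType(StringType()), True),
  StructField("lastModifiedDate", TimestampType(), True)
])

# Target streaming table that apply_changes writes into.
dlt.create_streaming_table(
  name="rule_groups",
  comment="Target table for rule group updates",
  schema=rule_groups_schema)

# Upsert by id, keeping the latest row by lastModifiedDate (SCD type 1).
dlt.apply_changes(
  target="rule_groups",
  source="rules",
  keys=["id"],
  sequence_by=col("lastModifiedDate"),
  stored_as_scd_type=1
)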
3 REPLIES

Mike_Szklarczyk
New Contributor III

Hi @Mike_Szklarczyk, I want the changes to flow through, though. My understanding is that if I add skipChangeCommits, the upserts to the table will not be processed.

Mike_Szklarczyk
New Contributor III

You get this error ("Detected a data update in the source table at version 1. This is currently not supported....") because DLT is based on Structured Streaming, and by default Structured Streaming does not allow any changes (updates or deletes) in the source table.
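To make the distinction concrete, here is a minimal plain-Python sketch of what an SCD type 1 upsert does with an append-only feed of change rows. It is only an illustration of the semantics, not DLT code: apply_changes_scd1 and the sample rows are hypothetical, and keeping the existing row on a sequence tie is an assumption of this sketch. The point is that the upserts happen in the target, while the source is expected to only ever gain new rows.

```python
from datetime import datetime


def apply_changes_scd1(target, change_rows, key="id", sequence_by="lastModifiedDate"):
    """Upsert change rows into target (a dict keyed by `key`), SCD type 1 style.

    A row replaces the stored one only if its sequence value is newer,
    so late-arriving stale rows are ignored.
    """
    for row in change_rows:
        current = target.get(row[key])
        if current is None or row[sequence_by] > current[sequence_by]:
            target[row[key]] = dict(row)
    return target


# Append-only feed: every change arrives as a NEW row, nothing is edited in place.
feed = [
    {"id": "a", "name": "first", "lastModifiedDate": datetime(2024, 1, 1)},
    {"id": "b", "name": "other", "lastModifiedDate": datetime(2024, 1, 1)},
    {"id": "a", "name": "renamed", "lastModifiedDate": datetime(2024, 1, 2)},
    {"id": "a", "name": "stale", "lastModifiedDate": datetime(2023, 12, 31)},
]

rule_groups = apply_changes_scd1({}, feed)
```

In this sketch the target ends up with one row per id, and id "a" holds the row with the latest lastModifiedDate. The source table in the question is different: it is updated in place, which is what the streaming read rejects.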

Your understanding is wrong. If you set the skipChangeCommits option, your streaming task won't break because of changes in the source table.

Please read the previously posted link carefully:

"By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set when reading the source streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes."
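For reference, a minimal sketch of how the flag could be set on the streaming read from the question. The helper name read_rules_ignoring_changes is hypothetical, and spark is assumed to be the session that a DLT notebook provides. Note that, as the quoted documentation says, the flag makes the stream ignore the commits that update or delete existing rows, so the stream keeps running but those changes are not propagated.

```python
def read_rules_ignoring_changes(spark, table_name="table"):
    """Stream from a Delta table, skipping commits that update or delete rows.

    `spark` is the active SparkSession (assumed to be provided by the
    pipeline); `table_name` defaults to the placeholder used in the question.
    """
    return (
        spark.readStream
        .option("skipChangeCommits", "true")  # ignore update/delete commits
        .table(table_name)
    )
```

Inside the pipeline, the body of the rules() view would then become `return read_rules_ignoring_changes(spark)`.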

 
