10-13-2022 06:29 AM
I'm trying to load data with DLT using SCD Type 1 and am running into the error message: "Detected a data update in the source table at version x. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'."
I have one DLT table named charter and another named branch. The branch table has a relationship to charter, so I join to it in the pipeline to look up the charter_key. The initial load of the data runs just fine, but when I run the incremental portion the next day, that is when I get the error message. This data is pretty static, so no data actually changed between the initial load and the incremental load. However, in the backing table that DLT creates for charter, __apply_changes_storage_charter, the __UpsertVersion column has the most recent file name and an updated __Timestamp. Is that where it is saying that it has detected a change?
My code from the DLT pipeline is below. The _bronze tables are just taking the raw parquet file and putting it into a temporary streaming live table, so I am not including that code.
CREATE OR REFRESH STREAMING LIVE TABLE charter
(charter_key bigint GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1)
,charter_number int
,charter_name string
,charter_address1 string
,charter_address2 string
,charter_zip_code string);
APPLY CHANGES INTO LIVE.charter
FROM STREAM(LIVE.charter_bronze)
KEYS (charter_number)
SEQUENCE BY file_name
COLUMNS * EXCEPT (file_name);
CREATE OR REFRESH TEMPORARY STREAMING LIVE TABLE branch_stage AS
SELECT c.charter_key
,b.branch_number
,b.branch_name
,b.branch_address1
,b.branch_address2
,b.branch_zip_code
,b.file_name
FROM STREAM(LIVE.charter) c
INNER JOIN STREAM(LIVE.branch_bronze) b
ON c.charter_number = b.charter_number;
CREATE OR REFRESH STREAMING LIVE TABLE branch
(branch_key bigint GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1)
,charter_key bigint
,branch_number int
,branch_name string
,branch_address1 string
,branch_address2 string
,branch_zip_code string);
APPLY CHANGES INTO LIVE.branch
FROM STREAM(LIVE.branch_stage)
KEYS (charter_key, branch_number)
SEQUENCE BY file_name
COLUMNS * EXCEPT (file_name);
I've read that streaming reads treat the source as append-only, but then how exactly does SCD work? In the pipeline I have both of these set to true:
pipelines.applyChangesPreviewEnabled
spark.databricks.delta.schema.autoMerge.enabled
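For reference, pipeline-level Spark settings like the two flags above go in the "configuration" map of the pipeline's settings JSON (a minimal sketch; all other pipeline settings are omitted here):

```json
{
  "configuration": {
    "pipelines.applyChangesPreviewEnabled": "true",
    "spark.databricks.delta.schema.autoMerge.enabled": "true"
  }
}
```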
Any help is appreciated.
10-18-2022 05:13 AM
The problem is that you join with STREAM(LIVE.charter), a table produced by APPLY CHANGES, and that table can theoretically have updated records, which a streaming read does not support.
10-18-2022 07:36 AM
@Hubert Dudek That was my thinking as well as I started to dig into it more. Do you know what the recommended solution would be when trying to use APPLY CHANGES tables in downstream tables?
10-18-2022 10:52 AM
When the table is published to the metastore (the target option in the pipeline): for Python I just used spark.read.table() with a join of the DLT table, so I bet in SQL it would be something like:
FROM STREAM(LIVE.branch_bronze) b
INNER JOIN TARGET_DB.charter c
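Adapting the branch_stage definition from the original post to that suggestion might look like the sketch below (target_db is a placeholder for the database set in the pipeline's target option, and this assumes target_db.charter is already published to the metastore):

```sql
-- Sketch: read charter as a static (batch) table instead of a stream,
-- so updates written by APPLY CHANGES no longer break the streaming join.
CREATE OR REFRESH TEMPORARY STREAMING LIVE TABLE branch_stage AS
SELECT c.charter_key
      ,b.branch_number
      ,b.branch_name
      ,b.branch_address1
      ,b.branch_address2
      ,b.branch_zip_code
      ,b.file_name
FROM STREAM(LIVE.branch_bronze) b
INNER JOIN target_db.charter c   -- static read: no STREAM() wrapper
ON c.charter_number = b.charter_number;
```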
10-27-2022 06:31 AM
Thanks @Hubert Dudek. I was at this same spot before, but I can't put that reference in the same notebook or even the same pipeline. If I put it in the same notebook then on the initial load it fails since the target_db.charter table doesn't exist. If I put them in separate notebooks but the same pipeline then it doesn't discover the dependency. So I would have to have multiple pipelines, which I was hoping to avoid.
11-20-2022 11:46 PM
Hi @Andy Bolk
Hope all is well!
Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!