Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to activate ignoreChanges in a Delta Live Tables read_stream?

adrianlwn
New Contributor III

Hello everyone,

I'm using DLT (Delta Live Tables) and I've implemented some Change Data Capture for deduplication purposes. Now I am creating a downstream table that will read the DLT as a stream (dlt.read_stream("<tablename>")).

I keep receiving this error:

> Detected a data update (for example part-00000-6723832a-b8ca-4a20-b576-d69bd5e42652-c000.snappy.parquet) in the source table at version 11. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

I've tried each of the following to activate this configuration:

# Attempt 1: pass it as spark_conf on the view
@dlt.view(name="_wp_strategies_dup",
          comment="This table contains the test strategy table",
          spark_conf={"ignoreChanges": "true"})

# Attempt 2: set it as a reader option on spark.readStream
spark.readStream.option("ignoreChanges", "true").table("LIVE.wp_parameters")

# Attempt 3: set it as an option on dlt before read_stream
dlt.option("ignoreChanges", "true").read_stream("wp_parameters")

So far nothing has worked. Is it because this configuration is not possible with DLT, or is there another way to set it up?
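
For context, the error above comes from how a Delta table behaves as a streaming source: the stream expects append-only commits, and a commit that rewrites existing files (as UPDATE, MERGE, or DELETE do) stops it. The toy model below is only an illustration of that behavior in plain Python, not Delta's implementation; `Commit`, `stream_files`, and the file names are made up for the sketch. It also shows the documented caveat of `ignoreChanges`: rewritten files are emitted again in full, so downstream consumers may see duplicate rows.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    version: int
    added: list = field(default_factory=list)    # data files written by this commit
    removed: list = field(default_factory=list)  # data files rewritten or deleted


class SourceChangedError(Exception):
    """Raised when the toy stream meets a commit that is not append-only."""


def stream_files(commits, ignore_changes=False):
    """Yield the files a toy append-only stream would emit, commit by commit."""
    for commit in commits:
        if commit.removed and not ignore_changes:
            raise SourceChangedError(
                f"Detected a data update in the source table at version {commit.version}"
            )
        # With ignore_changes, a rewritten file is emitted again in full,
        # so rows that did not change can reach downstream a second time.
        yield from commit.added


history = [
    Commit(version=10, added=["part-a.parquet"]),
    # An UPDATE or MERGE rewrites part-a into part-b: one remove plus one add.
    Commit(version=11, added=["part-b.parquet"], removed=["part-a.parquet"]),
]
```

With `ignore_changes=False`, iterating over `stream_files(history)` raises at version 11, mirroring the error message; with `ignore_changes=True`, both files are emitted.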


Kaniz_Fatma
Community Manager

Hi @Adrian Löwenstein, this article explains in detail a couple of ideas on how to resolve this issue. Please let us know if this helps.

Also, this article describes how to update tables in your Delta Live Tables pipeline based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Use Delta Lake change data feed on Databricks.

You can use change data capture (CDC) in Delta Live Tables to update tables based on changes in source data. CDC is supported in the Delta Live Tables SQL and Python interfaces. Delta Live Tables supports updating tables with slowly changing dimensions (SCD) type 1 and type 2:

  • Use SCD Type 1 to update records directly. History is not retained for records that are updated.
  • Use SCD Type 2 to retain the history of all updates to records.

Hi @Kaniz Fatma, thank you for your answer. Unfortunately it doesn't solve my issue.

My question was about Delta Live Tables, not classic Delta tables. I was wondering whether applying the suggested setting, ignoreChanges, is even possible in DLT.

Kaniz_Fatma
Community Manager

Hi @Adrian Löwenstein, we haven't heard from you since my last response, and I was checking back to see if you have a resolution yet.

If you have any solution, please share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.

Also, please don't forget to click on the "Select As Best" button whenever the information provided helps resolve your question.

Hubert-Dudek
Esteemed Contributor III

Hi team, @Prabakar Ammeappin @Werner Stinckens @Jose Gonzalez @Lindsay Olson. Recently, I had the same issue with .option("ignoreChanges", "true") not working for DLT tables, and it was frustrating 🙂 Maybe we could get some internal insights about that.

Hi @Hubert Dudek, let us look into the issue and get back to you.

TH
New Contributor II

Any update on this? Will this be possible anytime soon with DLT?

We would also be interested in this. This is critical functionality for us, as we need to handle changes in the data. Otherwise, we cannot consider DLT a viable solution, although we would want to.

I am also facing the same issue. Is there any update on how to enable ignoreChanges for DLT tables, please?

Below is my code, and it's not working:

def messages_raw():
    return (
        # load incrementally
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .schema(JSONschema)
        .option("ignoreChanges", "true")
        # .load("/mnt/raj-zuk-comparis-poc/messages*.json")
        .load("s3://zuk-comparis-poc/")
    )

Louis_Perreault
New Contributor II

Hi @Kaniz Fatma,

We're facing the same issue, but with the "ignoreDeletes" option. Is there any progress in solving the problem?

fecavalc08
New Contributor III

Has anyone found a solution to this issue? We are facing the same thing.

AMadan
New Contributor II

Hi @Kaniz Fatma,

I am working on a use case where I keep customer data in a medallion architecture built with Delta Live Tables.

I would also like to delete data to comply with GDPR. So I tried a simple delete script that removes consumers older than 5 years from the bronze, silver, and gold tables.

After that, I ran the DLT pipeline again and hit the same error mentioned above:

"Detected a data update (for example part-00000-6723832a-b8ca-4a20-b576-d69bd5e42652-c000.snappy.parquet) in the source table at version 11. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory."

Any idea how to implement ignoreChanges and ignoreDeletes in DLT?

Hubert-Dudek
Esteemed Contributor III

Yes, that is a pain currently. I bet that for now, you need to perform a full refresh with cleaned checkpoints.
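
A possible alternative to a full refresh, for readers on newer runtimes: Databricks documentation describes a `skipChangeCommits` reader option that skips commits which update or delete rows, and states that it has to be set through `spark.readStream.option(...)` rather than through `dlt.read_stream()`. A minimal sketch, assuming your runtime supports the option; the helper name is hypothetical and not part of any Databricks API.

```python
def read_skipping_change_commits(spark, source_table):
    """Build a streaming read that skips commits which update or delete rows.

    `spark` is the active SparkSession. `source_table` is whatever table name
    the pipeline reads from. The helper itself is illustrative only.
    """
    return (
        spark.readStream
        # Assumption: the runtime in use recognizes this Delta source option.
        .option("skipChangeCommits", "true")
        .table(source_table)
    )
```

Inside a DLT table or view function this would be used as `return read_skipping_change_commits(spark, "LIVE.wp_parameters")`. Note that skipped commits are not propagated, so updates and deletes in the source never reach the downstream table.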

SRK
Contributor III

We have identified a workaround to resolve this issue: write the source table out as JSON files, then re-ingest those files with Auto Loader and apply the changes from there.

# Python: dump the current table contents to a staging folder as JSON
df_table = spark.sql("SELECT * FROM Employee")
df_table.write.mode("append").json("/mnt/temp_table/Employee", ignoreNullFields=False)

-- SQL (DLT pipeline): stream the staged JSON files with Auto Loader
CREATE STREAMING LIVE TABLE Employee_temp
COMMENT "Employee temp"
AS
SELECT *
FROM cloud_files("/mnt/temp_table/Employee", "json");

-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE dim_employee;

APPLY CHANGES INTO
  live.dim_employee
FROM
  stream(LIVE.Employee_temp)
KEYS
  (employeeid)
IGNORE NULL UPDATES
SEQUENCE BY
  load_datetime
STORED AS
  SCD TYPE 2;

Kaniz_Fatma
Community Manager

Hi @Adrian Löwenstein, it would mean a lot if you could select the "Best Answer" to help others find the correct answer faster.

This makes that answer appear right after the question, so it's easier to find within a thread.

It also helps us mark the question as answered so we can have more eyes helping others with unanswered questions.
