cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

How to activate ignoreChanges in Delta Live Table read_stream ?

adrianlwn
New Contributor III

Hello everyone,

I'm using DLT (Delta Live Tables) and I've implemented some Change Data Capture for deduplication purposes. Now I am creating a downstream table that will read the DLT as a stream (dlt.read_stream("<tablename>")).

I keep receiving this error :

> Detected a data update (for example part-00000-6723832a-b8ca-4a20-b576-d69bd5e42652-c000.snappy.parquet) in the source table at version 11. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

And I've tried these options to activate this configuration :

@dlt.view(name="_wp_strategies_dup",
           comment="This table contains the test strategy table",
          spark_conf={"ignoreChanges": "true"})
spark.readStream.option("ignoreChanges","true").table("LIVE.wp_parameters")
dlt.option("ignoreChanges","true").read_stream("wp_parameters")

And so far nothing has worked. Is it because this configuration is not possible with DLT ? Or is it because there is another way to set this configuration up ?

14 REPLIES 14

Hi @Kaniz Fatmaโ€‹ , thanks you for your answer. Unfortunately it doesn't solve my issues.

My question was about Delta Live Tables and not classical Delta Tables. I was wondering if applying the suggested settings : ignoreChanges was even possible in DLT ...

Hubert-Dudek
Esteemed Contributor III

Hi, the team @Prabakar Ammeappinโ€‹ @Werner Stinckensโ€‹ @Jose Gonzalezโ€‹ @Lindsay Olsonโ€‹ . Recently, I had the same issue with the .option("ignoreChanges", "true") not working for DLT tables, and it was frustrating ๐Ÿ™‚ Maybe we could get some internal insides about that.

TH
New Contributor II

any update on this? will this be possible anytime soon with DLTs?

We would be also interested in this. This is critical functionality for us as we need to handle changes in the data. Otherwise, we cannot consider DLT as a viable solution although we would want to.โ€‹

I am also facing the same issue . is there any update on how to enable ignoreChanges for dlt tables please?

below is my code and it's not working

def messages_raw():

 return (

  # load incrementally

  spark.readStream

   .format("cloudFiles")

   .option("cloudFiles.format", "json")

   .schema(JSONschema)

   .option("ignoreChanges", "true")

#    .load("/mnt/raj-zuk-comparis-poc/messages*.json"))

   .load("s3://zuk-comparis-poc/"))

Louis_Perreault
New Contributor II

Hi @Kaniz Fatmaโ€‹ ,

We're facing with the same issue, but with the "ignoreDeletes" option. Is there any progress in solving the problem?

fecavalc08
New Contributor III

Have anyone found the issue? We are facing the same thing

AMadan
New Contributor II

Hi @Kaniz Fatmaโ€‹ ,

I am working on a use case where I am keeping customer data using medallion architecture using Delta live Tables.

But I would like to also delete data based on GDPR. So, I have tried deleting using simple delete script basically deleting that consumer older than 5 years from bronze, silver and gold tables.

After that, I tried to run DLT pipeline again and ran into issue like mentioned above.

" Detected a data update (for example part-00000-6723832a-b8ca-4a20-b576-d69bd5e42652-c000.snappy.parquet) in the source table at version 11. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory."

Any idea how to implement ignore changes and ignore deletes in DLT?

Hubert-Dudek
Esteemed Contributor III

Yes, that is a pain currently. I bet that for now, you need to perform a full refresh with cleaned checkpoints.

SRK
Contributor III

We have identified a work around to resolve this issue:

df_table = spark.sql(f'''SELECT * FROM Employee''')

df_table.write.mode("append").json("/mnt/temp_table/ Employee ",ignoreNullFields=False)

CREATE STREAMING LIVE TABLE Employee_temp

COMMENT "Employee temp"

AS

SELECT

 *

FROM cloud_files("/mnt/temp_table/ Employee ", "json")

-- Create and populate the target table.

CREATE OR REFRESH STREAMING LIVE TABLE dim_employee;

APPLY CHANGES INTO

 live.dim_employee

FROM

 stream(Live. Employee_temp)

KEYS

 (employeeid)

IGNORE NULL UPDATES

SEQUENCE BY

 load_datetime

STORED AS

 SCD TYPE 2;

TH
New Contributor II

Hi @Swapnil Kamleโ€‹ ,

we also implemented change data capture for deduplication purposes in DLTs. We do it in SQL using the APPLY CHANGES INTO command. How does your workaround solve the issue of updates in such a case? Would you mind explaining?

Thanks

SRK
Contributor III

Hi TH,

If you look at the code which I have shared, there I am using append to write the data in Json first then I read the Json file using autoloader.

df_table.write.mode("append").json("/mnt/temp_table/ Employee ",ignoreNullFields=False)

So, it's only appending the data not updating, which helps me to fix the issue related to updates.

Thanks

TH
New Contributor II

Thanks for your answer. But then you are not doing Change Data Capture (for deduplication purposes) as initially asked. I am looking for a solution that still lets me do deduplication...

gopรญnath
New Contributor II

In DLT read_stream, we can't use ignoreChanges / ignoreDeletes. These are the configs helps to avoid the failures but it is actually ignoring the operations done on the upstream. So you need to manually perform the deletes or updates in the downstream. (Spark structured streaming supports ever growing / append only sources).

If you have use cases where the upstream can have updates / deletes and you want to pass these operations automatically to downstream you can follow the below suggested architectures in DLT. In both setup using live tables helps to handle updates / deletes from upstream.

Architecture 1:

You can use live tables to handle this. For use cases where you perform updates/deletes on the bronze table to reflect these deletes/updates in the silver table, you can create silver table as live table

Refer below diagram:

image 

Architecture 2:

Other way to handle updates / deletes and pass through downstream is you can use DLT CDC. The CDC architecture looks something like below.

DLT bronze table --> DLT silver using CDC apply_changes --> DLT gold live table

Here silver table picks change data from bronze(updates or delete) and do necessary operations.

In both setups, if you delete/update any record in bronze table for use cases like GDPR, this delete/update will automatically flow to silver table(you no need to manually delete/update from silver and then gold). Now gold will pick this silver table and perform full refresh. (live table).

Also DLT has a special feature called enลพyme. Enลพyme helps to avoid full re-computation for the LIVE table and improve the performance.

What is enลพyme?

Compared to the existing method of fully recomputing all rows in the live table โ€“ even rows which do not need to be changed โ€“ enลพyme may significantly reduce resource utilization and improve overall pipeline latency by only updating the rows in the live table which are necessary to materialize the result.

For more details on enลพyme you can refer this blog: https://www.databricks.com/blog/2022/06/29/delta-live-tables-announces-new-capabilities-and-performa...

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group