'ignoreDeletes' option with Delta Live Table streaming source

Zachary_Higgins
Contributor

We have a delta streaming source in our delta live table pipelines that may have data deleted from time to time.

The error message is pretty self-explanatory:

...from streaming source at version 191. This is currently not supported. If you'd like to ignore deletes, set the option 'ignoreDeletes' to 'true'.

What's not clear is how to set this option. This is what we have now, but it's not producing the desired result: new data is read and deletes are ignored.

SET pipelines.ignoreDeletes = true;
CREATE OR REFRESH STREAMING LIVE TABLE...

How should this option be set in a delta live table?

7 REPLIES

Kaniz
Community Manager

Hi @Zachary Higgins,

For example, suppose you have a table user_events with date, user_email, and action columns partitioned by date. You stream out of the user_events table, and you need to delete data from it due to GDPR.

When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already segmented by value, so the delete just drops those files from the metadata. Thus, if you just want to delete data from some partitions, you can use:

# Stream new data while skipping commits that only delete data
(spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events"))
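For context, a hedged example of the kind of partition-boundary delete that 'ignoreDeletes' lets the stream skip over (the path and date value are illustrative, following the docs' example table):

# Deleting a whole partition only drops files from the Delta log,
# so the streaming reader can skip the commit instead of failing
spark.sql("DELETE FROM delta.`/tmp/delta/user_events` WHERE date = '2017-01-01'")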

However, if you have to delete data based on user_email, you will need to use:

# Tolerate rewritten files; updated files are re-read and re-emitted in full
(spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/delta/user_events"))

If you update a user_email with the UPDATE statement, the file containing the user_email in question is rewritten. When you use ignoreChanges, the new record is propagated downstream with all other unchanged records in the same file. Your logic should be able to handle these incoming duplicate records.
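To make the duplicate behavior concrete, here is a hedged sketch of the kind of row-level change that triggers it (the email and action values are illustrative):

# Updating one row rewrites the entire data file that contained it; with
# 'ignoreChanges', every row in the rewritten file is re-emitted downstream
spark.sql("UPDATE delta.`/tmp/delta/user_events` SET action = 'redacted' WHERE user_email = 'user@example.com'")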

For more details, please go through the documentation.

Hi - Thanks for the response. Does your suggestion work with Delta live tables when you try it? This seems to produce the same error message when I use the code below:

@dlt.table(
    ...
)
def table_fnc():
    return spark.readStream.format("delta").option("ignoreDeletes", "true").table("tablename")

I'm not worried about duplicates. I just want to stream out the table's current state and append it to a sink in my DLT pipeline. As far as I know, DLT can't just append data from a source unless it's streamed in...

I haven't heard back, but the response above was copied and pasted from here: Table streaming reads and writes | Databricks on AWS

We decided to just move these tables to a true structured stream. We hope that DLT can support simple appends later on.
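For anyone landing here, a minimal sketch of what that workaround looks like as a plain structured streaming job outside DLT (table names and checkpoint path are illustrative, not from the original post):

# Outside DLT we control the reader directly, so the option is honored
(spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .table("source_table")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/source_table")
  .outputMode("append")
  .toTable("sink_table"))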

@Kaniz Fatma - Has Databricks found a way to prune unwanted records from a source without requiring the entire sink table to be recalculated with DLT?

JohnA
New Contributor III

@Kaniz Fatma Hi Kaniz, can we please circle back to this? Like @Zachary Higgins, I am unsure how to set the ignoreDeletes or ignoreChanges configuration for my Delta Live Table pipeline defined in SQL.

Thanks

7effrey
New Contributor III

Databricks, please provide an answer to this. There seems to be no documentation on how Delta Live Tables support table updates. The ignoreChanges option is bound to the spark.readStream method, which has no equivalent in dlt.read_stream.
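To illustrate the mismatch: dlt.read_stream takes only a table name, so there is no hook for reader options; they can only be attached through spark.readStream, which per the earlier replies still may not resolve the error inside a DLT pipeline. A minimal sketch (table name illustrative):

import dlt

@dlt.table()
def downstream():
    # dlt.read_stream("source_table") exposes no .option(...) hook, so the
    # only place to attach 'ignoreChanges' is the spark.readStream API:
    return spark.readStream.format("delta").option("ignoreChanges", "true").table("source_table")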

Michael42
New Contributor III

I am looking at this as well and would like to understand my options here.
