Overwrite and merge mode writes using Auto Loader in Databricks

Anonymous
New Contributor III

We are reading files using Auto Loader in Databricks. The source system delivers a full snapshot of the complete data set in its files, so we want to read the data and write it to a Delta table in overwrite mode, replacing all old data with the new data. For another use case, we need to merge the incoming data into an existing Delta table, updating existing records.

Since Auto Loader only supports writing in append mode, is there any option to write in overwrite or merge mode?

Below is the code we are using to write in append mode with Auto Loader:

(df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .outputMode("append")
    .option("checkpointLocation", checkpointLocation)
    .trigger(once=True)
    .start(deltaLakePath))


6 REPLIES

Anonymous
Not applicable

Hello there, @Ranjeet Jaiswal! My name is Piper, and I'm a moderator for the community. Welcome and thank you for your question. We will give your peers a chance to respond before we come back to this.

Thanks in advance for your patience. 🙂

-werners-
Esteemed Contributor III

@Ranjeet Jaiswal,

AFAIK, merge is supported:

https://docs.databricks.com/_static/notebooks/merge-in-streaming.html

That notebook also does some aggregation, but that part can of course be omitted.

The interesting pieces are outputMode("update") and the foreachBatch function, which performs the actual merge.
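
Roughly, that pattern applied to an Auto Loader stream could look like the sketch below (the key column id, the paths, and the function name are placeholders, not from this thread):

from delta.tables import DeltaTable

# Placeholder locations and key column -- adjust to your environment.
deltaLakePath = "/mnt/delta/target"
checkpointLocation = "/mnt/delta/_checkpoints/target"

def upsert_to_delta(microBatchDF, batchId):
    # Merge each micro-batch into the target Delta table on the primary key.
    target = DeltaTable.forPath(spark, deltaLakePath)  # spark = notebook SparkSession
    (target.alias("t")
        .merge(microBatchDF.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", checkpointLocation)
    .trigger(once=True)
    .start())

With foreachBatch the stream no longer writes directly to the Delta path; each micro-batch is handed to the function, which runs a normal batch MERGE against the target table.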

Anonymous
New Contributor III

This is what we are using, but I was wondering whether it is the most efficient way.

In our case, every new file contains around 30-40 million records (the source delivers a complete snapshot of the data). So even if only a few records have changed, the merge still has to process all 30-40 million records, which takes some time.

I am not sure whether there is a more efficient way to handle this complete-overwrite use case without the expensive merge.
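
One option we are considering for the full-snapshot case (just a sketch, assuming the whole snapshot lands in a single micro-batch, e.g. with trigger(once=True); paths and names are placeholders) is to skip the merge and overwrite the target from inside foreachBatch:

def overwrite_with_snapshot(microBatchDF, batchId):
    # Replace the entire target table with the incoming snapshot.
    (microBatchDF.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(deltaLakePath))

(df.writeStream
    .foreachBatch(overwrite_with_snapshot)
    .option("checkpointLocation", checkpointLocation)
    .trigger(once=True)
    .start())

If the snapshot were split across several micro-batches, each batch would wipe out the previous one, so this only works when the trigger guarantees a single batch per snapshot.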

I have the same question. The incoming/streaming data (using Auto Loader) could have anywhere from 1 to 1M records. We need to find out which of these primary keys already exist in our target Delta table (which has billions of rows), ignore those, and only insert the new ones. This merge solution sounds a bit inefficient. Or perhaps I'm missing some special sauce?

-werners-
Esteemed Contributor III

If you want to go for an insert-only approach, there is a way, but it also involves some overhead: do a left_anti join of your new data against the Delta table.

Only the records that do not yet exist remain, and those can be appended.

Of course, this also means that changes to existing records are not registered in the Delta table.

The reason merge works the way it does is that Parquet files are immutable: if you want to change one record in a Parquet file, you have to rewrite the whole file.
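
A minimal sketch of that insert-only approach inside foreachBatch (the key column id and the paths are placeholders):

def insert_new_only(microBatchDF, batchId):
    # Keep only rows whose key is not yet in the target table, then append them.
    existing_keys = spark.read.format("delta").load(deltaLakePath).select("id")
    new_rows = microBatchDF.join(existing_keys, on="id", how="left_anti")
    new_rows.write.format("delta").mode("append").save(deltaLakePath)

(df.writeStream
    .foreachBatch(insert_new_only)
    .option("checkpointLocation", checkpointLocation)
    .trigger(once=True)
    .start())

The left_anti join still has to read the existing keys on every batch, which is the overhead mentioned above, and any updates to existing records are simply dropped.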

Kaniz
Community Manager

Hi @Ranjeet Jaiswal, does @Werner Stinckens's solution work in your case?
