Help me understand streaming logic with Delta Tables

pgruetter
Contributor

Hello all

I have a Delta table in the bronze layer, let's call it BRZ. It contains 25B rows and many duplicates. It currently has versions 0 and 1, nothing else yet.

I then create a silver table SLV by running one deduplication batch job. This creates version 0 of SLV.

Now I want to switch to Spark Structured Streaming, but with trigger(once=True), as we only want to run it once a day at the moment. Streaming is a nice way to handle incremental loads automatically.

So my streaming job uses 
.option("startingVersion", 2)
and also writes to SLV with a merge statement. Version 2, because versions 0 and 1 were loaded with the batch job.
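
For reference, roughly what the job looks like (simplified sketch; the merge key "id" and the checkpoint path are just placeholders, not our real ones):

from delta.tables import DeltaTable

def upsert_to_slv(batch_df, batch_id):
    # Merge each micro-batch into SLV; "id" stands in for the real business key
    slv = DeltaTable.forName(spark, "slv")
    (slv.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("delta")
    .option("startingVersion", 2)        # skip versions 0 and 1, already loaded by the batch job
    .table("brz")
    .writeStream
    .foreachBatch(upsert_to_slv)
    .option("checkpointLocation", "/checkpoints/slv")   # placeholder path
    .trigger(once=True)
    .start())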

After updates on BRZ, I have table versions 2, 3 and 4. When running the streaming job for SLV, I expect it to be pretty fast, as it only needs to load versions 2-4, right?
Somehow, the job takes 10+ hours (on a 12-node cluster). Looking at the metrics, numTargetRowsCopied is 25B, so it copies all rows again. It also wrote 6,000+ Parquet files.

So my questions:
- Is it even possible to do an initial load and then switch to streaming?
- Does startingVersion do what I expect it to do (read everything starting from a given Delta table version)?
- Why does the streaming job take so long?

Thanks!

 

3 REPLIES

Kaniz
Community Manager

Hi @pgruetter, you can use Delta Lake with Spark Structured Streaming to handle incremental loads automatically. When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.

The startingVersion option in Spark Structured Streaming reads all table changes starting from the specified Delta table version (inclusive).

In your case, you set startingVersion to 2, which means that the streaming job should only read versions 2-4 of the source table BRZ.

However, the streaming job takes 10+ hours and copies all 25B rows again. This could be because the startingVersion option is not working as expected. You can try setting the starting version to 3 and see if the streaming job then reads only version 4 of the source table BRZ.
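
One way to sanity-check this (purely illustrative, using the table names from above) is to look at the source table's history and at the streaming query's last reported progress:

# Show which versions exist on BRZ and which operations created them
spark.sql("DESCRIBE HISTORY brz") \
     .select("version", "timestamp", "operation", "operationMetrics") \
     .show(truncate=False)

# After a run, the StreamingQuery handle returned by start() exposes lastProgress,
# whose numInputRows field shows how many rows the stream actually read,
# e.g. print(query.lastProgress["numInputRows"])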

It’s also possible that the streaming job is writing too many small files, which can cause performance issues. Note that the maxBytesPerTrigger option limits how much data each micro-batch reads from the source rather than the size of the files it writes; to reduce the number of small output files, you would typically compact the target table afterwards (for example with OPTIMIZE) or enable optimized writes on it.

In general, it is possible to do an initial load and then switch to streaming.
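
For illustration (the values and table names are only examples):

# Rate-limit how much the stream reads per micro-batch. This only takes effect
# when the trigger processes the backlog in multiple batches (e.g.
# Trigger.AvailableNow); with trigger once the whole backlog is a single batch.
df = (spark.readStream
      .format("delta")
      .option("startingVersion", 2)
      .option("maxBytesPerTrigger", "20g")   # illustrative value
      .table("brz"))

# Compact small files on the target table after the run
spark.sql("OPTIMIZE slv")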

Source - https://docs.delta.io/latest/delta-streaming.html

pgruetter
Contributor

Thanks for the confirmation. I'm not sure I see everything, as your text gets truncated, but it basically confirms that it should work.

Anyway: it looks like the incremental load is working. The problem here is that we receive late-arriving facts that touch all previous months. So the merge statement reads in all existing Parquet files and, as a result, rewrites almost all of them.

We need to see if we can limit the merge to the last few months, something like the sketch below. We don't see any other solution in this case.
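
Roughly what I have in mind (just a sketch; the key "id", the event_date/event_month columns and the 3-month window are placeholders, and it assumes SLV is partitioned by month):

from delta.tables import DeltaTable
from pyspark.sql import functions as F

def upsert_recent_months(batch_df, batch_id):
    # Only merge facts from the last 3 months, and repeat the same window
    # in the merge condition so Delta can skip untouched target files.
    cutoff = F.add_months(F.current_date(), -3)
    recent = batch_df.where(F.col("event_date") >= cutoff)

    slv = DeltaTable.forName(spark, "slv")
    (slv.alias("t")
        .merge(
            recent.alias("s"),
            "t.id = s.id AND t.event_month >= add_months(current_date(), -3)")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())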

Kaniz
Community Manager

Hi @pgruetter, you should be able to read the full text now.
