Streaming Delta Live Tables

kskistad
New Contributor III

I'm a little confused about how streaming works with DLT.

My first question is: what is the difference in behavior if you set the pipeline mode to "Continuous" but don't use the "streaming" prefix on table statements in your notebook, and conversely, if you set the pipeline mode to "Triggered" but use "streaming_read" and other streaming statements in your notebook?

My second question is how joins work in streaming tables in DLT. For example, if you have two streaming sources and you create a new streaming table using the .join clause, the DAG will show both tables running concurrently and converging into a single table, also streaming. That makes sense to me so far. But if Source1 drops a file with 10 rows and Source2 drops a file with related rows (common key between files) 30 seconds later, wouldn't the pipeline immediately ingest Source1, and the inner join in the next step would find no common rows and load nothing? So unless both files drop at exactly the same time, is there a race condition that will always drop rows?
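The setup I'm describing is roughly the sketch below (paths, table names, and the join key are just placeholders):

```python
import dlt

@dlt.table
def source1():
    return (
        spark.readStream.format("cloudFiles")      # Auto Loader
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/source1/")             # placeholder path
    )

@dlt.table
def source2():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/source2/")             # placeholder path
    )

@dlt.table
def joined():
    # Inner join on the common key between the two streaming sources
    return dlt.read_stream("source1").join(
        dlt.read_stream("source2"), on="common_key", how="inner"
    )
```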

Third question: how is the batch size determined for streaming sources in DLT? If a file with 100 rows gets picked up by Auto Loader, will it try to load all 100 before moving on to the next step of the pipeline, or can it be fewer than 100? Same question for very large files (millions of rows).

2 REPLIES

LandanG
Honored Contributor

Hi @Kory Skistad,

First Q: When an update is triggered for a pipeline, a streaming table or view processes only new data that has arrived since the last update. Data already processed is automatically tracked by the Delta Live Tables runtime. So you can have a streaming table in a batch (triggered) pipeline, and when the pipeline runs, only new data is processed and appended to the streaming table. On the flip side, if you have a non-streaming table but a continuous pipeline, the table will be fully reprocessed every time new data arrives (not very cost-effective).
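To make the distinction concrete, here's a minimal sketch for a DLT pipeline notebook (where `spark` is provided by the runtime); the paths and table names are made up:

```python
import dlt

# Streaming table: only files/rows that arrived since the last update are
# processed, whether the pipeline is Triggered or Continuous.
@dlt.table
def orders_raw():
    return (
        spark.readStream.format("cloudFiles")      # Auto Loader
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/orders/")              # placeholder path
    )

# Non-streaming (materialized) table: recomputed in full on every update, so
# pairing it with a continuous pipeline means constant reprocessing.
@dlt.table
def orders_by_status():
    return dlt.read("orders_raw").groupBy("status").count()
```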

Second Q: Great question! For batch, the answer is that this won't happen and the join will be fine. For streaming, you should only use stream-stream joins when you have two fact tables that arrive within a bounded time of each other, and a reliable watermark for both, since DLT streaming uses Spark Structured Streaming under the hood. The watermark is observed in a given micro-batch (the maximum eventTime encountered) and updated in the next micro-batch. Basically, stream-to-stream joins are trickier but can work - https://docs.databricks.com/structured-streaming/delta-lake.html#process-initial-snapshot-without-da...
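In plain Structured Streaming terms, a stream-stream join along those lines looks roughly like the sketch below; the table names, keys, watermark delays, and time bound are illustrative, not from this thread. The point for the "30 seconds later" scenario is that watermarks plus a time-bounded join condition keep unmatched Source1 rows in join state until matching Source2 rows arrive (or the bound plus watermark expires), so rows aren't silently dropped just because the files land at different times.

```python
from pyspark.sql.functions import expr

# Two event streams; each needs a watermark so old join state can be purged.
src1 = (
    spark.readStream.table("source1")              # placeholder table
    .withWatermark("event_time_1", "10 minutes")
)
src2 = (
    spark.readStream.table("source2")              # placeholder table
    .withWatermark("event_time_2", "10 minutes")
)

# Inner join bounded in event time: a src1 row waits in state for a matching
# src2 row that arrives within the allowed interval.
joined = src1.join(
    src2,
    expr("""
        key1 = key2 AND
        event_time_2 BETWEEN event_time_1 AND event_time_1 + INTERVAL 30 minutes
    """),
)
```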

Third Q: Unfortunately there isn't a super clear answer. The streaming data source you are reading from determines the batch size and the parallelism of ingestion. The only case where you should be setting these yourself is when processing a huge backlog, where you sometimes need to pick a much larger value than the default (e.g. maxFilesPerTrigger = 100000).
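For example, with Auto Loader you can raise the per-micro-batch limit when chewing through a large backlog; a sketch below, assuming a cloudFiles (Auto Loader) source in a DLT pipeline notebook with a placeholder path:

```python
import dlt

@dlt.table
def backlog_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Cap on files picked up per micro-batch; there is also
        # cloudFiles.maxBytesPerTrigger for a size-based cap.
        .option("cloudFiles.maxFilesPerTrigger", 100000)
        .load("/mnt/landing/backlog/")             # placeholder path
    )
```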

Kaniz
Community Manager

Hi @Kory Skistad, we haven't heard from you since the last response from @Landan George, and I was checking back to see if his suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
