Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming Delta Live Tables

kskistad
New Contributor III

I'm a little confused about how streaming works with DLT.

My first question is: what is the difference in behavior if you set the pipeline mode to "Continuous" but don't use the "streaming" prefix on table statements in your notebook, and similarly, if you set the pipeline mode to "Triggered" but use "streaming_read" and other streaming statements in your notebook?

My second question is how joins work in streaming tables in DLT. For example, if you have two streaming sources and you create a new streaming table using a .join clause, the DAG will show both tables running concurrently and converging into a single table, also streaming. That makes sense to me so far. But if Source1 drops a file with 10 rows and Source2 drops a file with related rows (a common key between the files) 30 seconds later, wouldn't the pipeline immediately try to ingest Source1, and then the inner join in the next step would find no common rows and load nothing? So unless both files drop at exactly the same time, you have a race condition that will always drop rows?

Third question: how is the batch size determined for streaming sources in DLT? If a file with 100 rows gets picked up by Auto Loader, will it try to load all 100 rows before going on to the next step of the pipeline, or can it be fewer than 100? Same question for very large files (millions of rows)?

1 ACCEPTED SOLUTION


LandanG
Honored Contributor

Hi @Kory Skistad,

First Q:  When an update is triggered for a pipeline, a streaming table or view processes only new data that has arrived since the last update. Data already processed is automatically tracked by the Delta Live Tables runtime. So you can have a streaming table with a batch pipeline, and when the batch pipeline is run only new data is processed and appended to the streaming table. On the flip side, if you have a non-streaming table but a continuous pipeline then the table will be reprocessed every time new data is added (not very cost-effective).
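To make the distinction concrete, here is a minimal sketch of the two table types in a DLT Python notebook. The source path, table names, and the Auto Loader source are all illustrative assumptions, not anything from this thread.

```python
import dlt
from pyspark.sql import functions as F

# Streaming table: only files that arrived since the last update are processed,
# whether the pipeline runs in Triggered or Continuous mode.
@dlt.table(name="orders_bronze")
def orders_bronze():
    return (
        spark.readStream.format("cloudFiles")     # Auto Loader
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/orders")              # hypothetical landing path
    )

# Non-streaming (materialized) table: recomputed from its full input on every
# update, which is why pairing it with a continuous pipeline gets expensive.
@dlt.table(name="orders_daily")
def orders_daily():
    return (
        dlt.read("orders_bronze")
        .groupBy(F.to_date("order_ts").alias("order_date"))
        .count()
    )
```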

Second Q: Great question! For batch, the answer is that this won't happen and the join will be fine. For streaming, you should only use stream-stream joins when you have two fact tables that arrive within a bounded time of each other and a reliable watermark for both, since DLT streaming uses Spark Structured Streaming under the hood. The watermark is observed in a given micro-batch (the maximum eventTime encountered) and updated in the next micro-batch. Basically, stream-to-stream joins are much trickier but can work - https://docs.databricks.com/structured-streaming/delta-lake.html#process-initial-snapshot-without-da...
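As a rough sketch of what a watermarked stream-stream join can look like inside a DLT table (the impressions/clicks sources, the event_ts/click_ts columns, and the 10-minute/15-minute bounds are made-up assumptions for illustration):

```python
import dlt
from pyspark.sql import functions as F

@dlt.table(name="matched_events")
def matched_events():
    impressions = (
        dlt.read_stream("impressions")
        .withWatermark("event_ts", "10 minutes")   # tolerate up to 10 minutes of late data
        .alias("i")
    )
    clicks = (
        dlt.read_stream("clicks")
        .withWatermark("click_ts", "10 minutes")
        .alias("c")
    )
    # The time-range condition bounds how long each side's state is kept,
    # so a matching row that lands 30 seconds later can still join.
    return impressions.join(
        clicks,
        F.expr("""
            i.ad_id = c.ad_id AND
            c.click_ts BETWEEN i.event_ts AND i.event_ts + interval 15 minutes
        """),
        "inner",
    )
```

With bounds like these in place, unmatched rows are buffered in state until the watermark passes, rather than being dropped the moment the other side hasn't arrived yet.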

Third Q: Unfortunately not a super clear answer. The streaming data source you are reading from determines the batch size and the parallelism of ingestion. The only case where you should set these yourself is when processing a huge backlog, where you sometimes need to pick a much larger value than the default (e.g., maxFilesPerTrigger = 100000).
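For Auto Loader sources specifically, the per-micro-batch intake can be capped with cloudFiles.maxFilesPerTrigger and cloudFiles.maxBytesPerTrigger. A hedged sketch follows; the path and the specific limits are illustrative values, not recommendations.

```python
import dlt

@dlt.table(name="events_bronze")
def events_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.maxFilesPerTrigger", 1000)   # cap on files pulled into one micro-batch
        .option("cloudFiles.maxBytesPerTrigger", "10g")  # soft cap on bytes per micro-batch
        .load("/mnt/landing/events")                     # hypothetical landing path
    )
```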


3 REPLIES


Hi @Kory Skistad, we haven't heard from you since the last response from @Landan George, and I was checking back to see if his suggestions helped you.

If you have found another solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.

Harsh141220
New Contributor II

Is it possible to have custom upserts in streaming tables in a Delta Live Tables pipeline?
Use case: I am trying to maintain a valid session based on a timestamp column and want to upsert into the target table.
I tried going through the documentation, but dlt.apply_changes() doesn't seem to work with custom upserts.
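For reference, a standard apply_changes flow looks roughly like the sketch below; the table and column names (sessions_bronze, sessions_silver, session_id, event_ts) are made up, and on older runtimes the create_streaming_table call is named create_streaming_live_table. It handles key-based upserts ordered by a sequencing column, but it does not take arbitrary custom merge logic.

```python
import dlt
from pyspark.sql import functions as F

# Target streaming table that apply_changes will upsert into.
dlt.create_streaming_table("sessions_silver")

dlt.apply_changes(
    target="sessions_silver",
    source="sessions_bronze",        # a streaming source defined elsewhere in the pipeline
    keys=["session_id"],             # rows sharing this key are upserted
    sequence_by=F.col("event_ts"),   # the row with the latest event_ts wins
    stored_as_scd_type=1,            # overwrite in place (SCD type 1)
)
```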

 

Thanks in advance.
