02-25-2025 03:09 AM
We have streaming inputs coming from streaming tables as well as a table produced by apply_changes.
Our target is a single table that must be merged with all of the sources, and each source populates a different set of columns in that table.
Challenge: Every source arrives on a different timeframe, sometimes with a lag of one to two days. Using the primary key, we need to update only the data for the specific new events in the target table. So we have a single table with multiple columns that need to be updated by multiple sources.
Difficulties faced: A full outer join does not work, because one of the sources may have no data at the time of the join and we get no output from the DLT full outer join since the watermark does not behave as expected.
With SCD mode in apply_changes we could only retain the latest record; we are not sure how to enable column-level updates.
Can anyone help with this streaming table design?
Accepted Solutions
02-25-2025 06:21 AM
Hello @JothyGanesan,
To handle the scenario where multiple sources update different columns of a single target table at different times, you can use the following approach to designing the streaming table:
- Streaming Sources and Staging Tables: For each source, create separate streaming pipelines that write to individual staging tables. This ensures that each source's data is processed independently.
- Delta Table and Merge Operation: Utilize the Delta table's MERGE operation to update the target table. The MERGE operation allows you to specify the conditions under which updates and insertions are made to the target table.
- Use foreachBatch in Streaming Queries: Implement the streaming ingestion using Spark Structured Streaming with foreachBatch. This allows you to apply custom logic within each micro-batch, including the MERGE operation.
- Watermarking for Latency Tolerance: Set appropriate watermarks on your streaming queries to handle the processing of late data and ensure data consistency.
Here’s an example code snippet:
import io.delta.tables._
import org.apache.spark.sql.DataFrame

val targetTable = DeltaTable.forPath(spark, "path/to/target/table")

// Upsert each micro-batch into the target table, matching on the primary key.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.primaryKey = t.primaryKey"
    )
    .whenMatched().updateAll()      // update existing rows
    .whenNotMatched().insertAll()   // insert new rows
    .execute()
}

// Each source stream applies the same merge logic per micro-batch.
source1StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

source2StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
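Since each source owns only some of the target columns, you may not want updateAll (a later-arriving source would overwrite columns written by another). A minimal sketch of a per-source, column-scoped merge is below; the column names colA and colB, the eventTime column, and the two-day watermark/dedup window are assumptions to adapt to your schema.

import io.delta.tables._
import org.apache.spark.sql.DataFrame

// Sketch: merge that touches only the columns owned by source 1.
// colA, colB and eventTime are placeholder names for illustration.
def upsertSource1(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, "path/to/target/table").as("t")
    .merge(microBatchOutputDF.as("s"), "s.primaryKey = t.primaryKey")
    .whenMatched()
    .updateExpr(Map(                 // update only source 1's columns
      "colA" -> "s.colA",
      "colB" -> "s.colB"))
    .whenNotMatched()
    .insertExpr(Map(                 // insert the key plus source 1's columns
      "primaryKey" -> "s.primaryKey",
      "colA" -> "s.colA",
      "colB" -> "s.colB"))
    .execute()
}

// A watermark plus dropDuplicates bounds state when a source can replay
// events up to two days late (the two-day window is an assumption).
source1StreamingDF
  .withWatermark("eventTime", "2 days")
  .dropDuplicates("primaryKey", "eventTime")
  .writeStream
  .foreachBatch(upsertSource1 _)
  .outputMode("update")
  .start()

Each source would get its own upsert function listing only the columns it provides, so sources arriving days apart never clobber each other's columns on the shared primary key.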
- Configuration: Ensure the streaming queries are configured to process data at the desired intervals, for instance by using maxFilesPerTrigger or maxBytesPerTrigger to manage batch sizes (see the sketch after this list).
- Handling Schema Changes: Ensure the schema of the target Delta table is managed properly and that possible schema evolution is accounted for.
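A minimal sketch of those last two points, assuming a Delta staging table as the source; the table name and the 1000-file cap are placeholders:

// Rate-limit each micro-batch when reading the staging table.
val source1StreamingDF = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "1000")    // cap files per micro-batch
  // .option("maxBytesPerTrigger", "1g")   // or cap by bytes instead
  .table("catalog.schema.source1_staging")

// Allow MERGE to add new columns to the target if a source's schema evolves.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")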
a month ago
Thank you for the explanation, @Alberto_Umana.
We will try the same. Does this also work on Databricks with Unity Catalog?
3 weeks ago
Thank you, this worked.

