02-25-2025 03:09 AM
We have streaming inputs coming from streaming tables as well as a table produced by apply_changes.
Our target is a single table that needs to be merged with all the sources; each source provides a different set of columns in the target table.
Challenge: every source arrives on a different timeframe, sometimes with a lag of one to two days. Using the primary key, we need to update only the data for the specific new events in the target table. So a single table with multiple columns needs to be updated by multiple sources.
Difficulties faced: a full outer join does not work, because one of the sources may have no data at the time of the join and we get no output from the DLT full outer join since the watermark is not working.
With SCD mode in apply_changes we could only retain the latest record, but we are not sure how to enable column-level updates.
Can anyone help with this streaming table design?
02-25-2025 06:21 AM
Hello @JothyGanesan,
To handle the scenario where multiple sources update different columns of a single target table at different times, you can use the following approach to design the streaming table:
Here’s an example code snippet:
import io.delta.tables._
import org.apache.spark.sql.DataFrame

// Target Delta table that every source merges into
val targetTable = DeltaTable.forPath(spark, "path/to/target/table")

// Upsert each micro-batch into the target, matching on the primary key
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.primaryKey = t.primaryKey"
    )
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Each source stream runs the same foreachBatch upsert independently
source1StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

source2StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
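Since each source only supplies a subset of the target's columns, updateAll would overwrite the columns that source does not carry. A minimal per-source sketch, assuming hypothetical columns colA and colB owned by source 1, that updates only the columns this source provides (reusing targetTable from above):

// Sketch with assumed column names: update only the columns this source owns,
// so columns maintained by the other sources are left untouched.
def upsertSource1(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.primaryKey = t.primaryKey"
    )
    .whenMatched().updateExpr(Map(
      "colA" -> "s.colA",   // only source 1's columns
      "colB" -> "s.colB"
    ))
    .whenNotMatched().insertExpr(Map(
      "primaryKey" -> "s.primaryKey",
      "colA" -> "s.colA",
      "colB" -> "s.colB"
    ))
    .execute()
}

Each source stream would then pass its own upsert function to foreachBatch, so late-arriving sources only touch their own columns for the matching primary keys.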
02-26-2025 02:35 AM
Thank you for the explanation, @Alberto_Umana.
We will try the same. But does it work on Databricks with Unity Catalog as well?
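The same merge pattern applies to a Unity Catalog managed table; the usual change is referencing the target by its three-level name rather than a path. A minimal sketch, assuming a hypothetical catalog and schema main.default:

// Sketch with assumed names: reference a Unity Catalog managed table by name
// instead of by storage path.
val targetTable = DeltaTable.forName(spark, "main.default.target_table")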
03-05-2025 12:10 AM
Thank you, this worked.