Using foreachBatch within Delta Live Tables framework
11-22-2022 02:22 PM
Hey there!
I was wondering if there's any way to declare a Delta Live Table that uses foreachBatch to process the output of a streaming query.
Here's a simplified version of my code:
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def join_data(df_1, df_2):
    # Stream-stream left join, bounded by watermarks and a 24-hour interval
    df_joined = (
        df_1.alias("df_1")
        .withWatermark("timestamp_1", "30 seconds")
        .join(
            df_2.alias("df_2")
            .withWatermark("timestamp_2", "10 seconds"),
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 >= timestamp_1 - INTERVAL 24 hours AND
                timestamp_2 <= timestamp_1
            """),
            how="left"
        )
    )
    return df_joined
def foreachbatch_func(df_micro_batch, batchId):
    # Deduplicate within the micro-batch: keep the first row per key.
    # Window functions are allowed here because the micro-batch is a static DataFrame.
    df_micro_batch = (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number().over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1)
        .drop("rn")
    )
    # Only insert records that aren't in my Delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
            df_micro_batch.alias("current_batch"),
            f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
# start() returns a StreamingQuery handle, not a DataFrame
query = (
    join_data(df_1, df_2)
    .writeStream
    .foreachBatch(foreachbatch_func)  # foreachBatch defines the sink, so no format() is needed
    .outputMode("append")
    .start()
)
Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to deduplicate within each micro-batch and to figure out which records have already been written to my Delta table, so that I don't reinsert them.
The problem with this approach is that foreachBatch is a method of the DataStreamWriter object, so I believe I can't call it without calling writeStream first; at the same time, I don't think I can call writeStream when defining a DLT table (see the sketch below). I'd really appreciate some help understanding whether there's a way around this!
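For concreteness, here's roughly how a DLT table is declared (a minimal sketch assuming the standard dlt Python module; the table names are placeholders). The table is defined by returning a DataFrame, so there is no DataStreamWriter to attach foreachBatch to:

import dlt

@dlt.table(name="joined_events")
def joined_events():
    # DLT manages the write itself; the function just returns a streaming DataFrame
    df_1 = dlt.read_stream("events_1")  # placeholder source tables
    df_2 = dlt.read_stream("events_2")
    return join_data(df_1, df_2)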
Thanks in advance 🙂
- Labels: Delta, Delta Live Tables, DLT, Foreachbatch
02-23-2023 09:03 AM
I was just going through this as well and need micro-batch operations. I can't see how this will work with DLT right now, so I've switched back to Structured Streaming. I hope they add this functionality; otherwise it limits DLT to more basic streaming use cases.
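For anyone else taking the same route, the Structured Streaming fallback looks roughly like this (a minimal sketch; the function body and checkpoint path are placeholders). The key point is that you own the writeStream call, so foreachBatch and a checkpoint location are available:

def upsert_batch(df_micro_batch, batch_id):
    # arbitrary per-batch logic (dedup, MERGE, ...) as in the original post
    ...

query = (
    join_data(df_1, df_2)
    .writeStream
    .foreachBatch(upsert_batch)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/joined_events")  # placeholder path
    .start()
)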
02-27-2025 03:51 AM
@diguid How did you implement your solution? We are looking for something similar.
02-27-2025 11:50 AM
foreachBatch support in DLT is coming soon, and you can now write to non-DLT sinks as well.
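For reference, writing to a non-DLT sink looks roughly like this (a minimal sketch; the sink name, path, and source dataset are placeholders, and the sinks API may still be in preview):

import dlt

# Declare an external Delta sink (the path is a placeholder)
dlt.create_sink(
    name="external_delta_sink",
    format="delta",
    options={"path": "/mnt/external/my_table"}
)

# An append flow streams records from a pipeline dataset into the sink
@dlt.append_flow(name="to_external_sink", target="external_delta_sink")
def to_external_sink():
    return dlt.read_stream("joined_events")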

