Hey there!
I was wondering if there's any way of declaring a Delta Live Table where we use foreachBatch to process the output of a streaming query.
Here's a simplification of my code:
import pyspark.sql.functions as f
from pyspark.sql import Window
from delta.tables import DeltaTable

def join_data(df_1, df_2):
    df_joined = (
        df_1
        .withWatermark("timestamp_1", "30 seconds")
        .alias("df_1")
        .join(
            df_2
            .withWatermark("timestamp_2", "10 seconds")
            .alias("df_2"),
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 >= timestamp_1 - INTERVAL 24 hours AND
                timestamp_2 <= timestamp_1
            """),
            how="left",
        )
    )
    return df_joined
def foreachbatch_func(df_micro_batch, batchId):
    # Deduplicate within the micro-batch, keeping one row per partition
    df_micro_batch = (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number().over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1)
        .drop("rn")
    )
    # Only inserting if not in my delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
            df_micro_batch.alias("current_batch"),
            f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
df = (
    join_data(df_1, df_2)
    .writeStream
    .format("delta")
    .foreachBatch(foreachbatch_func)
    .outputMode("append")
    .start()
)
Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to perform deduplication within my micro-batch and also to figure out which records have already been written to my Delta table, so that I don't reinsert them.
The problem with this approach is that foreachBatch is a method of the DataStreamWriter object, so I believe I can't call it without calling writeStream first; but at the same time, I don't think I can call writeStream when defining a DLT. I'd really appreciate some help understanding whether there's a way around this!
Thanks in advance 🙂