cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Using foreachBatch within Delta Live Tables framework

diguid
New Contributor III

Hey there!

I was wondering if there's any way of declaring a delta live table where we use foreachBatch to process the output of a streaming query.

Here's a simplification of my code:

def join_data(df_1, df_2):
    df_joined = (
        df_1
        .withWatermark('timestamp_1', '30 seconds')
        .join(
            df_2
            .withWatermark('timestamp_2', '10 seconds')
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 >= timestamp_1 - INTERVAL 24 hours AND 
                timestamp_2 <= timestamp_1 AND
            """),
            how="left"
        )
    )
 
    return df_joined
 
def foreachbatch_func(df_micro_batch, batchId):
    (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number()
            .over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1).drop("rn")
    )
    
    
    # Only inserting if not in my delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
          df_micro_batch.alias("current_batch"),
          f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
 
 
df = (
  join_data(df_1, df_2)
  .writeStream
  .format("delta")
  .foreachBatch(foreachbatch_func)
  .outputMode("append")
  .start()
)

Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to perform deduplication within my micro batch and also to figure out which records have already been written to my delta table, so that I don't reinsert them.

The problem with this approach is that foreachBatch is a method of the DataStreamWriter

object, so I believe I can't call it without calling writeStream first, but at the same time, I think I can't call writeStream when defining a DLT, so would really appreciate some help to understand if there's a way around here!

Thanks in advance 🙂

1 REPLY 1

JJ_LVS1
New Contributor III

I was just going through this as well and require micro-batch operations. Can't see how this will work with DLT right now so I've switched back to structured streaming. I hope they add this functionality otherwise it limits DLT to more basic streaming.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.