
Using foreachBatch within Delta Live Tables framework

diguid
New Contributor III

Hey there!

I was wondering if there's any way to declare a Delta Live Table that uses foreachBatch to process the output of a streaming query.

Here's a simplified version of my code:

from pyspark.sql import functions as f
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def join_data(df_1, df_2):
    # Alias both sides so the join condition can reference them by name
    df_joined = (
        df_1
        .withWatermark('timestamp_1', '30 seconds')
        .alias("df_1")
        .join(
            df_2
            .withWatermark('timestamp_2', '10 seconds')
            .alias("df_2"),
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 >= timestamp_1 - INTERVAL 24 hours AND
                timestamp_2 <= timestamp_1
            """),
            how="left"
        )
    )

    return df_joined
 
def foreachbatch_func(df_micro_batch, batchId):
    # Deduplicate within the micro-batch: keep only the first row per key
    df_micro_batch = (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number()
            .over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1)
        .drop("rn")
    )

    # Only inserting if not in my delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
            df_micro_batch.alias("current_batch"),
            f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
 
 
query = (
    join_data(df_1, df_2)
    .writeStream
    .foreachBatch(foreachbatch_func)  # foreachBatch supplies the sink, so no .format("delta") here
    .outputMode("append")
    .start()
)

Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to deduplicate within each micro-batch and also to figure out which records have already been written to my Delta table, so that I don't reinsert them.
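
(Aside: I know Spark 3.5 added dropDuplicatesWithinWatermark, which might cover the in-batch half of this without foreachBatch; a rough sketch is below, assuming "id" identifies a record. But that still wouldn't stop me reinserting records already in the target table, so I think I'd still need the merge.)

# Rough sketch (Spark 3.5+ only): streaming-native dedup instead of row_number in foreachBatch
# Assumes "id" identifies a record; re-declares a watermark on the joined stream
df_deduped = (
    join_data(df_1, df_2)
    .withWatermark("timestamp_1", "30 seconds")
    .dropDuplicatesWithinWatermark(["id"])
)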

The problem with this approach is that foreachBatch is a method of the DataStreamWriter object, so I believe I can't call it without calling writeStream first; at the same time, I don't think I can call writeStream when defining a DLT. I'd really appreciate some help understanding whether there's a way around this!
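
For context, the closest built-in I've found in the DLT Python API is apply_changes, which does a declarative dedup/upsert into a streaming table. A rough sketch of what I mean is below (the target name and keys are just placeholders), though I don't know whether it fits a stream-stream join like mine:

import dlt
from pyspark.sql import functions as f

@dlt.view
def joined_source():
    # Expose the stream-stream join as a DLT view
    return join_data(df_1, df_2)

# Placeholder target table name
dlt.create_streaming_table("my_target")

dlt.apply_changes(
    target="my_target",
    source="joined_source",
    keys=["id"],                       # placeholder dedup key
    sequence_by=f.col("timestamp_1"),  # ordering column used to pick the latest row per key
)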

Thanks in advance 🙂

1 REPLY

JJ_LVS1
New Contributor III

I was just going through this as well and need micro-batch operations. I can't see how this will work with DLT right now, so I've switched back to structured streaming. I hope they add this functionality; otherwise it limits DLT to more basic streaming.
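
Roughly the shape I fell back to, reusing the functions from your post (the checkpoint path is a placeholder):

# Plain structured streaming job outside DLT; checkpoint_path is a placeholder
query = (
    join_data(df_1, df_2)
    .writeStream
    .foreachBatch(foreachbatch_func)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)  # needed so the merge is exactly-once per batch across restarts
    .start()
)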
