Data Engineering

Using foreachBatch within Delta Live Tables framework

diguid
New Contributor III

Hey there!

I was wondering if there's any way to declare a Delta Live Table that uses foreachBatch to process the output of a streaming query.

Here's a simplification of my code:

from pyspark.sql import functions as f
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def join_data(df_1, df_2):
    df_joined = (
        df_1
        .withWatermark('timestamp_1', '30 seconds')
        .alias("df_1")
        .join(
            df_2
            .withWatermark('timestamp_2', '10 seconds')
            .alias("df_2"),
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 >= timestamp_1 - INTERVAL 24 hours AND
                timestamp_2 <= timestamp_1
            """),
            how="left"
        )
    )
 
    return df_joined
 
def foreachbatch_func(df_micro_batch, batchId):
    # Deduplicate within the micro batch: keep a single row per key
    df_micro_batch = (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number()
            .over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1)
        .drop("rn")
    )
    
    
    # Only inserting if not in my delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
          df_micro_batch.alias("current_batch"),
          f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
 
 
df = (
  join_data(df_1, df_2)
  .writeStream
  .format("delta")
  .foreachBatch(foreachbatch_func)
  .outputMode("append")
  .start()
)

Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to perform deduplication within my micro batch and also to figure out which records have already been written to my delta table, so that I don't reinsert them.

The problem with this approach is that foreachBatch is a method of the DataStreamWriter object, so I believe I can't call it without calling writeStream first. At the same time, I don't think I can call writeStream when defining a DLT table, so I'd really appreciate some help understanding whether there's a way around this!
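For reference, this is roughly how I'd otherwise declare the table in DLT: the decorated function just returns a streaming DataFrame and DLT manages the write itself, so I don't see where a writeStream / foreachBatch call would fit. (The table and source names below are made up for illustration.)

import dlt

# Hypothetical DLT declaration for the same pipeline; "joined_events",
# "source_table_1" and "source_table_2" are placeholder names.
@dlt.table(name="joined_events")
def joined_events():
    df_1 = dlt.read_stream("source_table_1")
    df_2 = dlt.read_stream("source_table_2")
    # DLT takes the returned DataFrame and writes it for me, so there is no
    # DataStreamWriter here to attach foreachBatch to.
    return join_data(df_1, df_2)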

Thanks in advance 🙂

1 REPLY

JJ_LVS1
New Contributor III

I was just going through this as well and need micro-batch operations. I can't see how this will work with DLT right now, so I've switched back to structured streaming. I hope they add this functionality; otherwise it limits DLT to more basic streaming use cases.
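Roughly, this is the kind of standalone structured streaming job I fell back to. It's only a sketch: the checkpoint path, target path, merge key, and source DataFrame are all placeholders.

from delta.tables import DeltaTable

def upsert_batch(df_micro_batch, batch_id):
    # Insert only the rows that aren't already in the target Delta table
    (
        DeltaTable.forPath(spark, "/mnt/target/my_table")   # placeholder path
        .alias("t")
        .merge(df_micro_batch.alias("s"), "t.id = s.id")    # placeholder key
        .whenNotMatchedInsertAll()
        .execute()
    )

query = (
    source_df                      # any streaming DataFrame, e.g. from readStream
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "/mnt/checkpoints/my_table")  # placeholder
    .outputMode("append")
    .start()
)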
