<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Using foreachBatch within Delta Live Tables framework in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/111408#M43884</link>
    <description>&lt;P&gt;foreachBatch support in DLT is coming soon, and you now have the &lt;A href="https://www.databricks.com/blog/introducing-dlt-sink-api-write-pipelines-kafka-and-external-delta-tables" target="_self"&gt;ability to write to non-DLT sinks&lt;/A&gt; as well&lt;/P&gt;</description>
    <pubDate>Thu, 27 Feb 2025 19:50:42 GMT</pubDate>
    <dc:creator>cgrant</dc:creator>
    <dc:date>2025-02-27T19:50:42Z</dc:date>
    <item>
      <title>Using foreachBatch within Delta Live Tables framework</title>
      <link>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/21089#M14327</link>
      <description>&lt;P&gt;Hey there!&lt;/P&gt;&lt;P&gt;​&lt;/P&gt;&lt;P&gt;I was wondering if there's any way of declaring a Delta Live Table where we use foreachBatch to process the output of a streaming query.&lt;/P&gt;&lt;P&gt;​&lt;/P&gt;&lt;P&gt;Here's a simplification of my code:&lt;/P&gt;&lt;P&gt;​&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql import functions as f
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def join_data(df_1, df_2):
    df_joined = (
        df_1
        .withWatermark('timestamp_1', '30 seconds')
        .alias('df_1')
        .join(
            df_2
            .withWatermark('timestamp_2', '10 seconds')
            .alias('df_2'),
            on=f.expr("""
                df_1.id = df_2.id AND
                timestamp_2 &amp;gt;= timestamp_1 - INTERVAL 24 hours AND
                timestamp_2 &amp;lt;= timestamp_1
            """),
            how="left"
        )
    )

    return df_joined

def foreachbatch_func(df_micro_batch, batchId):
    # Deduplicate within the micro batch: keep only the first row per key
    df_micro_batch = (
        df_micro_batch
        .withColumn(
            "rn",
            f.row_number()
            .over(
                Window
                .partitionBy(partition_by_cols)
                .orderBy(order_by_cols)
            )
        )
        .filter(f.col("rn") == 1)
        .drop("rn")
    )
    
    
    # Only inserting if not in my delta table already
    (
        DeltaTable
        .forPath(spark, mypath)
        .alias("table")
        .merge(
          df_micro_batch.alias("current_batch"),
          f.expr("myexpr")
        )
        .whenNotMatchedInsertAll()
        .execute()
    )


# Start the stream; foreachBatch performs the per-micro-batch dedup and merge
query = (
  join_data(df_1, df_2)
  .writeStream
  .format("delta")
  .foreachBatch(foreachbatch_func)
  .outputMode("append")
  .start()
)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;​&lt;/P&gt;&lt;P&gt;Because multiple aggregations are not allowed in streaming queries, I need the foreachBatch call to perform deduplication within my micro batch and also to figure out which records have already been written to my delta table, so that I don't reinsert them.&lt;/P&gt;&lt;P&gt;​&lt;/P&gt;&lt;P&gt;The problem with this approach is that foreachBatch is a method of the DataStreamWriter object, so I believe I can't call it without calling writeStream first, but at the same time I don't think I can call writeStream when defining a DLT table, so I would really appreciate some help understanding whether there's a way around this!&lt;/P&gt;&lt;P&gt;​&lt;/P&gt;&lt;P&gt;Thanks in advance &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 22 Nov 2022 22:22:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/21089#M14327</guid>
      <dc:creator>diguid</dc:creator>
      <dc:date>2022-11-22T22:22:46Z</dc:date>
    </item>
    <item>
      <title>Re: Using foreachBatch within Delta Live Tables framework</title>
      <link>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/21090#M14328</link>
      <description>&lt;P&gt;I was just going through this as well and require micro-batch operations. I can't see how this will work with DLT right now, so I've switched back to structured streaming. I hope they add this functionality; otherwise it limits DLT to more basic streaming.&lt;/P&gt;</description>
      <pubDate>Thu, 23 Feb 2023 17:03:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/21090#M14328</guid>
      <dc:creator>JJ_LVS1</dc:creator>
      <dc:date>2023-02-23T17:03:14Z</dc:date>
    </item>
    <item>
      <title>Re: Using foreachBatch within Delta Live Tables framework</title>
      <link>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/111367#M43865</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/78253"&gt;@diguid&lt;/a&gt;&amp;nbsp;How did you implement your solution? We are looking for something similar.&lt;/P&gt;</description>
      <pubDate>Thu, 27 Feb 2025 11:51:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/111367#M43865</guid>
      <dc:creator>JothyGanesan</dc:creator>
      <dc:date>2025-02-27T11:51:27Z</dc:date>
    </item>
    <item>
      <title>Re: Using foreachBatch within Delta Live Tables framework</title>
      <link>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/111408#M43884</link>
      <description>&lt;P&gt;foreachBatch support in DLT is coming soon, and you now have the &lt;A href="https://www.databricks.com/blog/introducing-dlt-sink-api-write-pipelines-kafka-and-external-delta-tables" target="_self"&gt;ability to write to non-DLT sinks&lt;/A&gt; as well.&lt;/P&gt;
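&lt;P&gt;A minimal sketch of what writing to an external sink from a DLT pipeline could look like, assuming the create_sink / append_flow interface described in the linked blog post; the sink name, Kafka options, and source table "events" below are placeholders:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import dlt
from pyspark.sql import functions as f

# Declare an external (non-DLT) sink; the Kafka options shown are illustrative
dlt.create_sink(
    name="my_kafka_sink",
    format="kafka",
    options={"kafka.bootstrap.servers": "host:9092", "topic": "events_out"},
)

# Route a streaming flow into the sink instead of into a DLT table.
# Kafka expects a string/binary "value" column, so rows are serialized to JSON here.
@dlt.append_flow(name="events_to_kafka", target="my_kafka_sink")
def events_to_kafka():
    return (
        dlt.read_stream("events")
        .select(f.to_json(f.struct("*")).alias("value"))
    )&lt;/CODE&gt;&lt;/PRE&gt;</description>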
      <pubDate>Thu, 27 Feb 2025 19:50:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/using-foreachbatch-within-delta-live-tables-framework/m-p/111408#M43884</guid>
      <dc:creator>cgrant</dc:creator>
      <dc:date>2025-02-27T19:50:42Z</dc:date>
    </item>
  </channel>
</rss>

