<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Update set in foreachBatch in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/100378#M40274</link>
    <description>&lt;P data-unlink="true"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105594"&gt;@skarpeck&lt;/a&gt;&amp;nbsp;does your input df contain any filters? The empty &lt;STRONG&gt;codes&lt;/STRONG&gt; variable could be caused by empty micro-batches.&lt;BR /&gt;&lt;BR /&gt;Please check &lt;STRONG&gt;numInputRows&lt;/STRONG&gt; in your query's&amp;nbsp;&lt;A href="https://docs.databricks.com/en/structured-streaming/stream-monitoring.html" target="_blank"&gt;Stream Monitoring Metrics&lt;/A&gt; and confirm whether there are input rows for the batch IDs that leave &lt;STRONG&gt;codes&lt;/STRONG&gt; empty.&lt;/P&gt;</description>
    <pubDate>Thu, 28 Nov 2024 23:10:28 GMT</pubDate>
    <dc:creator>raphaelblg</dc:creator>
    <dc:date>2024-11-28T23:10:28Z</dc:date>
    <item>
      <title>Update set in foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/96445#M39280</link>
      <description>&lt;P&gt;I need to track the codes of records that were ingested in a foreachBatch function and pass them as a task value, so downstream tasks can take actions based on this output. What would be the best approach to achieve that? I currently have the following solution, but sometimes it doesn't fill the set, and I can see that the task value "codes" is just empty...&lt;/P&gt;&lt;LI-CODE lang="python"&gt;codes = set()

def foreach_func(df, batch_id):
    # Collect the distinct codes of this micro-batch into the driver-side set.
    codes.update({code.ColCode for code in df.select("ColCode").distinct().collect()})

    # Additional logic of inserting df data into tables
    ...
    ...
    ...


(
    input_df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("badRecordsPath", errors_path)
    .foreachBatch(foreach_func)
    .start()
    .awaitTermination()
)

dbutils.jobs.taskValues.set(key="codes", value=list(codes))&lt;/LI-CODE&gt;</description>
      <pubDate>Mon, 28 Oct 2024 10:15:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/96445#M39280</guid>
      <dc:creator>skarpeck</dc:creator>
      <dc:date>2024-10-28T10:15:53Z</dc:date>
    </item>
    <item>
      <title>Re: Update set in foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/96510#M39284</link>
      <description>&lt;P&gt;I found that this is related to Shared cluster mode. When I use single user mode, it all works fine. Using an Accumulator does not help either...&lt;/P&gt;</description>
      <pubDate>Mon, 28 Oct 2024 13:16:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/96510#M39284</guid>
      <dc:creator>skarpeck</dc:creator>
      <dc:date>2024-10-28T13:16:49Z</dc:date>
    </item>
    <item>
      <title>Re: Update set in foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/100378#M40274</link>
      <description>&lt;P data-unlink="true"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/105594"&gt;@skarpeck&lt;/a&gt;&amp;nbsp;does your input df contain any filters? The empty &lt;STRONG&gt;codes&lt;/STRONG&gt; variable could be caused by empty micro-batches.&lt;BR /&gt;&lt;BR /&gt;Please check &lt;STRONG&gt;numInputRows&lt;/STRONG&gt; in your query's&amp;nbsp;&lt;A href="https://docs.databricks.com/en/structured-streaming/stream-monitoring.html" target="_blank"&gt;Stream Monitoring Metrics&lt;/A&gt; and confirm whether there are input rows for the batch IDs that leave &lt;STRONG&gt;codes&lt;/STRONG&gt; empty.&lt;/P&gt;</description>
      <pubDate>Thu, 28 Nov 2024 23:10:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/100378#M40274</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-11-28T23:10:28Z</dc:date>
    </item>
    <item>
      <title>Re: Update set in foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/101730#M40793</link>
      <description>&lt;P&gt;Another approach is to persist the collected codes in a Delta table and then read from this table in downstream tasks.&lt;/P&gt;
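&lt;P&gt;A minimal sketch of this Delta table approach, not a definitive implementation. The table name and the helpers make_foreach_func and read_codes are hypothetical; ColCode and the foreachBatch signature are taken from the original question. Each micro-batch appends its distinct codes to the table, and a downstream task reads them back:&lt;/P&gt;

```python
# Hypothetical table name (an assumption); replace it with a table your job can write to.
CODES_TABLE = "main.default.ingested_codes"


def make_foreach_func(codes_table):
    """Build a foreachBatch function that appends each batch's distinct codes."""

    def foreach_func(df, batch_id):
        # Persist the codes in a Delta table instead of mutating a driver-side set.
        (
            df.select("ColCode")
            .distinct()
            .write.format("delta")
            .mode("append")
            .saveAsTable(codes_table)
        )
        # Additional logic of inserting df data into tables would go here.

    return foreach_func


def read_codes(spark, codes_table):
    """Return the sorted distinct codes stored in the table, for a downstream task."""
    rows = spark.table(codes_table).select("ColCode").distinct().collect()
    return sorted(row.ColCode for row in rows)
```

&lt;P&gt;In the streaming query this would be wired in as .foreachBatch(make_foreach_func(CODES_TABLE)), and a downstream task would call read_codes(spark, CODES_TABLE). Because the table is only appended to, it likely needs to be truncated or filtered per run so that codes from earlier runs are not picked up.&lt;/P&gt;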
&lt;P&gt;Make sure to add ample logging and counts.&lt;/P&gt;
&lt;P&gt;Checkpointing would also help if you suspect that the counts in the set differ from what you see in the task value with key = "codes".&lt;/P&gt;</description>
      <pubDate>Wed, 11 Dec 2024 10:17:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/update-set-in-foreachbatch/m-p/101730#M40793</guid>
      <dc:creator>NandiniN</dc:creator>
      <dc:date>2024-12-11T10:17:50Z</dc:date>
    </item>
  </channel>
</rss>

