Update set in foreachBatch
10-28-2024 03:15 AM - edited 10-28-2024 03:15 AM
I need to track the codes of records ingested in a foreachBatch function and pass them as a task value, so downstream tasks can take action based on this output. What would be the best approach to achieve that? I currently have the following solution, but sometimes it just doesn't fill the set, and I can see that the task value "codes" is empty...
codes = set()

def foreach_func(df, batch_id):
    codes.update({code.ColCode for code in df.select("ColCode").distinct().collect()})
    # Additional logic of inserting df data into tables
    ...

(
    input_df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("badRecordsPath", errors_path)
    .foreachBatch(foreach_func)
    .start()
    .awaitTermination()
)

dbutils.jobs.taskValues.set(key="codes", value=list(codes))
10-28-2024 06:16 AM
I found out it is related to the Shared cluster access mode. When I use Single User mode, it all works fine. Furthermore, using an Accumulator does not help either...
11-28-2024 03:10 PM
@skarpeck does your input df contain any filters? The empty codes variable could be caused by empty micro-batches.
Please check numInputRows in your query's Stream Monitoring Metrics, and verify whether there are input rows for the batch IDs you're observing that lead to no data in codes.
Raphael Balogo
Sr. Technical Solutions Engineer
Databricks
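The check above can be scripted: a PySpark streaming query exposes `query.recentProgress`, a list of progress dicts that include `batchId` and `numInputRows`. A small sketch (the helper name is illustrative) that flags the micro-batches which ingested nothing:

```python
def empty_batches(progress_entries):
    # Each entry mirrors one element of query.recentProgress;
    # numInputRows == 0 means the micro-batch saw no input data.
    return [p["batchId"] for p in progress_entries if p.get("numInputRows", 0) == 0]
```

For example, after `query = input_df.writeStream...start()` and `query.awaitTermination()`, `empty_batches(query.recentProgress)` lists the batch IDs that could not have contributed any codes.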
12-11-2024 02:17 AM
Another approach is to persist the collected codes in a Delta table and then read from this table in downstream tasks.
Make sure to add ample logging and counts.
Checkpointing would also help if you suspect the counts in the set do not match what you see under the task value key "codes".
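A sketch of that Delta-table approach inside foreachBatch, under some assumptions: the table name `ingested_codes` is hypothetical, and `spark` is the session that Databricks notebooks provide globally. Because each micro-batch writes the table itself, the codes survive regardless of cluster access mode:

```python
def make_code_rows(codes, batch_id):
    # Pure helper: shape the distinct codes into (code, batch_id) records.
    return [{"code": c, "batch_id": batch_id} for c in sorted(codes)]

def foreach_func(df, batch_id):
    codes = {row.ColCode for row in df.select("ColCode").distinct().collect()}
    if codes:  # skip empty micro-batches
        (spark.createDataFrame(make_code_rows(codes, batch_id))  # `spark`: notebook session
            .write.format("delta").mode("append")
            .saveAsTable("ingested_codes"))                      # hypothetical table name
    # ... existing logic of inserting df data into tables ...
```

Downstream tasks can then read the codes back with spark.read.table("ingested_codes"), filtering on batch_id or a run identifier if several runs share the table.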

