I need to track the codes of the records ingested in a foreachBatch function and pass them as a task value, so that downstream tasks can act on this output. What would be the best approach to achieve that? Currently I have the following solution, but sometimes the set is not filled, and I can see that the task value "codes" is just empty...
codes = set()
def foreach_func(df, batch_id):
    codes.update({ code.ColCode for code in df.select("ColCode").distinct().collect() })
    # Additional logic of inserting df data into tables
...
...
...
(
    input_df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("badRecordsPath", errors_path)
    .foreachBatch(foreach_func)
    .start()
    .awaitTermination()
)
dbutils.jobs.taskValues.set(key = "codes", value = list(codes))
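
For comparison, here is a minimal sketch of one alternative that does not rely on a driver-side variable. It rests on assumptions the code above does not confirm: that the set stays empty either because no micro-batch ran (no new data since the last checkpoint) or because, on some compute configurations, the foreachBatch function runs in a separate process and its changes to `codes` never reach the notebook. Each batch writes its distinct codes to a table, and the codes are read back after the stream finishes. The names `make_foreach_func`, `read_codes`, `new_run_id`, the `run_id` column, and the table name `my_schema.ingested_codes` are hypothetical.

```python
import uuid


def new_run_id():
    """Return a fresh identifier used to tag the codes written by one run."""
    return uuid.uuid4().hex


def make_foreach_func(codes_table, run_id):
    """Build a foreachBatch handler that persists each batch's distinct codes."""

    def foreach_func(df, batch_id):
        # Persist the distinct codes of this micro-batch instead of mutating
        # a driver-side set, which may not be visible after the stream ends.
        (
            df.selectExpr("ColCode", f"'{run_id}' AS run_id")
            .distinct()
            .write.format("delta")
            .mode("append")
            .saveAsTable(codes_table)
        )
        # Additional logic of inserting df data into tables goes here.

    return foreach_func


def read_codes(spark, codes_table, run_id):
    """Return the sorted distinct codes recorded for one run."""
    if not spark.catalog.tableExists(codes_table):
        return []  # no batch has ever written to the table
    rows = (
        spark.table(codes_table)
        .where(f"run_id = '{run_id}'")
        .select("ColCode")
        .distinct()
        .collect()
    )
    # distinct() also absorbs duplicates from a replayed micro-batch.
    return sorted(row.ColCode for row in rows)


# Intended usage in the notebook (hypothetical table name):
#   run_id = new_run_id()
#   codes_table = "my_schema.ingested_codes"
#   ...foreachBatch(make_foreach_func(codes_table, run_id)).start().awaitTermination()
#   dbutils.jobs.taskValues.set(key="codes", value=read_codes(spark, codes_table, run_id))
```

With this layout an empty "codes" value would mean that no rows were processed in the run, which can be told apart from a lost update by checking whether any batch was executed.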