Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Update set in foreachBatch

skarpeck
New Contributor III

I need to track the codes of records that were ingested in the foreachBatch function and pass them as a task value, so downstream tasks can take action based on this output. What would be the best approach to achieve that? Currently I have the following solution, but sometimes it just doesn't fill the set, and I can see that the task value "codes" ends up empty...

codes = set()

def foreach_func(df, batch_id):
    codes.update({ code.ColCode for code in df.select("ColCode").distinct().collect() })

    # Additional logic of inserting df data into tables
    ...
    ...
    ...
    


(
    input_df.writeStream
        .trigger(availableNow=True)
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .option("badRecordsPath", errors_path)
        .foreachBatch(foreach_func)
        .start()
        .awaitTermination()
)

dbutils.jobs.taskValues.set(key = "codes", value = list(codes))

3 REPLIES

skarpeck
New Contributor III

I found that it is related to Shared cluster mode. When I use single user mode, it all works fine. Furthermore, using an Accumulator does not help either.

raphaelblg
Databricks Employee

@skarpeck does your input df contain any filters? The empty codes variable could be caused by empty microbatches.

Please check numInputRows in your query's Stream Monitoring Metrics. I recommend checking whether there are input rows for the batch IDs you're observing that lead to no data in codes.
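One way to follow that suggestion is to inspect the query's progress reports after the stream finishes. A minimal sketch: in PySpark, `StreamingQuery.recentProgress` returns a list of dicts that include `batchId` and `numInputRows` per micro-batch; the helper name `batches_without_input` is mine, not part of any API.

```python
def batches_without_input(progress_entries):
    """Return the batchIds of micro-batches that processed zero input rows.

    Each entry mirrors the dict form of StreamingQuery.recentProgress,
    which reports numInputRows per micro-batch.
    """
    return [p["batchId"] for p in progress_entries
            if p.get("numInputRows", 0) == 0]

# After query.start().awaitTermination() you could inspect, e.g.:
# empty_batches = batches_without_input(query.recentProgress)
```

If `batches_without_input` returns every batch id, the empty `codes` set is explained by the stream itself seeing no data, not by the set update being lost.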

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

NandiniN
Databricks Employee

Another approach is to persist the collected codes in a Delta table and then read from this table in downstream tasks.

Make sure to add ample logging and counts.

Checkpointing would also help if you suspect the counts in the set do not match what you see in the task value key = "codes".
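A minimal sketch of the Delta-table approach described above: write the distinct codes (tagged with the batch id) from inside foreach_func instead of mutating a driver-side set, so the result survives regardless of which process runs the micro-batch. The table name `ingested_codes` and the pure helper `codes_from_rows` are examples of mine, not part of any API.

```python
def codes_from_rows(rows, column="ColCode"):
    """Pure helper: build the distinct set of codes from collected rows."""
    return {row[column] for row in rows}

def foreach_func(df, batch_id):
    rows = df.select("ColCode").distinct().collect()
    codes = codes_from_rows(rows)
    # Persist the codes with the batch id so downstream tasks can read
    # them from the table instead of a driver-side variable.
    (df.sparkSession
       .createDataFrame([(batch_id, c) for c in codes],
                        "batch_id LONG, code STRING")
       .write.format("delta").mode("append")
       .saveAsTable("ingested_codes"))  # example table name

# A downstream task could then read:
# codes = [r.code for r in
#          spark.table("ingested_codes").select("code").distinct().collect()]
```

Compared with the task-value approach, this also leaves an audit trail of which codes arrived in which batch.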
