Data Engineering

How to use foreachBatch in Delta Live Tables (DLT)?

rdobbss
New Contributor II

I need to apply some transformations to incoming data as a batch, and I want to know if there is a way to use the foreachBatch option in Delta Live Tables. I am using Auto Loader to load JSON files; I then need to apply foreachBatch and store the results in another table.

4 REPLIES

Kaniz
Community Manager

Hi @Ravi Dobariya, this recipe shows how to write streaming aggregates in update mode, using merge and foreachBatch to upsert into a Delta table in Databricks.
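For reference, here is a minimal sketch of that pattern in a regular (non-DLT) streaming job. The table name, path, and join key below are hypothetical placeholders, and `spark` is the session a Databricks notebook provides:

```python
# Sketch of the foreachBatch + MERGE pattern: each micro-batch from
# Auto Loader is upserted into a Delta table keyed on "id".
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # MERGE the micro-batch into the target table (name is hypothetical).
    target = DeltaTable.forName(micro_batch_df.sparkSession, "my_target_table")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("cloudFiles")                       # Auto Loader
    .option("cloudFiles.format", "json")
    .load("/path/to/json")                      # hypothetical source path
    .writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .start())
```

Note this runs as an ordinary structured-streaming job, which is exactly why it does not translate directly into a DLT pipeline.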

rdobbss
New Contributor II

@Kaniz Fatma I am aware of this, but I am specifically looking to use foreachBatch in a Delta Live Tables pipeline. I know how it can be achieved in regular notebooks, but I haven't found anything for Delta Live Tables.

TomRenish
New Contributor III

Not sure if this will apply to you or not...

I was looking at foreachBatch to reduce the workload of getting distinct data from a history table of 20 million+ records, because the df.dropDuplicates() function was intermittently running out of memory during DLT pipeline execution. I ended up doing this instead:

import dlt
from pyspark.sql.functions import col

# Define the target table to push the latest values to
dlt.create_target_table("stg_service_requests_unpacked_new_distinct")

# Use the apply_changes function to perform the merge
dlt.apply_changes(
    target = "stg_service_requests_unpacked_new_distinct",
    source = "stg_service_requests_unpacked_new",
    keys = dupe_cols_evaluation,
    sequence_by = col("_hdr_time_in_ms"),
)

dupe_cols_evaluation is a Python list in which I defined the columns to evaluate for de-duplication. The outputs appear to be correct, and incremental updates run very quickly with this process.
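For illustration, the key list could look like the following; the column names here are hypothetical, since the original post does not show them:

```python
# Columns whose combination identifies a duplicate row; passed as
# keys= to dlt.apply_changes above (names are made up for this sketch).
dupe_cols_evaluation = ["request_number", "request_type", "status"]
```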

JohnA
New Contributor III

You were running out of memory using .dropDuplicates() on a stream because you need to specify a streaming watermark. The watermark defines a threshold beyond which late data can be ignored, so the de-duplication state no longer needs to be kept past that time frame.
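A minimal sketch of that fix, assuming an event-time column named `event_time` and a key column named `request_id` (both hypothetical):

```python
# Bound dropDuplicates state with a watermark: rows arriving more than
# 10 minutes late are dropped, and state older than that is discarded.
deduped = (
    spark.readStream.table("stg_service_requests_unpacked_new")
        .withWatermark("event_time", "10 minutes")
        # Including the event-time column in the subset lets Spark
        # expire de-dup state as the watermark advances.
        .dropDuplicates(["request_id", "event_time"])
)
```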
