Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to use foreachBatch in Delta Live Tables (DLT)?

rdobbss
New Contributor II

I need to apply some transformations to incoming data as a batch, and I want to know if there is a way to use the foreachBatch option in Delta Live Tables. I am using Auto Loader to load JSON files, and I then need to apply foreachBatch and store the results in another table.
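For reference, this is roughly how foreachBatch pairs with Auto Loader in an ordinary structured streaming notebook (outside DLT). This is a minimal sketch; the paths, the "id" column, and the target table name are all hypothetical:

def process_batch(batch_df, batch_id):
    # arbitrary batch transformations go here; "id" is a hypothetical column
    transformed = batch_df.dropDuplicates(["id"])
    transformed.write.format("delta").mode("append").saveAsTable("target_table")

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/schemas/events")  # hypothetical path
    .load("/mnt/raw/events")                                     # hypothetical path
    .writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/events")     # hypothetical path
    .start())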

4 REPLIES

Kaniz_Fatma
Community Manager

Hi @Ravi Dobariya​, this recipe shows how to use merge with foreachBatch to write streaming aggregates into a Delta table in update mode on Databricks.
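The pattern described there is, roughly, a foreachBatch function that merges each micro-batch into a Delta table. A minimal sketch, assuming a key column named "key", a target table named "aggregates", and a streaming aggregate DataFrame streaming_agg_df (all placeholders):

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # upsert each micro-batch into the target Delta table on the key column
    target = DeltaTable.forName(spark, "aggregates")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.key = s.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(streaming_agg_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .start())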

rdobbss
New Contributor II

@Kaniz Fatma​ I am aware of this, but I am specifically looking to use foreachBatch in a Delta Live Tables pipeline. I know how it can be achieved in regular notebooks, but I haven't found anything for Delta Live Tables.

TomRenish
New Contributor III

Not sure if this will apply to you or not...

I was looking at foreachBatch as a way to reduce the workload of getting distinct data from a history table of 20 million+ records, because the df.dropDuplicates() function was intermittently running out of memory during DLT pipeline execution. I ended up doing this instead:

import dlt
from pyspark.sql.functions import col

# define the target table to push the latest values to
dlt.create_target_table("stg_service_requests_unpacked_new_distinct")

# use the apply_changes function to perform the merge
dlt.apply_changes(
    target = "stg_service_requests_unpacked_new_distinct",
    source = "stg_service_requests_unpacked_new",
    keys = dupe_cols_evaluation,
    sequence_by = col("_hdr_time_in_ms"),
)

dupe_cols_evaluation is a Python list where I defined the columns to evaluate for de-duplication. The outputs appear to be correct, and incremental updates run very quickly with this process.
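For context, the pieces referenced above might look something like this; the key columns and source path are illustrative, not from the original post:

import dlt

# illustrative only: the actual de-duplication keys come from the source schema
dupe_cols_evaluation = ["request_id", "service_code"]

@dlt.table(name="stg_service_requests_unpacked_new")
def stg_service_requests_unpacked_new():
    # hypothetical Auto Loader source feeding the apply_changes call above
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/raw/service_requests"))

Because apply_changes keeps only the latest row per key (sequenced here by _hdr_time_in_ms), it achieves the de-duplication without holding dropDuplicates state in memory.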

JohnA
New Contributor III

You were running out of memory using .dropDuplicates() on a stream because you need to specify a streaming watermark. The watermark defines a threshold beyond which late data can be ignored, so state no longer needs to be kept past that time frame.
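For example, something along these lines bounds the deduplication state; the timestamp column, key column, and interval are illustrative:

# keep dedup state only for events within 10 minutes of the latest event time seen
deduped = (df
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["request_id", "event_time"]))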
