TomRenish

Not sure if this will apply to you or not...

I was looking at foreachBatch to reduce the workload of getting distinct rows from a history table of 20 million+ records, because df.dropDuplicates() was intermittently running out of memory during DLT pipeline execution. I ended up doing this instead:

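import dlt
from pyspark.sql.functions import col

# dupe_cols_evaluation: Python list of the column names that identify a
# duplicate; it is defined elsewhere in the pipeline.

# Define the target table that receives the latest values.
# (create_target_table is deprecated in newer DLT releases;
# dlt.create_streaming_table is the current equivalent.)
dlt.create_target_table("stg_service_requests_unpacked_new_distinct")

# Use apply_changes to perform the merge: for each key, the row with the
# highest _hdr_time_in_ms is kept.
dlt.apply_changes(
    target="stg_service_requests_unpacked_new_distinct",
    source="stg_service_requests_unpacked_new",
    keys=dupe_cols_evaluation,
    sequence_by=col("_hdr_time_in_ms"),
)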

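dupe_cols_evaluation is a Python list of the columns I use to evaluate duplicates; they become the merge keys. With the default SCD type 1 behaviour, apply_changes keeps a single row per key and uses sequence_by to decide which row is the latest, so the target ends up de-duplicated. The output appears to be correct, and incremental updates are very fast with this process.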
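If it helps to see why this works as de-duplication, here is a plain-Python model of the "latest row per key wins" behaviour, including an incremental run where a late, older row arrives. This is only a sketch to illustrate the semantics, not Databricks code: apply_changes_batch, the request_id column, and the sample rows are hypothetical, and ties on the sequence value simply keep the first row seen.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


def apply_changes_batch(
    target: Dict[Key, Row],
    batch: Iterable[Row],
    keys: List[str],
    sequence_by: str,
) -> Dict[Key, Row]:
    """Hypothetical model: upsert a batch, keeping the newest row per key."""
    for row in batch:
        key = tuple(row[k] for k in keys)
        current = target.get(key)
        # Replace the stored row only if the key is new or the row is newer.
        if current is None or row[sequence_by] > current[sequence_by]:
            target[key] = row
    return target


# Hypothetical key column and sample rows, for illustration only.
dupe_cols_evaluation = ["request_id"]
target: Dict[Key, Row] = {}

# First run: key 1 appears twice; only its newest row survives.
apply_changes_batch(
    target,
    [
        {"request_id": 1, "status": "open", "_hdr_time_in_ms": 100},
        {"request_id": 1, "status": "closed", "_hdr_time_in_ms": 200},
        {"request_id": 2, "status": "open", "_hdr_time_in_ms": 150},
    ],
    dupe_cols_evaluation,
    "_hdr_time_in_ms",
)

# Incremental run: a late, older row for key 1 is ignored; key 2 is updated.
apply_changes_batch(
    target,
    [
        {"request_id": 1, "status": "reopened", "_hdr_time_in_ms": 50},
        {"request_id": 2, "status": "closed", "_hdr_time_in_ms": 300},
    ],
    dupe_cols_evaluation,
    "_hdr_time_in_ms",
)

for key in sorted(target):
    print(key, target[key]["status"], target[key]["_hdr_time_in_ms"])
```

Each incremental run only touches the keys present in the new batch, which is a rough intuition for why this can be cheaper than re-running dropDuplicates over the whole history table.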