Not sure if this will apply to you or not...
I was looking at the foreachBatch approach to reduce the workload of getting distinct data from a history table of 20 million+ records, because the df.dropDuplicates() call was intermittently running out of memory during DLT pipeline execution.
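For context, the step that kept running out of memory was roughly like the following (a minimal sketch; the @dlt.table wrapper and the full-table read are my reconstruction, not the exact pipeline code):

import dlt

# rebuild the distinct table by recomputing over the entire 20M+ row history on every run
@dlt.table(name="stg_service_requests_unpacked_new_distinct")
def stg_service_requests_unpacked_new_distinct():
    # the full-table shuffle behind dropDuplicates() is what intermittently ran out of memory
    return dlt.read("stg_service_requests_unpacked_new").dropDuplicates()

I ended up doing this instead: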
import dlt
from pyspark.sql.functions import col

# define the target table to push the latest values to
dlt.create_target_table("stg_service_requests_unpacked_new_distinct")

# use the apply_changes function to perform the merge
dlt.apply_changes(
    target = "stg_service_requests_unpacked_new_distinct",
    source = "stg_service_requests_unpacked_new",
    keys = dupe_cols_evaluation,
    sequence_by = col("_hdr_time_in_ms"),
)
dupe_cols_evaluation is a Python list where I defined the columns to evaluate for de-duplication. The outputs appear to be correct, and incremental updates are very speedy with this process.
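In case it helps, that list is just the set of business-key columns whose combination defines a duplicate, along these lines (the column names below are made-up placeholders, not the real schema):

# hypothetical example: the columns whose combination identifies a duplicate row
dupe_cols_evaluation = ["service_request_id", "request_status", "assigned_team"]

apply_changes then upserts into the target keyed on those columns, keeping the row with the highest _hdr_time_in_ms for each key, so each run only has to process the newly arrived rows rather than re-shuffling the whole history.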