How to use foreachBatch in Delta Live Tables (DLT)?
07-11-2022 08:20 AM
I need to apply some transformations to incoming data as a batch and want to know if there is a way to use the foreachBatch option in Delta Live Tables. I am using Auto Loader to load JSON files, and then I need to apply foreachBatch and store the results in another table.
Labels: DLT, DLT Pipeline, Foreachbatch, JSON Files
07-14-2022 07:32 AM
@Kaniz Fatma I am aware of this, but I am specifically looking to use foreachBatch in a Delta Live Tables pipeline. I know how it can be achieved in regular notebooks but haven't found anything for Delta Live Tables. For context, a minimal sketch of the regular-notebook approach is below.
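A minimal sketch of the regular-notebook version: Auto Loader reading JSON and foreachBatch writing to a Delta table. All paths and the target table name here are hypothetical placeholders, not from this thread.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read incoming JSON files incrementally with Auto Loader (paths are hypothetical)
raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/schemas/raw")
    .load("/tmp/landing/json")
)

# Inside foreachBatch, batch_df is a static DataFrame, so the full
# batch API (joins, merges, deduplication) is available per microbatch
def process_batch(batch_df, batch_id):
    transformed = batch_df.dropDuplicates()
    transformed.write.format("delta").mode("append").saveAsTable("target_table")

(
    raw.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/target_table")
    .start()
)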
01-18-2023 11:33 AM
Not sure if this will apply to you or not...
I was looking at foreachBatch to reduce the workload of getting distinct data from a history table of 20 million+ records, because the df.dropDuplicates() function was intermittently running out of memory during DLT pipeline execution. I ended up doing this instead:
import dlt
from pyspark.sql.functions import col

# Define the target table to push the latest values to
dlt.create_target_table("stg_service_requests_unpacked_new_distinct")

# Use the apply_changes function to perform the merge
dlt.apply_changes(
    target = "stg_service_requests_unpacked_new_distinct",
    source = "stg_service_requests_unpacked_new",
    keys = dupe_cols_evaluation,
    sequence_by = col("_hdr_time_in_ms"),
)
dupe_cols_evaluation is a Python list where I defined the columns to evaluate for de-duplication. The outputs appear to be correct, and running incremental updates is very speedy with this process.
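For illustration, a sketch of how that key list and the source table might be defined. The column names and landing path below are assumptions; only the table name comes from the post.

import dlt

# Hypothetical de-duplication keys: the columns that identify a duplicate record
dupe_cols_evaluation = ["request_id", "status", "request_type"]

@dlt.table(name="stg_service_requests_unpacked_new")
def stg_service_requests_unpacked_new():
    # Ingest raw JSON incrementally with Auto Loader (path is hypothetical)
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/landing/service_requests")
    )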
06-21-2023 07:49 PM
You were running out of memory using .dropDuplicates() on a stream because you need to specify a streaming watermark. The watermark defines a threshold after which late data can be ignored, so the deduplication state no longer needs to be kept beyond that time frame.
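A minimal sketch of that fix, assuming an event-time timestamp column named event_time, a key column named request_id, and a 10-minute lateness threshold (all three are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

stream_df = spark.readStream.table("stg_service_requests_unpacked_new")

# Bound the deduplication state: records arriving more than 10 minutes
# late are ignored, so state older than the watermark can be evicted.
# Including the event-time column in the key subset lets Spark drop
# expired state instead of keeping it forever.
deduped = (
    stream_df
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["request_id", "event_time"])
)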