Hi, thanks for the suggestion.
Performance was dreadful writing from cloud_files straight into silver with APPLY CHANGES INTO (61M rows), so I have instead loaded bronze as a raw append, duplicates included (roughly as sketched below the row counts), and used the silver layer to de-duplicate with APPLY CHANGES INTO.
headers 6.5M rows
items 61M rows
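For reference, the bronze load is now just a raw append from cloud_files with no de-duplication. Roughly like this, with the table name, landing path and file format simplified for the sample:
-- Bronze ingest (simplified sketch): raw append only, duplicates kept for silver to resolve.
CREATE STREAMING TABLE sales_item_raw
COMMENT "Raw sales items landed as-is from cloud_files."
TBLPROPERTIES ("quality" = "bronze")
AS SELECT
  *,
  current_timestamp() AS date_loaded_utc  -- load timestamp column, named for the sample
FROM cloud_files("/mnt/landing/sales_item/", "json");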
The pipeline executes, but a full refresh never finishes on 1 worker and 4 executors; it runs out of memory after about 30 minutes. There are no issues with structured streaming and Delta. I'm pretty sure the SQL below should work, but I'm not having much luck: the stream-stream join seems to be killing it.
CREATE STREAMING TABLE sales_header (
  sales_header_bk string,
  date_created timestamp,
  date_month_closed date,
  -- ...more fields....
  CONSTRAINT valid_sales_header_bk EXPECT (sales_header_bk IS NOT NULL) ON VIOLATION FAIL UPDATE
)
PARTITIONED BY (date_month_closed)
COMMENT "The cleaned sales header with a valid sales_header_bk. Partitioned by date_month_closed."
TBLPROPERTIES ("quality" = "silver");

APPLY CHANGES INTO LIVE.sales_header
FROM
(
  SELECT
    concat_ws('-', h.site_id, h.check_item_id, h.split_number) AS sales_header_bk,
    h.date_created
    -- ...more fields....
  FROM
    STREAM(dev_bronze.quadranet.sales_masterdata_header) h
)
KEYS (sales_header_bk)
SEQUENCE BY date_created;
CREATE STREAMING TABLE sales_item (
  sales_item_bk string,
  sales_header_bk string,
  -- ...more fields....
  date_created timestamp,
  date_loaded_utc timestamp,
  date_closed timestamp,
  date_closed_adjusted int,
  date_month_closed int,
  date_id_closed int,
  date_id_closed_adjusted int,
  CONSTRAINT valid_sales_item_bk EXPECT (sales_item_bk IS NOT NULL) ON VIOLATION FAIL UPDATE
)
PARTITIONED BY (date_month_closed)
COMMENT "The cleaned sales items with a valid sales_header_bk. Partitioned by date_month_closed."
TBLPROPERTIES ("quality" = "silver");
APPLY CHANGES INTO LIVE.sales_item
FROM
(
  SELECT
    i.key AS sales_item_bk,
    concat_ws('-', i.site_id, i.check_item_id, i.split_number) AS sales_header_bk,
    i.date_created,
    -- ...more fields....
    h.date_closed,
    h.date_closed_adjusted,
    h.date_month_closed,
    h.date_id_closed,
    h.date_id_closed_adjusted
  FROM
    STREAM(bronze.sales_item) i
    INNER JOIN STREAM(LIVE.sales_header) h
      ON concat_ws('-', i.site_id, i.check_item_id, i.split_number) = h.sales_header_bk
)
KEYS (sales_item_bk)
SEQUENCE BY date_created;
That is the SQL sample (there are a lot more fields in the real tables).