I have two Delta Live Tables: a master table that contains all the prior data, and a second table that contains the new data for a given day. I want to merge the two so that the master table contains the newest data. I know how to do this with Spark DataFrames, but I do not know whether it is possible with Delta Live Tables.
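To be concrete about the result I expect, here is a minimal plain-Python sketch of the upsert semantics, with no Spark or DLT involved. The `upsert` helper and the sample rows are made up for illustration; only `PRODUCT_CODE` as the key comes from my real tables.

```python
def upsert(master, daily, key="PRODUCT_CODE"):
    """Return new rows: daily rows overwrite matching master rows, new keys are appended."""
    # Copy each master row so the input list is left untouched.
    merged = {row[key]: dict(row) for row in master}
    for row in daily:
        # Matched key: update the existing row. Unmatched key: insert a new row.
        merged.setdefault(row[key], {}).update(row)
    return list(merged.values())


# Hypothetical rows, for illustration only.
master = [
    {"PRODUCT_CODE": "A1", "product_name": "old name"},
    {"PRODUCT_CODE": "B2", "product_name": "unchanged"},
]
daily = [
    {"PRODUCT_CODE": "A1", "product_name": "new name"},
    {"PRODUCT_CODE": "C3", "product_name": "brand new"},
]
result = upsert(master, daily)
```

In this sketch `A1` is updated, `B2` is left alone, and `C3` is inserted, which is what I want the master table to look like after each daily load.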
I have considered two approaches so far. The first is to convert the two Delta Live Tables into Spark DataFrames, perform the merge() operation on them, and then create a new dlt.table from the output.
The second option is to use CDC, but I am unsure how to actually do this. I know that the apply_changes function takes a target table as well as a source table, but can I use it with two Delta Live Tables? I am quite confused on this matter, and any help is appreciated.
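For the second option, my reading of the documentation suggests something like the sketch below; I have not confirmed that it works for my case. It assumes the pipeline defines a new streaming target (hypothetically named `master_current`) rather than reusing a table already defined with `@dlt.table`, and that the daily data carries an ordering column, here a made-up `ingest_time`, to pass as `sequence_by`. The view name `daily_updates` is also hypothetical.

```python
import dlt
from pyspark.sql.functions import col


# Hypothetical view: rename the daily columns to the master table's names.
@dlt.view(name="daily_updates")
def daily_updates():
    return dlt.read_stream("df_json_3").select(
        col("PRODUCT_CODE"),
        col("NAME_ZT").alias("product_name"),
        col("NAME").alias("product_name_en"),
        col("DESCRIPTION_ZT").alias("description"),
        col("DESCRIPTION").alias("description_en"),
        col("ERPCATEGORYCODE").alias("category_code"),
        col("ERPPRODUCTTYPE").alias("erp_product_type"),
        col("Online"),
        col("ingest_time"),  # assumed ordering column, not in my real data
    )


# The target of apply_changes is declared separately, not with @dlt.table.
dlt.create_streaming_table(name="master_current")

dlt.apply_changes(
    target="master_current",
    source="daily_updates",
    keys=["PRODUCT_CODE"],
    sequence_by=col("ingest_time"),
    stored_as_scd_type=1,  # keep only the latest version of each row
)
```

With `stored_as_scd_type=1`, rows with a matching `PRODUCT_CODE` should be updated in place and new codes inserted, which is what the merge() attempt is aiming for. The sketch only runs inside a Delta Live Tables pipeline, where the `dlt` module is available, and it does not show how the existing rows of MASTER_2 would be loaded into the new target.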
import dlt
from delta.tables import *

@dlt.table(name = "Merged_Table", comment = "this is the merge table for the daily update and the master table")
def merge_daily_and_master_table():
    # Read both Delta Live Tables as streams
    df_json_3 = dlt.read_stream("df_json_3")
    MASTER_2 = dlt.read_stream("MASTER_2")

    # Attempt to convert them to DataFrames before merging
    MASTER_2 = MASTER_2.toDF()
    df_json_3 = df_json_3.toDF()

    # Upsert the daily rows into the master table, keyed on PRODUCT_CODE
    MASTER_2.alias('MASTER_2')\
        .merge(
            df_json_3.alias('df_json_3'),
            "MASTER_2.PRODUCT_CODE = df_json_3.PRODUCT_CODE")\
        .whenMatchedUpdate(set = {
            "product_name" : "df_json_3.NAME_ZT",
            "product_name_en" : "df_json_3.NAME",
            "description" : "df_json_3.DESCRIPTION_ZT",
            "description_en" : "df_json_3.DESCRIPTION",
            "category_code" : "df_json_3.ERPCATEGORYCODE",
            "erp_product_type" : "df_json_3.ERPPRODUCTTYPE",
            "Online" : "df_json_3.Online"
        })\
        .whenNotMatchedInsert(values = {
            "PRODUCT_CODE" : "df_json_3.PRODUCT_CODE",
            "product_name": "df_json_3.NAME_ZT",
            "product_name_en": "df_json_3.NAME",
            "description": "df_json_3.DESCRIPTION_ZT",
            "description_en": "df_json_3.DESCRIPTION",
            "category_code": "df_json_3.ERPCATEGORYCODE",
            "erp_product_type": "df_json_3.ERPPRODUCTTYPE",
            "Online": "df_json_3.Online"
        })\
        .execute()

    return MASTER_2
The merge() code above is an example of how I am currently trying to do this with the first approach.