12-21-2022 03:01 PM
So I have two Delta Live Tables: a master table that contains all the prior data, and another table that contains all the new data for a specific day. I want to merge the two so that the master table contains the newest data. However, I only know how to do this with Spark DataFrames, and I do not know whether it is possible with Delta Live Tables.
What I've tried so far can be summed up in two approaches. The first is to convert the two Delta Live Tables into Spark DataFrames, perform the merge() operation on them, and then create a new dlt.table from the output.
The second option is to use CDC, but I am unsure how to actually do this. I know that the apply_changes function takes a target table as well as a source table, but can I do that with two Delta Live Tables? I am quite confused on this matter. Any help is appreciated.
import dlt
from delta.tables import *

@dlt.table(name = "Merged_Table", comment = "this is the merge table for the daily update and the master table")
def merge_daily_and_master_table():
    df_json_3 = dlt.read_stream("df_json_3")
    MASTER_2 = dlt.read_stream("MASTER_2")
    MASTER_2 = MASTER_2.toDF()
    df_json_3 = df_json_3.toDF()
    MASTER_2.alias('MASTER_2')\
        .merge(
            df_json_3.alias('df_json_3'),
            "MASTER_2.PRODUCT_CODE = df_json_3.PRODUCT_CODE")\
        .whenMatchedUpdate(set = {
            "product_name" : "df_json_3.NAME_ZT",
            "product_name_en" : "df_json_3.NAME",
            "description" : "df_json_3.DESCRIPTION_ZT",
            "description_en" : "df_json_3.DESCRIPTION",
            "category_code" : "df_json_3.ERPCATEGORYCODE",
            "erp_product_type" : "df_json_3.ERPPRODUCTTYPE",
            "Online" : "df_json_3.Online"
        })\
        .whenNotMatchedInsert(values = {
            "PRODUCT_CODE" : "df_json_3.PRODUCT_CODE",
            "product_name": "df_json_3.NAME_ZT",
            "product_name_en": "df_json_3.NAME",
            "description": "df_json_3.DESCRIPTION_ZT",
            "description_en": "df_json_3.DESCRIPTION",
            "category_code": "df_json_3.ERPCATEGORYCODE",
            "erp_product_type": "df_json_3.ERPPRODUCTTYPE",
            "Online": "df_json_3.Online"
        })\
        .execute()
    return MASTER_2
The code above is an example of how I am trying to do this at the moment.
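For the CDC route mentioned above, here is a minimal sketch of what an apply_changes setup might look like. It is an illustration under assumptions, not a confirmed solution: the target name MASTER_CDC, the view name daily_updates, and the ordering column ingest_time are all hypothetical (the post does not say which column orders the updates, and apply_changes needs one for sequence_by). The pipeline definition is wrapped in a function only because the dlt module can be imported solely inside a pipeline run. On older runtimes, create_streaming_table was named create_streaming_live_table.

```python
# Source column -> master column, copied from the merge attempt above.
COLUMN_MAP = {
    "PRODUCT_CODE": "PRODUCT_CODE",
    "NAME_ZT": "product_name",
    "NAME": "product_name_en",
    "DESCRIPTION_ZT": "description",
    "DESCRIPTION": "description_en",
    "ERPCATEGORYCODE": "category_code",
    "ERPPRODUCTTYPE": "erp_product_type",
    "Online": "Online",
}


def rename_exprs(column_map):
    """Build SELECT expressions such as 'NAME_ZT AS product_name'."""
    return [f"{src} AS {dst}" for src, dst in column_map.items()]


def define_pipeline(sequence_col="ingest_time"):
    """Register a renamed view of the daily data and a CDC target fed by it.

    sequence_col is a hypothetical ordering column assumed to exist in df_json_3.
    """
    # Imported here because dlt only exists inside a Delta Live Tables pipeline run.
    import dlt
    from pyspark.sql.functions import col

    @dlt.view(name="daily_updates")
    def daily_updates():
        # Rename the daily columns so they match the master schema.
        return dlt.read_stream("df_json_3").selectExpr(
            *rename_exprs(COLUMN_MAP), sequence_col
        )

    # The apply_changes target must be declared this way, not with @dlt.table.
    dlt.create_streaming_table("MASTER_CDC")

    dlt.apply_changes(
        target="MASTER_CDC",
        source="daily_updates",
        keys=["PRODUCT_CODE"],         # match rows on the product code
        sequence_by=col(sequence_col), # later values win for the same key
        stored_as_scd_type=1,          # overwrite in place, keep no history
    )
```

In a pipeline notebook, calling define_pipeline() at the top level would register the view and the target. Because the target has to be created with create_streaming_table rather than by an @dlt.table query, the existing master rows would presumably also need to arrive through the source.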
12-21-2022 07:55 PM
@Rishabh Pandey
12-21-2022 08:20 PM
what do you mean?
12-21-2022 09:16 PM
Hey @Pierre Nanquette, from what I understand of the Delta Live Tables concept, we can't join two streaming Delta Live Tables at the moment. So you can go with your first option: convert the tables into DataFrames and then merge them. As of now, we can't merge Delta Live Tables directly.
12-21-2022 09:27 PM
How about turning a Delta Live Table into a Delta table? Is there a way to do that? I mean a way to read it, not with dlt.read, but one that lets us load it as just a Delta table.
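One way this could look, sketched under assumptions: a pipeline configured with a target schema publishes its tables to the metastore, where a regular notebook or job outside the pipeline can read them by name like any other Delta table. In the sketch below, my_schema.df_json_3 (the published daily table) and my_schema.master_delta (a regular Delta table that is not managed by the pipeline) are hypothetical names, and the column mapping is copied from the merge attempt earlier in the thread.

```python
# Source column -> master column, copied from the merge attempt in the thread.
COLUMN_MAP = {
    "PRODUCT_CODE": "PRODUCT_CODE",
    "NAME_ZT": "product_name",
    "NAME": "product_name_en",
    "DESCRIPTION_ZT": "description",
    "DESCRIPTION": "description_en",
    "ERPCATEGORYCODE": "category_code",
    "ERPPRODUCTTYPE": "erp_product_type",
    "Online": "Online",
}


def merge_assignments(column_map, source_alias="d"):
    """Map each master column to the aliased source column that feeds it."""
    return {dst: f"{source_alias}.{src}" for src, dst in column_map.items()}


def upsert_daily_into_master(spark, daily_name, master_name):
    """Upsert the published daily table into a regular Delta master table."""
    # Imported lazily so merge_assignments can be used without Spark installed.
    from delta.tables import DeltaTable

    daily = spark.read.table(daily_name)             # batch read of the DLT output
    master = DeltaTable.forName(spark, master_name)  # merge target (assumed not pipeline-managed)

    assignments = merge_assignments(COLUMN_MAP)
    # The key column is only needed when inserting, not when updating.
    update_set = {k: v for k, v in assignments.items() if k != "PRODUCT_CODE"}

    (
        master.alias("m")
        .merge(daily.alias("d"), "m.PRODUCT_CODE = d.PRODUCT_CODE")
        .whenMatchedUpdate(set=update_set)
        .whenNotMatchedInsert(values=assignments)
        .execute()
    )
```

From a notebook this would be called as upsert_daily_into_master(spark, "my_schema.df_json_3", "my_schema.master_delta"). Note that merge() here is called on a DeltaTable object, not on a DataFrame.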