
How to merge two separate Delta Live Tables?

Trodenn
New Contributor III

So I have two Delta Live Tables. One is the master table that contains all the prior data, and the other contains all the new data for that specific day. I want to merge the two tables so that the master table contains the newest data. However, I only know how to do this with Spark DataFrames and do not know whether it is possible with Delta Live Tables.

What I've done so far can be summed up in two approaches. The first is to convert the two Delta Live Tables into Spark DataFrames, perform the merge() operation on them, and then create a new dlt.table from the output.

The second option is to use CDC, but I am unsure how to actually do this. I know that the apply_changes function takes a target table as well as a source table, but can I do that with two Delta Live Tables? I am quite confused on this matter. Any help is appreciated.
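For reference, the CDC option could be sketched roughly as below. This is only a sketch under assumptions, not a confirmed solution for this pipeline: the target name `MASTER_2_merged`, the view name `daily_updates_renamed`, and the ordering column `ingest_date` are hypothetical, and `apply_changes` needs some column to sequence the changes by. The column mapping is taken from the merge attempt in this post. The Spark and `dlt` imports sit inside the function because they are only available inside a running pipeline.

```python
# Target column -> source column, copied from the merge attempt in this post.
COLUMN_MAP = {
    "PRODUCT_CODE": "PRODUCT_CODE",
    "product_name": "NAME_ZT",
    "product_name_en": "NAME",
    "description": "DESCRIPTION_ZT",
    "description_en": "DESCRIPTION",
    "category_code": "ERPCATEGORYCODE",
    "erp_product_type": "ERPPRODUCTTYPE",
    "Online": "Online",
}

# ASSUMPTION: hypothetical column in df_json_3 that orders the daily changes.
SEQUENCE_COLUMN = "ingest_date"

APPLY_CHANGES_ARGS = {
    "target": "MASTER_2_merged",        # hypothetical target table name
    "source": "daily_updates_renamed",  # hypothetical view defined below
    "keys": ["PRODUCT_CODE"],           # join key used in the merge attempt
    "sequence_by": SEQUENCE_COLUMN,
    "stored_as_scd_type": 1,            # keep only the latest row per key
}


def define_cdc_pipeline():
    """Call once at the top level of a DLT pipeline notebook."""
    import dlt
    from pyspark.sql import functions as F

    @dlt.view(name=APPLY_CHANGES_ARGS["source"])
    def daily_updates_renamed():
        # Rename the source columns so they line up with the target columns.
        renamed = [F.col(src).alias(dst) for dst, src in COLUMN_MAP.items()]
        return dlt.read_stream("df_json_3").select(*renamed, F.col(SEQUENCE_COLUMN))

    # Older DLT releases named this function create_streaming_live_table.
    dlt.create_streaming_table(name=APPLY_CHANGES_ARGS["target"])
    dlt.apply_changes(**APPLY_CHANGES_ARGS)
```

Note that this builds a new target table owned by the pipeline rather than updating MASTER_2 in place, so whether it fits depends on how MASTER_2 is produced.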

import dlt
from delta.tables import *
 
 
@dlt.table(name = "Merged_Table", comment = "this is the merge table for the daily update and the master table")
def merge_daily_and_master_table(): 
    df_json_3 = dlt.read_stream("df_json_3")
    MASTER_2 = dlt.read_stream("MASTER_2")
    MASTER_2 = MASTER_2.toDF()
    df_json_3 = df_json_3.toDF()
    MASTER_2.alias('MASTER_2')\
    .merge(
        df_json_3.alias('df_json_3'),
        "MASTER_2.PRODUCT_CODE = df_json_3.PRODUCT_CODE ")\
    .whenMatchedUpdate(set = {
            "product_name" : "df_json_3.NAME_ZT",
            "product_name_en" : "df_json_3.NAME",
            "description" : "df_json_3.DESCRIPTION_ZT",
            "description_en" : "df_json_3.DESCRIPTION",
            "category_code" : "df_json_3.ERPCATEGORYCODE",
            "erp_product_type" : "df_json_3.ERPPRODUCTTYPE",
            "Online" : "df_json_3.Online"
            })\
    .whenNotMatchedInsert(values = {
            "PRODUCT_CODE" : "df_json_3.PRODUCT_CODE",
            "product_name": "df_json_3.NAME_ZT",
            "product_name_en": "df_json_3.NAME",
            "description": "df_json_3.DESCRIPTION_ZT",
            "description_en": "df_json_3.DESCRIPTION",
            "category_code": "df_json_3.ERPCATEGORYCODE",
            "erp_product_type": "df_json_3.ERPPRODUCTTYPE",
            "Online": "df_json_3.Online"
            })\
    .execute()
    return(
    MASTER_2
    )

The code above is an example of what I am trying to do at the moment.
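One thing worth noting about this attempt: merge() is a method of delta.tables.DeltaTable, not of a Spark DataFrame, so calling it on the result of dlt.read_stream() will not work. A minimal sketch of the same upsert with the Delta Lake Python API follows. It assumes the master is a regular Delta table reachable by name and that the daily data is a batch DataFrame. The helper names `build_merge_maps` and `merge_daily_into_master` are made up for illustration, and I would not assume this is supported against a table that a DLT pipeline manages.

```python
# Target column -> source column for the non-key columns, from the post above.
UPDATE_COLUMNS = {
    "product_name": "NAME_ZT",
    "product_name_en": "NAME",
    "description": "DESCRIPTION_ZT",
    "description_en": "DESCRIPTION",
    "category_code": "ERPCATEGORYCODE",
    "erp_product_type": "ERPPRODUCTTYPE",
    "Online": "Online",
}


def build_merge_maps(source_alias="updates"):
    """Return the (update, insert) column maps expected by the merge builder."""
    update_map = {dst: f"{source_alias}.{src}" for dst, src in UPDATE_COLUMNS.items()}
    # Inserts also need the key column; updates leave it untouched.
    insert_map = {"PRODUCT_CODE": f"{source_alias}.PRODUCT_CODE", **update_map}
    return update_map, insert_map


def merge_daily_into_master(spark, master_table, daily_df):
    """Upsert daily_df (a batch DataFrame) into the Delta table master_table."""
    from delta.tables import DeltaTable  # needs the delta-spark package

    update_map, insert_map = build_merge_maps("updates")
    (
        DeltaTable.forName(spark, master_table)
        .alias("master")
        .merge(daily_df.alias("updates"), "master.PRODUCT_CODE = updates.PRODUCT_CODE")
        .whenMatchedUpdate(set=update_map)
        .whenNotMatchedInsert(values=insert_map)
        .execute()
    )
```

execute() changes the master table in place and returns nothing useful to hand back from a @dlt.table function, which is another reason the original attempt cannot simply return MASTER_2.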

4 REPLIES

Ajay-Pandey
Esteemed Contributor III

@Rishabh Pandey

Trodenn
New Contributor III

what do you mean?

Rishabh264
Honored Contributor II

Hey @Pierre Nanquette, as far as I understand the Delta Live Tables concept, we can't join two streaming Delta Live Tables at the moment, so you can go with your first option: convert the tables into DataFrames and then merge them. As of now, we can't merge Delta Live Tables directly.

Trodenn
New Contributor III

How about turning a Delta Live Table into a Delta table? Is there a way to do that? I mean a way to read it, not with dlt.read, but one that lets us load it as just a plain Delta table.
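A possible way to do this, assuming the pipeline publishes its tables to a target schema: outside the pipeline, the table can then be read by name like any other table. The schema name `my_schema` and the helper `read_published_table` are hypothetical, and this sketch only covers reading the table, not writing to it.

```python
def read_published_table(spark, schema, table):
    """Read a table published by a DLT pipeline as an ordinary batch DataFrame."""
    # spark.read.table takes a (schema-qualified) table name.
    return spark.read.table(f"{schema}.{table}")


# Example call from a regular notebook or job, outside the pipeline:
# master_df = read_published_table(spark, "my_schema", "MASTER_2")
```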
