cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to merge two separate DELTA LIVE TABLE?

Trodenn
New Contributor III

So I have two delta live tables. One that is the master table that contains all the prior data, and another table that contains all the new data for that specific day. I want to be able to merge those two table so that the master table contains would contain the newest data. However, I only know how to do it with spark dataframes, but do not know if this is possible with delta live tables.

What I've done so far can be summed up in two approaches. converting the two delta live tables into spark dataframes and then perform the merge() operation with them is the first and then create a new dlt.table with the output of it.

The second option is to use CDC, but I am unsure on how to actually do this. I know that apply_changes function should contain a target table as well as a src table. but can I do that with two delta live tables? I really am quite confused on this matter. Any help is appreciated

from delta.tables import *
 
 
@dlt.table(name = "Merged_Table", comment = "this is the merge table for the daily update and the master table")
def merge_daily_and_master_table(): 
    df_json_3 = dlt.read_stream("df_json_3")
    MASTER_2 = dlt.read_stream("MASTER_2")
    MASTER_2 = MASTER_2.toDF()
    df_json_3 = df_json_3.toDF()
    MASTER_2.alias('MASTER_2')\
    .merge(
        df_json_3.alias('df_json_3'),
        "MASTER_2.PRODUCT_CODE = df_json_3.PRODUCT_CODE ")\
    .whenMatchedUpdate(set = {
            "product_name" : "df_json_3.NAME_ZT",
            "product_name_en" : "df_json_3.NAME",
            "description" : "df_json_3.DESCRIPTION_ZT",
            "description_en" : "df_json_3.DESCRIPTION",
            "category_code" : "df_json_3.ERPCATEGORYCODE",
            "erp_product_type" : "df_json_3.ERPPRODUCTTYPE",
            "Online" : "df_json_3.Online"
            })\
    .whenNotMatchedInsert(values = {
            "PRODUCT_CODE" : "df_json_3.PRODUCT_CODE",
            "product_name": "df_json_3.NAME_ZT",
            "product_name_en": "df_json_3.NAME",
            "description": "df_json_3.DESCRIPTION_ZT",
            "description_en": "df_json_3.DESCRIPTION",
            "category_code": "df_json_3.ERPCATEGORYCODE",
            "erp_product_type": "df_json_3.ERPPRODUCTTYPE",
            "Online": "df_json_3.Online"
            })\
    .execute()
    return(
    MASTER_2
    )

 The above code is ex on how I am trying to do at the moment

4 REPLIES 4

Ajay-Pandey
Esteemed Contributor III

@Rishabh Pandey​ 

Ajay Kumar Pandey

Trodenn
New Contributor III

what do you mean?

Rishabh-Pandey
Esteemed Contributor

hey @Pierre Nanquette​  what i get to know from the delta live table concept is that we can't join two stream delta live tables at a moment , so you can go with your first option ,convert tables into dataframe and then merge it , from as of now we can't merge delta live tables directly .

Rishabh Pandey

Trodenn
New Contributor III

how about turning a delta live table to a delta table? is there a way to do that? Like a way to read it, not like dlt.read, but one where it allows us to load it as just a delta table

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group