Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to merge two separate DELTA LIVE TABLES?

Trodenn
New Contributor III

So I have two delta live tables: one master table that contains all the prior data, and another table that contains all the new data for that specific day. I want to be able to merge those two tables so that the master table contains the newest data. However, I only know how to do this with Spark DataFrames, and I do not know if it is possible with delta live tables.

What I've done so far can be summed up in two approaches. The first is converting the two delta live tables into Spark DataFrames, performing the merge() operation on them, and then creating a new dlt.table from the output.

The second option is to use CDC, but I am unsure how to actually do this. I know that the apply_changes function takes a target table as well as a source table, but can I do that with two delta live tables? I am quite confused on this matter; any help is appreciated. (A rough sketch of what I imagine the CDC option would look like is included after the code below.)

import dlt
from delta.tables import *

@dlt.table(name = "Merged_Table", comment = "this is the merge table for the daily update and the master table")
def merge_daily_and_master_table(): 
    df_json_3 = dlt.read_stream("df_json_3")
    MASTER_2 = dlt.read_stream("MASTER_2")
    MASTER_2 = MASTER_2.toDF()
    df_json_3 = df_json_3.toDF()
    MASTER_2.alias('MASTER_2')\
    .merge(
        df_json_3.alias('df_json_3'),
        "MASTER_2.PRODUCT_CODE = df_json_3.PRODUCT_CODE ")\
    .whenMatchedUpdate(set = {
            "product_name" : "df_json_3.NAME_ZT",
            "product_name_en" : "df_json_3.NAME",
            "description" : "df_json_3.DESCRIPTION_ZT",
            "description_en" : "df_json_3.DESCRIPTION",
            "category_code" : "df_json_3.ERPCATEGORYCODE",
            "erp_product_type" : "df_json_3.ERPPRODUCTTYPE",
            "Online" : "df_json_3.Online"
            })\
    .whenNotMatchedInsert(values = {
            "PRODUCT_CODE" : "df_json_3.PRODUCT_CODE",
            "product_name": "df_json_3.NAME_ZT",
            "product_name_en": "df_json_3.NAME",
            "description": "df_json_3.DESCRIPTION_ZT",
            "description_en": "df_json_3.DESCRIPTION",
            "category_code": "df_json_3.ERPCATEGORYCODE",
            "erp_product_type": "df_json_3.ERPPRODUCTTYPE",
            "Online": "df_json_3.Online"
            })\
    .execute()
    return(
    MASTER_2
    )

The above code is an example of how I am trying to do it at the moment.

4 REPLIES

Ajay-Pandey
Esteemed Contributor III

@Rishabh Pandey

Trodenn
New Contributor III

what do you mean?

Rishabh-Pandey
Honored Contributor III

hey @Pierre Nanquette, from what I understand of the delta live table concept, we can't join two streaming delta live tables at the moment, so you can go with your first option: convert the tables into DataFrames and then merge them. As of now we can't merge delta live tables directly. A rough sketch of what that first option could look like inside the pipeline is below.

Rishabh Pandey

Trodenn
New Contributor III

how about turning a delta live table into a delta table? Is there a way to do that? Like a way to read it, not with dlt.read, but one that lets us load it as just a delta table.
