custom upsert for delta live tables apply_changes()

PearceR
New Contributor II

Hello community :).

I am currently implementing some pipelines using DLT. They are working great for my medallion architecture: landed JSON in bronze -> silver (using apply_changes), with materialized gold views on top.

However, I am attempting to create a new silver table by merging into it from an existing silver table. I only want to insert one column (let's call it id1), then I would like to set a second column (id2) equal to my source column id1.

So in essence data would move like so:

SOURCE: id = A

-> MERGE

TARGET: id1 = A, id2 = A

Is this possible with apply_changes?

I have been able to achieve it traditionally using DeltaTable and .merge:

# Update the matched row's source_supporter_id; otherwise insert both columns.
silver_match.alias("t").merge(
        silver_supporter.alias("s"), "s.id1 = t.source_supporter_id"
    ).whenMatchedUpdate(set={"source_supporter_id": "s.id1"}).whenNotMatchedInsert(
        values={
            "source_supporter_id": "s.id1",
            "cbi_id": "s.cbi_id",
        }
    ).execute()

If this isn't supported using apply_changes, are there any recommended methods (or just general suggestions) for loading this table with my DLT pipelines? If not, does anyone have experience running hybrid DLT and non-DLT table pipelines?

Any help or suggestions on where I might find more information would be greatly appreciated!

Thanks,

Robbie

1 ACCEPTED SOLUTION

Accepted Solutions

Anonymous
Not applicable

@Robert Pearce:

apply_changes does not expose custom merge clauses directly, but you can achieve the desired behavior with Delta Lake's merge operation: merge the data from your source into your target Delta table, and use whenMatchedUpdate to set the id2 column equal to the key column in the source data.

Here's an example code snippet:

from delta.tables import DeltaTable
 
# Define source and target tables
source_table = "bronze_table"
target_table = "silver_table"
 
# Read the source data as a DataFrame
source_df = spark.read.table(source_table)
 
# Create a DeltaTable object for the target table (forName, since we have
# a table name rather than a storage path)
target_delta_table = DeltaTable.forName(spark, target_table)
 
# Merge data from the source table into the target table
target_delta_table.alias("t").merge(
    source_df.alias("s"),
    "s.id = t.id1"
).whenMatchedUpdate(
    set={"id2": "s.id"}
).whenNotMatchedInsert(
    values={"id1": "s.id", "id2": "s.id"}
).execute()

This code merges the data from the bronze_table source table into the silver_table target table. The whenMatchedUpdate clause updates the id2 column to equal the id column in the source data, and the whenNotMatchedInsert clause inserts new rows with both id1 and id2 set to the source id column.

If you have any issues or need further guidance, the Databricks documentation on Delta Lake's merge operation and the apply_changes function can be very helpful.
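The effect of these merge clauses can be sketched in plain Python (a hypothetical simulation for illustration, not Delta Lake code; `simulate_merge` is an invented helper, rows are dicts, the target is keyed on id1, and the source key column is id):

```python
# Hypothetical pure-Python simulation of the merge semantics above.
def simulate_merge(target_rows, source_rows):
    # Index the target by its key column, id1.
    by_key = {r["id1"]: dict(r) for r in target_rows}
    for s in source_rows:
        if s["id"] in by_key:
            # WHEN MATCHED: update id2 only.
            by_key[s["id"]]["id2"] = s["id"]
        else:
            # WHEN NOT MATCHED: insert with id1 = id2 = source id.
            by_key[s["id"]] = {"id1": s["id"], "id2": s["id"]}
    return sorted(by_key.values(), key=lambda r: r["id1"])

# SOURCE id = A against an empty target yields TARGET id1 = A, id2 = A,
# matching the data movement described in the question.
print(simulate_merge([], [{"id": "A"}]))  # [{'id1': 'A', 'id2': 'A'}]
```

The same function also shows the matched path: an existing row with id1 = A gets only its id2 refreshed, leaving any other columns untouched.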


2 REPLIES 2


PearceR
New Contributor II

That's great, thank you!

I ended up arriving at something similar myself, so I'm glad you suggested it too :).

I am now wondering about the best way to handle creating the table if it does not exist. I am using Unity Catalog, so I am having to work with a try and except since I can't use Catalog.table_exists.

Have you had any experience with this?
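One common shape for that try/except check looks like this (a minimal pure-Python sketch; `table_exists` and `lookup` are hypothetical names, where `lookup` stands in for any call that raises on a missing table, such as spark.table):

```python
# Hypothetical existence check: treat a failed lookup as "table absent".
def table_exists(lookup, name):
    try:
        lookup(name)  # raises if the table does not exist
        return True
    except Exception:
        return False

# Usage with a plain dict standing in for the catalog:
catalog = {"main.silver.match": object()}
print(table_exists(catalog.__getitem__, "main.silver.match"))    # True
print(table_exists(catalog.__getitem__, "main.silver.missing"))  # False
```

When you only need idempotent creation, CREATE TABLE IF NOT EXISTS sidesteps the explicit check entirely.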
