Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

custom upsert for delta live tables apply_changes()

PearceR
New Contributor III

Hello community :).

I am currently implementing some pipelines using DLT. They are working great for my medallion architecture: landed JSON in bronze -> silver (using apply_changes), then materialized gold views on top.

However, I am attempting to create a new silver table by merging into it from an existing silver table. I only want to insert one column (let's call it id1) and then set a second column (id2) equal to my source column id1.

So in essence data would move like so:

SOURCE: id = A

-> MERGE

TARGET: id1 = A, id2 = A

Is this possible with apply_changes?

I have been able to achieve it traditionally using DeltaTable and .merge:

from delta.tables import DeltaTable

# Merge the supporter table into the target, keyed on the source supporter id
silver_match.alias("t").merge(
        silver_supporter.alias("s"), "s.id1 = t.source_supporter_id"
    ).whenMatchedUpdate(
        set={"source_supporter_id": "s.id1"}
    ).whenNotMatchedInsert(
        values={
            "source_supporter_id": "s.id1",
            "cbi_id": "s.cbi_id",
        }
    ).execute()

If this isn't supported using apply_changes, are there any recommended methods (or just general suggestions) for loading this table with my DLT pipelines? If not, does anyone have experience running hybrid DLT and non-DLT table pipelines?

Any help, or suggestions of where I might be able to get more information, would be greatly appreciated!

Thanks,

Robbie

1 ACCEPTED SOLUTION

Anonymous
Not applicable

@Robert Pearce:

It is possible to achieve the desired behavior with Delta Lake's merge operation. You can merge data from your source into your target Delta table and use whenMatchedUpdate to set the id2 column equal to the id column in the source data.

Here's an example code snippet:

from delta.tables import DeltaTable

# Define source and target tables
source_table = "bronze_table"
target_table = "silver_table"

# Read the source data as a DataFrame
source_df = spark.read.table(source_table)

# Create a DeltaTable object for the target table
# (forName resolves a table name; forPath would expect a storage path)
target_delta_table = DeltaTable.forName(spark, target_table)

# Merge data from the source table into the target table
target_delta_table.alias("t").merge(
    source_df.alias("s"),
    "s.id = t.id1"
).whenMatchedUpdate(
    set={"id2": "s.id"}
).whenNotMatchedInsert(
    values={"id1": "s.id", "id2": "s.id"}
).execute()

This code will merge the data from the bronze_table source table into the silver_table target table. The whenMatchedUpdate clause updates the id2 column to be equal to the id column in the source data, and the whenNotMatchedInsert clause inserts new rows with the id1 and id2 columns set to the id column in the source data.

If you have any issues or need further guidance, the Databricks documentation on Delta Lake's merge operation and the apply_changes function can be very helpful.
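
If you need to run this inside the DLT pipeline itself, where .merge on pipeline-managed tables is not supported, a rough sketch using apply_changes could look like the following. The table names silver_source and silver_target and the sequencing column ingest_ts are placeholders; apply_changes does not transform columns itself, so the id duplication happens in an intermediate view:

import dlt
from pyspark.sql import functions as F

# Intermediate view: duplicate id1 into id2 before the upsert
@dlt.view
def source_with_duplicated_id():
    return (
        spark.readStream.table("silver_source")
        .withColumn("id2", F.col("id1"))
    )

# Declare the target streaming table managed by the pipeline
dlt.create_streaming_table("silver_target")

# Declarative upsert: keyed on id1, ordered by the assumed ingest_ts column
dlt.apply_changes(
    target="silver_target",
    source="source_with_duplicated_id",
    keys=["id1"],
    sequence_by=F.col("ingest_ts"),
)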


4 REPLIES


PearceR
New Contributor III

That's great, thank you!

I ended up getting to something similar myself, so I'm glad you suggested that too :).

I am now wondering about the best way to handle creating the table if it does not exist. I am using Unity Catalog, so I am having to work with a try and except since I can't use Catalog.table_exists.
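
Roughly, a sketch of what I mean (the three-level table name is just a placeholder):

from pyspark.sql.utils import AnalysisException

def table_exists(spark, full_name: str) -> bool:
    """Probe a Unity Catalog table by attempting to resolve it."""
    try:
        spark.read.table(full_name)  # analysis fails if the table is missing
        return True
    except AnalysisException:
        return False

# Usage with a placeholder name
if not table_exists(spark, "catalog.schema.silver_match"):
    spark.sql(
        "CREATE TABLE catalog.schema.silver_match "
        "(source_supporter_id STRING, cbi_id STRING)"
    )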

Have you had any experience of this?

While using a .merge statement within DLT, I am getting the error "Attribute `merge` is not supported." Can you suggest how you resolved this issue?

Harsh141220
New Contributor II

Is it possible to have custom upserts for streaming tables in Delta Live Tables?
I'm getting the error:
pyspark.errors.exceptions.captured.AnalysisException: `blusmart_poc.information_schema.sessions` is not a Delta table.
