04-21-2023 02:56 AM
Hello community :).
I am currently implementing some pipelines using DLT. They are working great for my medallion architecture: landed JSON in bronze -> silver (using apply_changes), then materialized gold views on top.
However, I am now attempting to create a new silver table by merging into it from an existing silver table. I only want to insert one column (let's call it id1) and then set a second column (id2) equal to my source column id1.
So in essence data would move like so:
SOURCE: id = A
-> MERGE
TARGET: id1 = A, id2 = A
Is this possible with apply_changes?
I have been able to achieve it traditionally using DeltaTable and .merge:
silver_match.alias("t").merge(
    silver_supporter.alias("s"), "s.id1 = t.source_supporter_id"
).whenMatchedUpdate(
    set={"source_supporter_id": "s.id1"}
).whenNotMatchedInsert(
    values={
        "source_supporter_id": "s.id1",
        "cbi_id": "s.cbi_id",
    }
).execute()
If this isn't supported using apply_changes, are there any recommended methods (or just general suggestions) for loading this table with my DLT pipelines? If not, does anyone have experience running hybrid DLT and non-DLT table pipelines?
Any help or suggestions on where I might be able to get more information would be greatly appreciated!
Thanks,
Robbie
04-25-2023 10:18 PM
@Robert Pearce :
It is possible to achieve the desired behavior with Delta Lake. You can use the merge operation to merge data from your source into your target Delta table, and use whenMatchedUpdate to set the id2 column equal to the id column in the source data.
Here's an example code snippet:
from delta.tables import DeltaTable

# Define source and target table names
source_table = "bronze_table"
target_table = "silver_table"

# Read the source data as a DataFrame
source_df = spark.read.table(source_table)

# Create a DeltaTable object for the target table (forName resolves a
# table name; forPath expects a storage path)
target_delta_table = DeltaTable.forName(spark, target_table)

# Merge data from the source table into the target table
target_delta_table.alias("t").merge(
    source_df.alias("s"),
    "s.id = t.id1"
).whenMatchedUpdate(
    set={"id2": "s.id"}
).whenNotMatchedInsert(
    values={"id1": "s.id", "id2": "s.id"}
).execute()
This code merges the data from the bronze_table source table into the silver_table target table. The whenMatchedUpdate clause updates the id2 column to match the id column in the source data, and the whenNotMatchedInsert clause inserts new rows with both id1 and id2 set to the source id column.
If you have any issues or need further guidance, the Databricks documentation on Delta Lake's merge operation and the apply_changes function can be very helpful.
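For reference, if you want to keep this load inside the DLT pipeline itself, the same column duplication can be pushed into a staging view so that apply_changes handles the upsert. A minimal sketch, assuming the source exposes an id column and some ordering column (sequence_num below is a hypothetical placeholder):

import dlt
from pyspark.sql import functions as F

# Declare the target streaming table
dlt.create_streaming_table("silver_table")

# Stage the source, duplicating id into both target columns up front,
# since apply_changes does no column remapping of its own
@dlt.view(name="silver_source_prepared")
def silver_source_prepared():
    return (
        spark.readStream.table("bronze_table")  # or dlt.read_stream(...) if bronze is in the same pipeline
        .select(
            F.col("id").alias("id1"),
            F.col("id").alias("id2"),
            F.col("sequence_num"),  # hypothetical ordering column
        )
    )

# Upsert into the target, keyed on id1 and ordered by sequence_num
dlt.apply_changes(
    target="silver_table",
    source="silver_source_prepared",
    keys=["id1"],
    sequence_by=F.col("sequence_num"),
)

The key point is that apply_changes won't rename or duplicate columns for you, so the id -> id1/id2 mapping has to happen in the view feeding it.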
05-12-2023 01:23 AM
That's great, thank you!
I ended up arriving at something similar myself, so I'm glad you suggested that too :).
I am now wondering about the best way to handle creating a new table if it doesn't exist. I am using Unity Catalog, so I'm having to work with a try and except, since I can't use Catalog.table_exists.
Have you had any experience of this?
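For context, the pattern I have in mind looks roughly like this (a minimal sketch; the three-level table name and columns are just placeholders):

from pyspark.sql.utils import AnalysisException

table_name = "my_catalog.my_schema.silver_match"  # placeholder name

try:
    # Probe for the table; raises AnalysisException if it doesn't exist
    spark.read.table(table_name)
except AnalysisException:
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table_name} (id1 STRING, id2 STRING)"
    )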
09-29-2024 11:15 PM
While using a .merge statement within DLT, I am getting the error "Attribute `merge` is not supported." Can you suggest how you resolved this issue?
06-01-2024 11:22 PM
Is it possible to have custom upserts for streaming tables in Delta Live Tables?
I'm getting the error:
pyspark.errors.exceptions.captured.AnalysisException: `blusmart_poc.information_schema.sessions` is not a Delta table.