Streaming Delta Live Table: if I re-run the pipeline, does it append the new data to the current table?

Mado
Valued Contributor II

Hi,

I have a question about DLT tables.

Assume that I have a streaming DLT pipeline which reads data from a Bronze table and applies transformations to the data.

The pipeline mode is triggered.

If I re-run the pipeline, does it append new data to the current table? If so, how can I be sure that there are no duplicates in the resulting table?

When using Structured Streaming, the "writeStream" method has an "outputMode" option that controls how data is written. But a DLT pipeline doesn't expose "writeStream", so how can I control how data is written into the DLT table?
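
(For reference, a minimal sketch of the Structured Streaming pattern I mean; the table paths and checkpoint location below are just placeholders:)

# Plain Structured Streaming: outputMode controls how each micro-batch
# is written to the sink ("append", "complete", or "update")
(spark.readStream
    .format("delta")
    .load("/mnt/delta/bronze")          # placeholder Bronze path
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/delta/_checkpoints/silver")
    .start("/mnt/delta/silver"))        # placeholder target path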

4 REPLIES

Anonymous
Not applicable

@Mohammad Saber:

In a Databricks Delta Live Tables (DLT) pipeline, when you re-run the pipeline in "append" mode, new data will be appended to the existing table. Delta Lake provides built-in support for handling duplicates through its "upsert" functionality. You can use the "MERGE" command to merge the new data with the existing data in the table based on a specific condition. This can be done using a primary key or unique identifier to ensure that duplicates are not added to the table.
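
A rough sketch of that MERGE pattern, assuming a hypothetical target table named silver_events with a unique key column id (substitute your own table and key):

# Hypothetical upsert: new_df holds the latest batch of rows, keyed on "id",
# so re-running the pipeline never produces duplicate rows
new_df.createOrReplaceTempView("new_data")

spark.sql("""
    MERGE INTO silver_events AS t
    USING new_data AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")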

To control writing data into the DLT table in a DLT pipeline, you can use the Delta Lake API to write data to the table. This can be done using the DeltaTable object in PySpark, which provides an interface for reading from and writing to Delta tables. When writing data to a Delta table, you can use the mode parameter to control how data is written. The mode parameter can take values such as "append", "overwrite", and "ignore", among others. To make sure that duplicates are not added to the table, use a MERGE (an upsert) rather than a plain append; Delta Lake does not have a separate "upsert" write mode.
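
As a rough illustration of the mode parameter (the table path is a placeholder):

# "append" adds the new rows, "overwrite" replaces the table contents,
# "ignore" skips the write if the table already exists
df.write.format("delta").mode("append").save("/mnt/delta/my_table")

Note that a plain append like this will happily write the same rows twice if you re-run it, which is why a MERGE is the safer option when duplicates are a concern.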

Mado
Valued Contributor II

Thanks @Suteja Kanuri

If there is any sample notebook on how to control writing data into the DLT table in a DLT pipeline using the Delta Lake API, could you please share it with me?

Thanks.  

Anonymous
Not applicable

@Mohammad Saber: Can you check out https://www.dbdemos.ai/?

Here is some code as well:

# Import necessary libraries
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Define the Delta Lake table path
table_path = "/mnt/delta/my_table"

# Load data into a Spark DataFrame
df = spark.read.format("csv").option("header", "true").load("/mnt/my_data.csv")

# Filter the data to only include rows with a certain value
df_filtered = df.filter(col("my_column") == "my_value")

# Check if the table exists and create it (empty, with the filtered
# DataFrame's schema) if it doesn't
if not DeltaTable.isDeltaTable(spark, table_path):
    (DeltaTable.create(spark)
        .location(table_path)
        .addColumns(df_filtered.schema)
        .partitionedBy("my_column")
        .execute())

# Create a DeltaTable object for the table
delta_table = DeltaTable.forPath(spark, table_path)

# Insert the filtered data into the table: rows whose my_column value
# already exists are left unchanged, new rows are inserted
(delta_table.alias("t").merge(
    df_filtered.alias("s"),
    "t.my_column = s.my_column"
).whenNotMatchedInsertAll().execute())

In this example, we first load some data into a Spark DataFrame and filter it to only include rows with a certain value. We then check whether a Delta table already exists at the specified path and, if it doesn't, create it with the schema of the filtered DataFrame, partitioned by a column. Finally, we use the DeltaTable merge() function to upsert the filtered data into the table. The merge condition here is the my_column column: rows whose my_column value already exists in the table are left unchanged, so no duplicates are created, and rows that don't match are inserted as new rows. If you also wanted matching rows to be updated with the incoming values, you could add a whenMatchedUpdateAll() clause before execute().

Valentin1
New Contributor III

Can you change the outputMode of a streaming pipeline in Delta Live Tables? (I wish to change to update mode so that I can use `applyInPandasWithState` with the `update` outputMode. Is there any difference from `append` in Delta Live Tables, as it seems to work with that?)
