Alberto_Umana
Databricks Employee

Hi @JothyGanesan,

Delta Live Tables (DLT) does not currently support running a MERGE operation directly within a DLT pipeline. This is a general restriction of DLT, not something tied to a particular DLT version.

 

However, you can achieve the desired outcome by combining foreachBatch and MERGE in a streaming query. Here is an example of how to use foreachBatch to perform a MERGE operation:

 

from delta.tables import DeltaTable

def upsert_to_delta(microBatchOutputDF, batchId):
    # Look up the target Delta table by name.
    deltaTable = DeltaTable.forName(spark, "target_table_name")
    # Upsert: update rows whose key matches, insert the rest.
    deltaTable.alias("t").merge(
        microBatchOutputDF.alias("s"),
        "s.key = t.key"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# Run the upsert once per micro-batch of the stream.
streamingDF.writeStream.foreachBatch(upsert_to_delta).outputMode("update").start()

 

In this example, upsert_to_delta performs the MERGE operation using the Delta Lake Python API, and foreachBatch applies that function to each micro-batch of the streaming DataFrame.
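If you need the same pattern for several tables or for composite keys, one possible variation is sketched below. The helper names build_merge_condition and make_upsert are hypothetical and not part of Delta Lake. The sketch also deduplicates each micro-batch on the key columns, on the assumption that keeping an arbitrary row per key is acceptable, because MERGE fails when several source rows match the same target row.

```python
from typing import Callable, Sequence


def build_merge_condition(keys: Sequence[str], source: str = "s", target: str = "t") -> str:
    """Build a MERGE join condition such as 's.key = t.key' from key column names."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"{source}.{k} = {target}.{k}" for k in keys)


def make_upsert(target_table: str, keys: Sequence[str]) -> Callable:
    """Return a function that can be passed to writeStream.foreachBatch (hypothetical helper)."""
    condition = build_merge_condition(keys)

    def upsert_to_delta(micro_batch_df, batch_id):
        # Imported lazily so this module still loads where delta-spark is not installed.
        from delta.tables import DeltaTable

        # Keep one row per key within the micro-batch; which row survives is arbitrary.
        deduped = micro_batch_df.dropDuplicates(list(keys))

        # Use the session attached to the micro-batch rather than a global `spark`.
        target = DeltaTable.forName(micro_batch_df.sparkSession, target_table)
        (
            target.alias("t")
            .merge(deduped.alias("s"), condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    return upsert_to_delta


# Usage, assuming `streamingDF` exists; the checkpoint path is a placeholder:
# streamingDF.writeStream \
#     .foreachBatch(make_upsert("target_table_name", ["key"])) \
#     .option("checkpointLocation", "/path/to/checkpoint") \
#     .start()
```

If a specific row per key must win (for example the latest by timestamp), replace dropDuplicates with your own ordering logic before the merge.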