Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming data - Merge in Target - DLT

JothyGanesan
New Contributor III

We have streaming inputs coming from streaming tables as well as from the table produced by apply_changes.

In our target there is only one table, which needs to be merged with all of these sources. Each source provides a different set of columns of the target table.

Challenge: Each source arrives on a different timeframe, sometimes lagging by one to two days. Using the primary key, we need to update only the data for the specific new events in the target table. So we have a single table with multiple columns, each updated by a different source.

Difficulties faced: A full outer join does not work, because one of the sources may have no data at the time of the join, and the DLT full outer join then produces no output since the watermark does not work for us. With SCD mode in apply_changes we could only retain the latest record; we are not sure how to enable column-level updates.

Can anyone help with this streaming table design?


3 REPLIES

Alberto_Umana
Databricks Employee

Hello @JothyGanesan,

To handle the scenario where multiple sources update different columns of a single target table at different times, you can use the following approach to design the streaming table:

  1. Streaming Sources and Staging Tables: For each source, create separate streaming pipelines that write to individual staging tables. This ensures that each source's data is processed independently.
  2. Delta Table and Merge Operation: Utilize the Delta table's MERGE operation to update the target table. The MERGE operation allows you to specify the conditions under which updates and insertions are made to the target table.
  3. Use foreachBatch in Streaming Queries: Implement the streaming ingestion using Spark Structured Streaming with foreachBatch. This allows you to apply custom logic within each micro-batch, including the MERGE operation.
  4. Watermarking for Latency Tolerance: Set appropriate watermarks on your streaming queries to handle the processing of late data and ensure data consistency (a short sketch follows this list).
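As a minimal sketch of points 1 and 4, assuming each source lands in its own Delta staging table and carries an event-time column (the table name staging.source1_events and the column event_time are placeholders), the stream could be read with a watermark that tolerates the one-to-two-day lag. Note that a watermark only affects stateful operators such as aggregations or stream-stream joins, so in a plain foreachBatch merge it mainly bounds state if such operators are added later:

// Hypothetical staging table and event-time column; replace with the real names.
val source1StreamingDF = spark.readStream
  .table("staging.source1_events")
  .withWatermark("event_time", "2 days")  // tolerate data arriving up to two days late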

Here’s an example code snippet:

import io.delta.tables._
import org.apache.spark.sql.DataFrame

// Target Delta table that every source merges into.
val targetTable = DeltaTable.forPath(spark, "path/to/target/table")

// Upsert each micro-batch into the target, keyed on the primary key.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.primaryKey = t.primaryKey"
    )
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

source1StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

source2StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
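Since each source supplies only a subset of the target's columns, one option is to give each source its own upsert function that touches only the columns that source owns, using updateExpr and insertExpr instead of updateAll and insertAll. This is only a sketch of that idea: primaryKey, colA and colB are placeholder names, and each source would get its own version with its own column list.

// Per-source variant of upsertToDelta that writes only this source's columns.
def upsertSource1(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(microBatchOutputDF.as("s"), "s.primaryKey = t.primaryKey")
    .whenMatched()
    .updateExpr(Map(
      "colA" -> "s.colA",          // update only the columns owned by this source
      "colB" -> "s.colB"
    ))
    .whenNotMatched()
    .insertExpr(Map(
      "primaryKey" -> "s.primaryKey",
      "colA" -> "s.colA",          // columns not listed here stay NULL on insert
      "colB" -> "s.colB"
    ))
    .execute()
}

The corresponding stream would then use .foreachBatch(upsertSource1 _) in place of the generic upsertToDelta.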

  5. Configuration: Ensure the streaming queries are correctly configured to handle processing at the desired intervals, for instance using maxFilesPerTrigger or maxBytesPerTrigger to manage batch sizes effectively.
  6. Handling Schema Changes: Ensure that the schema of the Delta table is managed properly, considering possible schema evolution (a short sketch of both points follows).
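A rough sketch of those last two points, assuming a Delta staging table named staging.source1_events (a placeholder) and assuming you want MERGE with updateAll/insertAll to pick up new source columns automatically:

// Bound how much data each micro-batch reads from the Delta source.
val throttledDF = spark.readStream
  .option("maxFilesPerTrigger", "100")   // or maxBytesPerTrigger, e.g. "1g"
  .table("staging.source1_events")       // hypothetical staging table

// Let MERGE add columns that exist in the source but not yet in the target.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")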

JothyGanesan
New Contributor III

Thank you for the explanation, @Alberto_Umana.

We will try the same. But does this also work on Databricks with Unity Catalog?

JothyGanesan
New Contributor III

Thank you, this worked.
