Alberto_Umana
Databricks Employee

Hello @JothyGanesan,

To handle multiple sources that update different columns of a single target table at different times, you can design the streaming table as follows:

  1. Streaming Sources and Staging Tables: For each source, create separate streaming pipelines that write to individual staging tables. This ensures that each source's data is processed independently.
  2. Delta Table and Merge Operation: Utilize the Delta table's MERGE operation to update the target table. The MERGE operation allows you to specify the conditions under which updates and insertions are made to the target table.
  3. Use foreachBatch in Streaming Queries: Implement the streaming ingestion using Spark Structured Streaming with foreachBatch. This allows you to apply custom logic within each micro-batch, including the MERGE operation.
  4. Watermarking for Latency Tolerance: Set appropriate watermarks on your streaming queries to handle the processing of late data and ensure data consistency.

Here’s an example code snippet:

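import io.delta.tables._
import org.apache.spark.sql.DataFrame

val targetTable = DeltaTable.forPath(spark, "path/to/target/table")

// Upsert one micro-batch into the target table, matching rows on the primary key.
// Note: updateAll() and insertAll() assume the batch has the same columns as the target;
// when a source carries only some of the columns, use updateExpr/insertExpr with an explicit column map.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  targetTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.primaryKey = t.primaryKey"
    )
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Each source gets its own streaming query running the same upsert.
source1StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

source2StreamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()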
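
Since each source updates different columns, the snippet above can be adapted so that a source only touches the columns it owns. The following is a minimal sketch, not a definitive implementation: the object name ColumnScopedUpsert, the helper names, and the eventTime column are assumptions made for illustration, while primaryKey is taken from the snippet above. It also keeps only the latest row per key in each micro-batch, because MERGE fails when several source rows match the same target row.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object ColumnScopedUpsert {
  // Builds the MERGE assignment map for the given columns:
  // each target column is set from the same-named column of source alias "s".
  def assignments(cols: Seq[String]): Map[String, String] =
    cols.map(c => c -> s"s.$c").toMap

  // Returns a foreachBatch handler that only writes `ownedCols` in the target.
  // `eventTime` is an assumed ordering column; replace it with your own.
  def upsertColumns(target: DeltaTable, ownedCols: Seq[String])
                   (batch: DataFrame, batchId: Long): Unit = {
    // Keep the most recent row per key so that at most one source row
    // matches each target row.
    val latestFirst = Window.partitionBy("primaryKey").orderBy(col("eventTime").desc)
    val latest = batch
      .withColumn("rn", row_number().over(latestFirst))
      .filter(col("rn") === 1)
      .drop("rn")

    target.as("t")
      .merge(latest.as("s"), "s.primaryKey = t.primaryKey")
      // Matched rows: update only the columns this source owns.
      .whenMatched().updateExpr(assignments(ownedCols))
      // New keys: insert the key plus the owned columns; other columns stay NULL.
      .whenNotMatched().insertExpr(assignments("primaryKey" +: ownedCols))
      .execute()
  }
}
```

Each query would then pass its own column list, for example .foreachBatch(ColumnScopedUpsert.upsertColumns(targetTable, Seq("colA")) _) for the first source, where colA is a hypothetical column name.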

  5. Configuration: Configure the streaming queries to process data at the desired intervals, for instance by setting maxFilesPerTrigger or maxBytesPerTrigger to control batch sizes.
  6. Handling Schema Changes: Ensure that the Delta table's schema is managed properly, and plan for possible schema evolution.
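
The configuration and schema points can be sketched roughly as follows. Treat this as an illustration under assumptions: the object and function names, the paths passed in, the file cap of 100, and the one-minute trigger are all placeholder choices, not recommended values.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object StreamConfigSketch {
  // Lets MERGE add columns that exist in the source but not yet in the target.
  def enableMergeSchemaEvolution(spark: SparkSession): Unit =
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

  // Reads one staging Delta table as a stream, capping the files per micro-batch.
  def readStaging(spark: SparkSession, stagingPath: String): DataFrame =
    spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", "100") // illustrative value
      .load(stagingPath)

  // Starts an upsert query with its own checkpoint and a fixed trigger interval.
  def startUpsert(df: DataFrame, checkpointPath: String)
                 (handler: (DataFrame, Long) => Unit): StreamingQuery =
    df.writeStream
      .foreachBatch(handler)
      .outputMode("update")
      .option("checkpointLocation", checkpointPath) // one checkpoint per query
      .trigger(Trigger.ProcessingTime("1 minute"))  // illustrative interval
      .start()
}
```

Giving each query a separate checkpoint location lets it restart from where it left off independently of the other sources.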
