
Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs

Radix95
New Contributor II

I am working with Databricks Delta Live Tables (DLT) and need to ingest data from two different Event Hubs. My goal is to:

  1. Ingest initial data from the first Event Hub (Predictor) and store it in a Delta Table (data_predictions).
  2. Later, update this table when related data arrives from the second Event Hub (Confirmer).
  3. Each message from the second Event Hub contains a uuid; if it matches an existing uuid in data_predictions, the corresponding record should be updated with the new fields from the Confirmer message.
  4. Ensure that data_predictions remains a Delta Table (not a Materialized View) and retains historical records even if the pipeline is restarted.

Challenges I'm facing:

  • DLT does not allow referring to the same table (data_predictions) multiple times within the same pipeline.
  • If I try to directly update data_predictions using apply_changes(), DLT breaks because it conflicts with the table creation.
  • If I perform a join between two streaming sources, Spark forces the output to be a Materialized View, which loses historical data upon pipeline restarts.
  • I need to process late-arriving updates: The Confirmer data may arrive days or weeks after the Predictor data.

What I need:

  • A DLT pipeline that allows ingesting and persisting data from the Predictor Event Hub.
  • A way to update the same data_predictions table when data from the Confirmer Event Hub arrives.
  • A robust solution that prevents data loss and ensures updates work even if the Confirmer data is significantly delayed.

My DLT code:

 

@dlt.table(name="staging_data_predictor", temporary=True)
def table_data_predictor():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
    )

    df_parsed = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema).alias("data")) \
        .select(
            col("data.uuid"),
            to_json(col("data.predictions")).alias("predictions"),
            col("data.invoice_data.*"),
        )

    df_predictor = df_parsed \
        .withColumn("timestamp", current_timestamp()) \
        .withColumn("code1", lit(None).cast(StringType())) \
        .withColumn("code2", lit(None).cast(StringType())) \
        .withColumn("timestamp_update", lit(None).cast(TimestampType()))

    return df_predictor

# View with the updated data from Event Hub Confirmer
@dlt.table(name="staging_data_confirmer", temporary=True)
def table_data_confirmer():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
    )

    df_confirmer = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema_update).alias("data")) \
        .select("data.*") \
        .withColumn("timestamp_update", current_timestamp()) 

    return df_confirmer

@dlt.table(name="data_predictions")
def table_data_predictions():
    df_predictor = dlt.read("staging_data_predictor")
    df_confirmer = dlt.read("staging_data_confirmer")

    # Update existing records with data from the Confirmer
    df_updated_records = df_predictor.alias("pred") \
        .join(df_confirmer.alias("conf"), "uuid", "left") \
        .select(
            col("pred.uuid"),
            col("pred.predictions"),
            coalesce(col("conf.code1"), col("pred.code1")).alias("code1"),
            coalesce(col("conf.code2"), col("pred.code2")).alias("code2"),
            when(
                col("conf.code1").isNotNull() | col("conf.code2").isNotNull(),
                current_timestamp()
            ).otherwise(col("pred.timestamp_update")).alias("timestamp_update")
        )

    return df_updated_records

 

 

Right now, data_predictions is a Materialized View, meaning it only contains the latest data from the current pipeline execution and loses historical records when the pipeline restarts. Updates from the Confirmer Event Hub only affect records currently available in the pipeline and do not modify historical data that was ingested in previous executions.

What is the best approach to achieve this in DLT without creating multiple persistent tables while ensuring data_predictions remains a Delta Table?

 

Any suggestions or best practices would be greatly appreciated!

 

1 REPLY

mark_ott
Databricks Employee

To achieve robust, persistent CDC (Change Data Capture)–style updates in Databricks DLT with your scenario—while keeping data_predictions as a Delta Table (not a Materialized View)—you need to carefully avoid streaming joins and side effects across streaming tables in a single DLT pipeline, because of DLT’s constraints on table references and state handling.

Below is a best-practice architecture and pipeline pattern for your use case:


Key Solution Principles

  • Ingest Predictor and Confirmer streams each to their own staging Delta Tables (no materialized views or joins here).

  • Make data_predictions a Delta Table with historical CDC semantics (append-only with late-arriving data).

  • Perform updates via MERGE INTO (upserts) using a batch job or a separate DLT pipeline instead of streaming/SQL views, as true updates on existing Delta Tables from multiple sources in a single DLT pipeline are not well supported.

  • Retain historical data by relying on Delta Lake time travel (available on every Delta table by default) and, where schemas evolve, the mergeSchema write option, so that even after pipeline restarts your table's history is intact and queryable (a short sketch follows this list).
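
For concreteness, here is a minimal sketch of the history and schema-evolution pieces mentioned above. The table and DataFrame names (data_predictions, df_new_batch) are illustrative, not part of your pipeline; time travel needs no setup, while mergeSchema is opted into per write.

python
# Sketch only: time travel reads and schema-evolving writes on a Delta table.
# "data_predictions" and "df_new_batch" are illustrative names.

# Read an earlier state of the table (by version or by timestamp).
df_v10 = spark.read.option("versionAsOf", 10).table("data_predictions")
df_as_of = (
    spark.read
    .option("timestampAsOf", "2025-01-01 00:00:00")
    .table("data_predictions")
)

# Append a batch whose schema may have gained columns, allowing schema evolution.
(
    df_new_batch.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("data_predictions")
)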


Step-by-Step Pipeline Design

1. Stage Each Stream in Its Own Raw Table

python
@dlt.table(name="raw_predictor")
def raw_predictor():
    df = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
        .selectExpr("CAST(value AS STRING) as json_data")
        .select(from_json(col("json_data"), schema).alias("data"))
        # flatten, select, etc...
        .withColumn("etl_ingest_ts", current_timestamp())
    )
    return df


@dlt.table(name="raw_confirmer")
def raw_confirmer():
    df = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
        .selectExpr("CAST(value AS STRING) as json_data")
        .select(from_json(col("json_data"), schema_update).alias("data"))
        # flatten, select, etc...
        .withColumn("confirmer_etl_ts", current_timestamp())
    )
    return df

These are persisted, append-only Delta tables—not materialized views.
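
If you want to confirm what the pipeline actually produced, Delta's DESCRIBE DETAIL and DESCRIBE HISTORY commands show the table format and its transaction history. The unqualified names below follow the sketch above; if your pipeline publishes to a target schema, use the fully qualified names instead.

python
# Sanity checks on the staging tables (names follow the sketch above).
spark.sql("DESCRIBE DETAIL raw_predictor") \
    .select("format", "location", "numFiles") \
    .show(truncate=False)

spark.sql("DESCRIBE HISTORY raw_confirmer") \
    .select("version", "timestamp", "operation") \
    .show(truncate=False)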


2. Build/Persist the Gold Table (data_predictions) via a Batch Upsert

  • Create a separate batch pipeline or notebook that periodically runs a MERGE INTO operation to keep data_predictions current with the latest from both staging tables, using a uuid key.

Example batch upsert job:

python
# In a notebook or DLT batch pipeline cell
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Load the staging tables
df_predictor = spark.read.table("raw_predictor")
df_confirmer = spark.read.table("raw_confirmer")

# Prepare the upsert source by joining the latest confirmer data
df_upsert = df_predictor.join(
    df_confirmer, on="uuid", how="left"
).select(
    "uuid",
    "predictions",
    F.coalesce(df_confirmer["code1"], df_predictor["code1"]).alias("code1"),
    F.coalesce(df_confirmer["code2"], df_predictor["code2"]).alias("code2"),
    df_predictor["etl_ingest_ts"],
    F.when(
        df_confirmer["code1"].isNotNull() | df_confirmer["code2"].isNotNull(),
        df_confirmer["confirmer_etl_ts"]
    ).otherwise(df_predictor["timestamp_update"]).alias("timestamp_update")
)

# Merge (upsert) into the persistent Delta Table
deltaTable = DeltaTable.forPath(spark, "/mnt/path/to/data_predictions")
(
    deltaTable.alias("t")
    .merge(
        df_upsert.alias("s"),
        "t.uuid = s.uuid"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
  • Schedule this merge as a Databricks Job every few minutes or hours (frequency depends on latency requirements); an incremental variant that only picks up new Confirmer rows is sketched after these bullets.

  • Delta will retain full historical records. Use Delta Time Travel (VERSION AS OF, TIMESTAMP AS OF) and schema evolution, and keep the data_predictions table as a true Delta Table, not a view. Pipeline restarts will not destroy historical data.
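
As referenced above, the scheduled merge can also be made incremental so that each run only processes Confirmer rows that arrived since the last applied update; late confirmations still match old predictions by uuid. This is only a sketch under the assumptions of the step-1 column names (confirmer_etl_ts) and a data_predictions table registered in the metastore; in practice you might track the watermark in a small bookmark table instead of reading it back from the target.

python
# Sketch: incremental variant of the merge above (names are illustrative).
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# High-water mark: the newest update already applied to the gold table.
last_applied = (
    spark.read.table("data_predictions")
    .agg(F.max("timestamp_update").alias("ts"))
    .collect()[0]["ts"]
)

# Only consider Confirmer rows that arrived after that point.
df_new_conf = spark.read.table("raw_confirmer")
if last_applied is not None:
    df_new_conf = df_new_conf.filter(F.col("confirmer_etl_ts") > F.lit(last_applied))

# Late-arriving confirmations still update historical predictions by uuid.
target = DeltaTable.forName(spark, "data_predictions")
(
    target.alias("t")
    .merge(df_new_conf.alias("s"), "t.uuid = s.uuid")
    .whenMatchedUpdate(set={
        "code1": "s.code1",
        "code2": "s.code2",
        "timestamp_update": "s.confirmer_etl_ts",
    })
    .execute()
)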


Additional Tips

  • Never use streaming joins or self-updating tables inside DLT for this CDC merge. DLT wants a strictly acyclic DAG for tables, and streaming joins typically result in views—not Delta Tables—and lose history.

  • Consider watermarking and data retention on the raw tables, if storage is a concern.

  • Late-arriving data is handled: Because all upserts/merges are re-run on new Confirmer data, historical Predictor rows are updated as soon as late Confirmer data arrives.

  • You can time travel historical states using Delta’s time travel features:

    sql
    SELECT * FROM data_predictions VERSION AS OF 10
  • If you want data_predictions to always reflect only the most recent state per uuid, keep only the latest row per uuid when building the upsert source (see the sketch below).
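
A minimal sketch of that dedup step, assuming the Confirmer feed can deliver more than one message per uuid and that confirmer_etl_ts (from the step-1 sketch) marks arrival order:

python
# Keep only the most recent Confirmer row per uuid before merging.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("uuid").orderBy(F.col("confirmer_etl_ts").desc())

df_confirmer_latest = (
    spark.read.table("raw_confirmer")
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)
# Use df_confirmer_latest as the merge source instead of the full raw_confirmer table.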


Illustrated Workflow

Source       | DLT Table              | Upsert Engine        | Gold Table (Delta Table)
Event Hub 1  | raw_predictor (Delta)  | Batch Notebook/Job   | data_predictions (Delta Table)
Event Hub 2  | raw_confirmer (Delta)  | (merge on uuid)      | (all history retained by Delta)



Summary

  • Ingest both streams to separate persistent Delta tables.

  • Run scheduled upsert jobs to consolidate into a single Delta Table (data_predictions) with history.

  • Avoid streaming joins and self-updates within DLT—use batch merge logic.

  • Leverage Delta Lake for historical queries and late data handling.

This approach meets your requirements and works reliably for CDC and late-arriving updates in DLT.