Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs

Radix95
New Contributor II

I am working with Databricks Delta Live Tables (DLT) and need to ingest data from two different Event Hubs. My goal is to:

  1. Ingest initial data from the first Event Hub (Predictor) and store it in a Delta Table (data_predictions).
  2. Later, update this table when related data arrives from the second Event Hub (Confirmer).
  3. Each message from the second Event Hub contains a uuid, and if it matches an existing uuid in data_predictions, the corresponding record should be updated with the new fields from the Confirmer message (example payloads are sketched right after this list).
  4. Ensure that data_predictions remains a Delta Table (not a Materialized View) and retains historical records even if the pipeline is restarted.
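
To make the matching concrete, here is roughly what the two payloads look like (field names come from my schemas; the values are invented purely for illustration):

predictor_msg = {  # from the Predictor Event Hub
    "uuid": "7d6f1c2e-0a4b-4f7e-9a1d-2b3c4d5e6f70",
    "predictions": [{"label": "A", "score": 0.91}],              # illustrative values
    "invoice_data": {"invoice_id": "INV-001", "amount": 120.5},  # illustrative values
}

confirmer_msg = {  # may arrive days or weeks later, same uuid
    "uuid": "7d6f1c2e-0a4b-4f7e-9a1d-2b3c4d5e6f70",
    "code1": "OK",  # illustrative values
    "code2": "B2",
}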

Challenges I'm facing:

  • DLT does not allow referring to the same table (data_predictions) multiple times within the same pipeline.
  • If I try to directly update data_predictions using apply_changes(), DLT breaks because it conflicts with the table creation (a rough sketch of that attempt follows this list).
  • If I perform a join between two streaming sources, Spark forces the output to be a Materialized View, which loses historical data upon pipeline restarts.
  • I need to process late-arriving updates: The Confirmer data may arrive days or weeks after the Predictor data.
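
Roughly, the apply_changes() attempt that breaks looks like this (simplified; the key and sequencing columns are my choices based on the schema): data_predictions is still created as a table in the pipeline, and on top of that I call

dlt.apply_changes(
    target="data_predictions",        # same name the pipeline already creates as a table
    source="staging_data_confirmer",  # updates coming from the Confirmer Event Hub
    keys=["uuid"],
    sequence_by="timestamp_update",
    stored_as_scd_type=1,
)

which fails because the same target then has two conflicting definitions.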

What I need:

  • A DLT pipeline that allows ingesting and persisting data from the Predictor Event Hub.
  • A way to update the same data_predictions table when data from the Confirmer Event Hub arrives.
  • A robust solution that prevents data loss and ensures updates work even if the Confirmer data is significantly delayed.

My DLT code:

import dlt
from pyspark.sql.functions import (
    col, from_json, to_json, current_timestamp, lit, coalesce, when
)
from pyspark.sql.types import StringType, TimestampType

# Staging table with raw data from the Predictor Event Hub
@dlt.table(name="staging_data_predictor", temporary=True)
def table_data_predictor():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
    )

    df_parsed = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema).alias("data")) \
        .select(
            col("data.uuid"),
            to_json(col("data.predictions")).alias("predictions"),
            col("data.invoice_data.*"),
        )

    df_predictor = df_parsed \
        .withColumn("timestamp", current_timestamp()) \
        .withColumn("code1", lit(None).cast(StringType())) \
        .withColumn("code2", lit(None).cast(StringType())) \
        .withColumn("timestamp_update", lit(None).cast(TimestampType()))

    return df_predictor

# View with the updated data from the Confirmer Event Hub
@dlt.table(name="staging_data_confirmer", temporary=True)
def table_data_confirmer():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
    )

    df_confirmer = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema_update).alias("data")) \
        .select("data.*") \
        .withColumn("timestamp_update", current_timestamp()) 

    return df_confirmer

@dlt.table(name="data_predictions")
def table_data_predictions():
    df_predictor = dlt.read("staging_data_predictor")
    df_confirmer = dlt.read("staging_data_confirmer")

    # Update existing records with data from the Confirmer
    df_updated_records = df_predictor.alias("pred") \
        .join(df_confirmer.alias("conf"), "uuid", "left") \
        .select(
            col("pred.uuid"),
            col("pred.predictions"),
            coalesce(col("conf.code1"), col("pred.code1")).alias("code1"),
            coalesce(col("conf.code2"), col("pred.code2")).alias("code2"),
            when(
                col("conf.code1").isNotNull() | col("conf.code2").isNotNull(),
                current_timestamp()
            ).otherwise(col("pred.timestamp_update")).alias("timestamp_update")
        )

    return df_updated_records
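
For completeness, KAFKA_OPTIONS_PREDICTOR and KAFKA_OPTIONS_CONFIRMER are the usual Event-Hubs-over-Kafka settings (namespace, topic, and secret names below are placeholders), and schema / schema_update are the StructTypes of the two JSON payloads (omitted here):

EH_NAMESPACE = "<eventhubs-namespace>"                                          # placeholder
EH_CONN_STR = dbutils.secrets.get("<scope>", "<eventhubs-connection-string>")   # placeholder

KAFKA_OPTIONS_PREDICTOR = {
    "kafka.bootstrap.servers": f"{EH_NAMESPACE}.servicebus.windows.net:9093",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{EH_CONN_STR}";'
    ),
    "subscribe": "<predictor-event-hub>",   # placeholder topic name
    "startingOffsets": "earliest",
}
# KAFKA_OPTIONS_CONFIRMER is identical except for the "subscribe" topic.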

 

 

Right now, data_predictions is a Materialized View, meaning it only contains the data from the latest pipeline execution and loses historical records when the pipeline restarts. Updates from the Confirmer Event Hub only affect records currently available in the pipeline; they do not modify historical data ingested in previous executions.

What is the best approach to achieve this in DLT without creating multiple persistent tables while ensuring data_predictions remains a Delta Table?

 

Any suggestions or best practices would be greatly appreciated!

 
