
Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs

Radix95
New Contributor II

I am working with Databricks Delta Live Tables (DLT) and need to ingest data from two different Event Hubs. My goal is to:

  1. Ingest initial data from the first Event Hub (Predictor) and store it in a Delta Table (data_predictions).
  2. Later, update this table when related data arrives from the second Event Hub (Confirmer).
  3. Each message from the second Event Hub contains a uuid, and if it matches an existing uuid in data_predictions, the corresponding record should be updated with the new fields from the Confirmer message.
  4. Ensure that data_predictions remains a Delta Table (not a Materialized View) and retains historical records even if the pipeline is restarted.
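
To make the update rule in step 3 concrete, here is a minimal plain-Python sketch of the behaviour I am after (no Spark involved). The helper name `apply_confirmer` and the sample row are hypothetical and only mirror the columns used in my pipeline code; it assumes that null Confirmer fields should never overwrite existing values.

```python
from datetime import datetime, timezone
from typing import Optional


def apply_confirmer(table: dict, message: dict, now: Optional[datetime] = None) -> bool:
    """Merge a Confirmer message into the row with the same uuid.

    Returns True if at least one field of an existing row was updated.
    """
    row = table.get(message["uuid"])
    if row is None:
        return False  # no prediction with this uuid; nothing to update
    changed = False
    for field, value in message.items():
        # Only non-null Confirmer fields overwrite existing values.
        if field != "uuid" and value is not None:
            row[field] = value
            changed = True
    if changed:
        row["timestamp_update"] = now or datetime.now(timezone.utc)
    return changed


# Hypothetical sample data shaped like the data_predictions columns.
predictions = {
    "a1": {"uuid": "a1", "predictions": "{}", "code1": None, "code2": None,
           "timestamp_update": None},
}
updated = apply_confirmer(predictions, {"uuid": "a1", "code1": "X", "code2": None})
missing = apply_confirmer(predictions, {"uuid": "zz", "code1": "Y"})
```

The point of the sketch is that the existing row is kept and only enriched, no matter how long after the Predictor message the Confirmer message arrives.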

Challenges I'm facing:

  • DLT does not allow referring to the same table (data_predictions) multiple times within the same pipeline.
  • If I try to directly update data_predictions using apply_changes(), DLT breaks because it conflicts with the table creation.
  • If I perform a join between two streaming sources, Spark forces the output to be a Materialized View, which loses historical data upon pipeline restarts.
  • I need to process late-arriving updates: The Confirmer data may arrive days or weeks after the Predictor data.

What I need:

  • A DLT pipeline that allows ingesting and persisting data from the Predictor Event Hub.
  • A way to update the same data_predictions table when data from the Confirmer Event Hub arrives.
  • A robust solution that prevents data loss and ensures updates work even if the Confirmer data is significantly delayed.
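
One pattern that might cover these requirements is a single streaming table fed by `dlt.apply_changes()` with `ignore_null_updates=True`, keyed on uuid. The sketch below is untested against this pipeline and rests on assumptions: `staging_events_union` is a hypothetical streaming view that unions the parsed Predictor and Confirmer streams into one schema (for example with `unionByName(allowMissingColumns=True)`), `event_ts` is a hypothetical ordering column present in both, and `register_upsert_flow` is a hypothetical helper. The DLT module is passed in as a parameter so the function can be exercised outside a pipeline.

```python
def register_upsert_flow(dlt_module,
                         source_view="staging_events_union",  # hypothetical view name
                         target="data_predictions"):
    """Declare a streaming table and an upsert flow into it, keyed by uuid."""
    # The target is declared separately from the flow that writes to it,
    # so the table is defined only once in the pipeline.
    dlt_module.create_streaming_table(name=target)

    dlt_module.apply_changes(
        target=target,
        source=source_view,
        keys=["uuid"],
        sequence_by="event_ts",      # hypothetical ordering column
        ignore_null_updates=True,    # null columns keep their existing target values
        stored_as_scd_type=1,        # update rows in place, one row per uuid
    )


# Inside a DLT pipeline notebook this would be called as:
#   import dlt
#   register_upsert_flow(dlt)
```

With this shape, a Confirmer event that carries only uuid, code1 and code2 would leave the Predictor columns untouched, provided its `event_ts` is later than the Predictor event for the same uuid. Whether that ordering assumption holds for my data is something I would still need to verify.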

My dlt code:

 

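import dlt
from pyspark.sql.functions import (
    coalesce, col, current_timestamp, from_json, lit, to_json, when
)
from pyspark.sql.types import StringType, TimestampType

# KAFKA_OPTIONS_PREDICTOR, KAFKA_OPTIONS_CONFIRMER, schema and schema_update
# are defined elsewhere in the notebook.

# Staging table with the raw predictions from the Predictor Event Hub
@dlt.table(name="staging_data_predictor", temporary=True)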
def table_data_predictor():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
    )

    df_parsed = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema).alias("data")) \
        .select(
            col("data.uuid"),
            to_json(col("data.predictions")).alias("predictions"),
            col("data.invoice_data.*"),
        )

    df_predictor = df_parsed \
        .withColumn("timestamp", current_timestamp()) \
        .withColumn("code1", lit(None).cast(StringType())) \
        .withColumn("code2", lit(None).cast(StringType())) \
        .withColumn("timestamp_update", lit(None).cast(TimestampType()))

    return df_predictor

# View with the updated data from the Confirmer Event Hub
@dlt.table(name="staging_data_confirmer", temporary=True)
def table_data_confirmer():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
    )

    df_confirmer = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema_update).alias("data")) \
        .select("data.*") \
        .withColumn("timestamp_update", current_timestamp()) 

    return df_confirmer

@dlt.table(name="data_predictions")
def table_data_predictions():
    df_predictor = dlt.read("staging_data_predictor")
    df_confirmer = dlt.read("staging_data_confirmer")

    # Update existing records with data from the Confirmer
    df_updated_records = df_predictor.alias("pred") \
        .join(df_confirmer.alias("conf"), "uuid", "left") \
        .select(
            col("pred.uuid"),
            col("pred.predictions"),
            coalesce(col("conf.code1"), col("pred.code1")).alias("code1"),
            coalesce(col("conf.code2"), col("pred.code2")).alias("code2"),
            when(
                col("conf.code1").isNotNull() | col("conf.code2").isNotNull(),
                current_timestamp()
            ).otherwise(col("pred.timestamp_update")).alias("timestamp_update")
        )

    return df_updated_records

 

 

Right now, data_predictions is a Materialized View, so it contains only the data from the latest pipeline execution and loses historical records when the pipeline restarts. Updates from the Confirmer Event Hub affect only the records currently available in the pipeline; they do not modify historical data ingested in previous executions.

What is the best approach to achieve this in DLT without creating multiple persistent tables while ensuring data_predictions remains a Delta Table?

 

Any suggestions or best practices would be greatly appreciated!

 

