02-18-2025 09:09 AM - edited 02-18-2025 09:12 AM
I am working with Databricks Delta Live Tables (DLT) and need to ingest data from two different Event Hubs: one with predictions (Predictor) and one with confirmations (Confirmer). My goal is to combine them into a single data_predictions table, where Confirmer events update the matching Predictor records (joined on uuid), and to keep that table persistent across pipeline runs. The challenges I'm facing and what I need are described after the code.
My DLT code:
import dlt
from pyspark.sql.functions import (
    col, from_json, to_json, current_timestamp, lit, coalesce, when
)
from pyspark.sql.types import StringType, TimestampType


# Staging table with the raw predictions from the Predictor Event Hub
@dlt.table(name="staging_data_predictor", temporary=True)
def table_data_predictor():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
    )

    df_parsed = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema).alias("data")) \
        .select(
            col("data.uuid"),
            to_json(col("data.predictions")).alias("predictions"),
            col("data.invoice_data.*"),
        )

    df_predictor = df_parsed \
        .withColumn("timestamp", current_timestamp()) \
        .withColumn("code1", lit(None).cast(StringType())) \
        .withColumn("code2", lit(None).cast(StringType())) \
        .withColumn("timestamp_update", lit(None).cast(TimestampType()))

    return df_predictor
# Staging table with the updated data from the Confirmer Event Hub
@dlt.table(name="staging_data_confirmer", temporary=True)
def table_data_confirmer():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
    )

    df_confirmer = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema_update).alias("data")) \
        .select("data.*") \
        .withColumn("timestamp_update", current_timestamp())

    return df_confirmer
@dlt.table(name="data_predictions")
def table_data_predictions():
df_predictor = dlt.read("staging_data_predictor")
df_confirmer = dlt.read("staging_data_confirmer")
#Update dei record esistenti con dati da confirmer
df_updated_records = df_predictor.alias("pred") \
.join(df_confirmer.alias("conf"), "uuid", "left") \
.select(
col("pred.uuid"),
col("pred.predictions"),
coalesce(col("conf.code1"), col("pred.code1")).alias("code1"),
coalesce(col("conf.code2"), col("pred.code2")).alias("code2"),
when(
col("conf.code1").isNotNull() | col("conf.code2").isNotNull(),
current_timestamp()
).otherwise(col("pred.timestamp_update")).alias("timestamp_update")
)
return df_updated_records
Right now, data_predictions is a materialized view, meaning it only contains the data from the latest pipeline execution and loses historical records when the pipeline restarts. Updates from the Confirmer Event Hub only affect the records currently available in the pipeline; they do not modify historical data that was ingested in previous executions.
What is the best approach to achieve this in DLT without creating multiple persistent tables while ensuring data_predictions remains a Delta Table?
Any suggestions or best practices would be greatly appreciated!
Sunday
To achieve robust, persistent CDC (Change Data Capture)-style updates in Databricks DLT while keeping data_predictions as a Delta table rather than a materialized view, you need to avoid streaming joins and cross-table side effects within a single DLT pipeline, because of DLT's constraints on table references and state handling.
Below is a best-practice architecture and pipeline pattern for your use case:
1. Ingest the Predictor and Confirmer streams each into their own staging Delta table (no materialized views or joins here).
2. Make data_predictions a Delta table with historical CDC semantics (append-only, handling late-arriving data).
3. Perform updates via MERGE INTO (upserts) using a batch job or a separate pipeline instead of streaming/SQL views, as true updates on existing Delta tables from multiple sources in a single DLT pipeline are not well supported.
4. Retain historical data by enabling Delta Lake time travel and mergeSchema options, so even after pipeline restarts your table's history is intact and queryable.
@Dlt.table(name="raw_predictor")
def raw_predictor():
df = (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS_PREDICTOR)
.load()
.selectExpr("CAST(value AS STRING) as json_data")
.select(from_json(col("json_data"), schema).alias("data"))
# flatten, select, etc...
.withColumn("etl_ingest_ts", current_timestamp())
)
return df
@Dlt.table(name="raw_confirmer")
def raw_confirmer():
df = (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS_CONFIRMER)
.load()
.selectExpr("CAST(value AS STRING) as json_data")
.select(from_json(col("json_data"), schema_update).alias("data"))
# flatten, select, etc...
.withColumn("confirmer_etl_ts", current_timestamp())
)
return df
These are persisted, append-only Delta tables—not materialized views.
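Since KAFKA_OPTIONS_PREDICTOR and KAFKA_OPTIONS_CONFIRMER are not shown in your snippet: when reading Event Hubs through its Kafka-compatible endpoint, those option dictionaries typically look roughly like the sketch below. The namespace, hub name, and secret scope/key are placeholders, not values from your setup.

# Illustrative only: Event Hubs exposed via its Kafka endpoint (port 9093, SASL/SSL).
# Namespace, event hub name, and secret scope/key are placeholders.
EH_NAMESPACE = "<your-namespace>.servicebus.windows.net"
EH_CONN_STR = dbutils.secrets.get(scope="<scope>", key="<eventhub-connection-string>")

KAFKA_OPTIONS_PREDICTOR = {
    "kafka.bootstrap.servers": f"{EH_NAMESPACE}:9093",
    "subscribe": "<predictor-eventhub-name>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    # On Databricks the Kafka client classes are shaded, hence the 'kafkashaded' prefix.
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{EH_CONN_STR}";'
    ),
    "startingOffsets": "earliest",
}

KAFKA_OPTIONS_CONFIRMER would follow the same pattern with its own hub name.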
Maintain the gold table (data_predictions) via a batch upsert: create a separate batch pipeline or notebook that periodically runs a MERGE INTO operation to keep data_predictions current with the latest data from both staging tables, using uuid as the merge key.
Example batch upsert job:
# In a notebook or a separate batch pipeline (outside the streaming DLT pipeline)

# Load the staging tables written by the DLT pipeline
df_predictor = spark.read.table("raw_predictor")
df_confirmer = spark.read.table("raw_confirmer")

# Prepare the upsert source by joining the latest confirmer data
from pyspark.sql import functions as F

df_upsert = df_predictor.join(
    df_confirmer,
    on="uuid",
    how="left"
).select(
    "uuid",
    "predictions",
    F.coalesce(df_confirmer["code1"], df_predictor["code1"]).alias("code1"),
    F.coalesce(df_confirmer["code2"], df_predictor["code2"]).alias("code2"),
    df_predictor["etl_ingest_ts"],
    F.when(
        df_confirmer["code1"].isNotNull() | df_confirmer["code2"].isNotNull(),
        df_confirmer["confirmer_etl_ts"]
    ).otherwise(df_predictor["timestamp_update"]).alias("timestamp_update")
)

# Merge (upsert) into the persistent Delta table
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/path/to/data_predictions")

(
    deltaTable.alias("t")
    .merge(
        df_upsert.alias("s"),
        "t.uuid = s.uuid"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
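One caveat: DeltaTable.forPath assumes the target table already exists. A minimal one-time bootstrap, assuming the same placeholder path as above, could look like this:

from delta.tables import DeltaTable

target_path = "/mnt/path/to/data_predictions"  # same placeholder path as above

# Create an empty Delta table with the upsert schema on the very first run,
# so the MERGE has a valid target to write into.
if not DeltaTable.isDeltaTable(spark, target_path):
    df_upsert.limit(0).write.format("delta").save(target_path)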
Schedule this merge as a Databricks Job every few minutes or hours (frequency depends on latency requirements).
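If you prefer defining the schedule in code rather than in the Jobs UI, one possible sketch using the Databricks Python SDK is below. The notebook path, cluster id, and the 15-minute cron expression are placeholders, not part of your setup.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Hypothetical job that runs the merge notebook every 15 minutes (quartz cron).
w.jobs.create(
    name="merge_data_predictions",
    tasks=[
        jobs.Task(
            task_key="run_merge",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/Shared/merge_data_predictions"
            ),
            existing_cluster_id="<cluster-id>",
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0/15 * * * ?",
        timezone_id="UTC",
    ),
)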
Delta will retain full historical records. Use Delta time travel (VERSION AS OF, TIMESTAMP AS OF) and schema evolution, and keep the data_predictions table as a true Delta table, not a view. Pipeline restarts will not destroy historical data.
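If new fields can appear in the Predictor or Confirmer payloads over time, one way to let the batch MERGE pick them up is Delta's automatic schema evolution setting (a sketch; enable it only if you actually want target columns added automatically):

# Sketch: allow the batch MERGE to add new source columns to data_predictions automatically.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")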
Never use streaming joins or self-updating tables inside DLT for this CDC merge: DLT requires a strictly acyclic dependency graph between tables, and streaming joins of this kind tend to produce views rather than persistent Delta tables, losing history.
Consider watermarking and data retention on the raw tables, if storage is a concern.
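As a sketch of what the retention part can look like, Delta retention-related table properties can be set directly on the DLT-managed staging tables via the @dlt.table decorator (the durations below are placeholder values, not recommendations):

import dlt
from pyspark.sql.functions import col, from_json, current_timestamp

# Same raw_predictor as above, but with explicit retention-related table properties
# (example values only) so removed files and table history are not kept indefinitely.
@dlt.table(
    name="raw_predictor",
    table_properties={
        "delta.logRetentionDuration": "interval 30 days",
        "delta.deletedFileRetentionDuration": "interval 7 days",
    },
)
def raw_predictor():
    return (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
        .selectExpr("CAST(value AS STRING) AS json_data")
        .select(from_json(col("json_data"), schema).alias("data"))
        .withColumn("etl_ingest_ts", current_timestamp())
    )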
Late-arriving data is handled: Because all upserts/merges are re-run on new Confirmer data, historical Predictor rows are updated as soon as late Confirmer data arrives.
You can time travel historical states using Delta’s time travel features:
SELECT * FROM data_predictions VERSION AS OF 10
If you want the data_predictions to always reflect only the most recent state per uuid, keep only the latest row per uuid during the upsert.
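A common way to do that is to deduplicate the upsert source before the MERGE, keeping only the newest row per uuid. A minimal sketch, assuming the timestamp_update/etl_ingest_ts columns from the batch example above are the ordering columns:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recent row per uuid before merging, ordering by the
# confirmer timestamp when present and falling back to the ingest timestamp.
latest_per_uuid = Window.partitionBy("uuid").orderBy(
    F.coalesce(F.col("timestamp_update"), F.col("etl_ingest_ts")).desc()
)

df_upsert_latest = (
    df_upsert
    .withColumn("_rn", F.row_number().over(latest_per_uuid))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)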
| Source | DLT Table | Upsert Engine | Gold Table (Delta Table) |
|---|---|---|---|
| Event Hub 1 | raw_predictor (Delta) | Batch Notebook/Job | data_predictions (Delta Table) |
| Event Hub 2 | raw_confirmer (Delta) | (merge on uuid) | (all history retained by Delta) |
- Ingest both streams into separate persistent Delta tables.
- Run scheduled upsert jobs to consolidate into a single Delta table (data_predictions) with history.
- Avoid streaming joins and self-updates within DLT; use batch merge logic.
- Leverage Delta Lake for historical queries and late-data handling.
This approach meets your requirements and works reliably for CDC and late-arriving updates in DLT.