I'm working on a Delta Live Tables (DLT) pipeline in Databricks Serverless mode.
I receive a stream of data from Event Hubs, where each incoming record contains a unique identifier (uuid) along with some attributes (code1, code2).
My goal is to upsert these records into an existing table (data_event): when a row with the same uuid already exists (matching the incoming record's uuid against the table's uuid), I want to update its code1 and code2 values and set the timestamp_update column to the time the update is applied.
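For illustration, a single message value looks roughly like this (made-up values, fields matching the schema below):

{"uuid": "0f8fad5b-d9cb-469f-a165-70867728950e", "code1": "A1", "code2": "B2"}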
import dlt
from pyspark.sql.functions import col, current_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the parsed Event Hubs payload
schema_update = StructType([
    StructField("uuid", StringType(), False),
    StructField("code1", StringType(), True),
    StructField("code2", StringType(), True)
])
@dlt.view
def raw_data_view():
    # Read the raw stream from Event Hubs through the Kafka connector
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS)
        .load()
    )
    # Parse the JSON payload and stamp each record with the processing time
    df_parsed = (
        df_raw.selectExpr("CAST(value AS STRING) AS json_data")
        .select(from_json(col("json_data"), schema_update).alias("data"))
        .select("data.*")
        .withColumn("timestamp_update", current_timestamp())
    )
    return df_parsed
# Upsert into data_event keyed on uuid, keeping only the latest version (SCD type 1)
dlt.apply_changes(
    target="data_event",
    source="raw_data_view",
    keys=["uuid"],
    sequence_by="timestamp_update",
    stored_as_scd_type=1
)
I get this error:
com.databricks.pipelines.common.errors.DLTSparkException: [STREAMING_TARGET_NOT_DEFINED] Cannot found target table `test`.`testtables`.`data_event` for the APPLY CHANGES command. Target table `test`.`testtables`.`data_event` is not defined in the pipeline.
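Reading the error, it sounds like the target table has to be declared inside the same pipeline before apply_changes can write into it. Is the missing piece something like the following, placed before the apply_changes call? This is just a minimal sketch based on my reading of the DLT docs, and the comment text is illustrative:

# Declare the target streaming table in the same pipeline so APPLY CHANGES can find it
dlt.create_streaming_table(
    name="data_event",
    comment="Target table upserted from Event Hubs records"
)

Or is there a different way this is supposed to be set up when the pipeline runs in Serverless mode?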