DLT Streaming With Watermark fails, suggesting I should add watermarks
yesterday
Hi all,
I have the following problem: I have two streaming tables containing time-series measurements from different sensor data, each fed by multiple sensors (imagine multiple temperature sensors for the first table and multiple humidity sensors for the second table). Each sensor has a sensor_id. In addition, I have a table that specifies which sensors should be merged together. I want to use DLT streaming tables for that. However, I get the following error:
com.databricks.pipelines.common.errors.DLTAnalysisException: Failed to start stream temp_and_humidity in either append mode or complete mode.
Append mode error: [STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE
Complete mode error: Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;
The problem is that I did add watermarks, following this guide: https://kb.databricks.com/en_US/streaming/append-output-not-supported-no-watermark so I think the append mode error should not happen.
My code looks roughly like this (I omitted some details here):
import dlt
from pyspark.sql import functions as F

@dlt.table
def temp_and_humidity():
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations = spark.read.table("LIVE.senosr_associations")  # note that this is a VIEW

    # Basically, we want to do an AS OF JOIN here.
    # The algorithm works like this:
    #   1. do a UNION BY NAME between the two data frames
    #   2. GROUP BY a small window <- here we need a watermark, as far as I understand

    # Making sure that the schemas are compatible
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumn("sensor_group_id", F.col("sensor_id"))

    # I think this join is not allowed in complete mode (which I do not want anyway!)
    temp = temp.join(associations, ...).drop(...)  # Some column name manipulations, adding the "sensor_group_id" column

    # Merging the data
    temp_and_humidity = temp.unionByName(humidity)

    # Finally, we can aggregate, so that measurements "from the same time" are in one row.
    # We have prior knowledge that each sensor sends data once per minute.
    return (
        temp_and_humidity
        .withColumn("time_window", F.window("timestamp", "1 minute"))
        .withWatermark("time_window", "1 minute")  # <- we add the watermark!
        .groupBy("time_window", "sensor_group_id")
        .agg(
            F.first("humidity", ignorenulls=True),
            F.first("temp", ignorenulls=True)
        )
    )
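To make the intended result concrete, here is a minimal plain-Python sketch (no Spark, no streaming) of the merge I am after: union the rows, bucket them into 1-minute windows per sensor group, and keep the first non-null value of each measurement. The sample readings and the name merge_by_minute are made up for illustration, and the sketch sorts by timestamp so that "first" is well defined, which F.first does not guarantee on its own.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample readings: (timestamp, sensor_group_id, temp, humidity).
# Temperature rows carry humidity=None and vice versa, as after the union by name.
readings = [
    (datetime(2024, 1, 1, 12, 0, 5), "group_a", 21.5, None),
    (datetime(2024, 1, 1, 12, 0, 40), "group_a", None, 48.0),
    (datetime(2024, 1, 1, 12, 1, 10), "group_a", 21.7, None),
    (datetime(2024, 1, 1, 12, 1, 20), "group_a", None, 47.5),
]


def merge_by_minute(rows):
    """Group rows into 1-minute tumbling windows per sensor group and keep
    the first non-null temp and humidity seen in each window."""
    merged = defaultdict(lambda: {"temp": None, "humidity": None})
    for ts, group_id, temp, humidity in sorted(rows, key=lambda r: r[0]):
        # Truncate the timestamp to the start of its minute (the window key).
        window_start = ts.replace(second=0, microsecond=0)
        slot = merged[(window_start, group_id)]
        if slot["temp"] is None and temp is not None:
            slot["temp"] = temp
        if slot["humidity"] is None and humidity is not None:
            slot["humidity"] = humidity
    return dict(merged)


result = merge_by_minute(readings)
for (window_start, group_id), values in sorted(result.items()):
    print(window_start.isoformat(), group_id, values["temp"], values["humidity"])
```

Each output row corresponds to one (time_window, sensor_group_id) pair, which is the shape I expect the streaming table to have.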
I have tried different variants (like setting the watermark on the "timestamp" column instead of the "time_window" column), with no success. Can anyone help me out? Any suggestions as to what I am missing here?
Greetings, Daniel
- Labels: Spark

