03-19-2025 10:30 AM
Hi all,
I have the following problem: I have two streaming tables containing time-series measurements from different sensor data, each fed by multiple sensors (imagine multiple temperature sensors for the first table and multiple humidity sensors for the second table). Each sensor has a sensor_id. In addition, I have a table that specifies which sensors should be merged together. I want to use DLT streaming tables for this. However, I get the following error:
com.databricks.pipelines.common.errors.DLTAnalysisException: Failed to start stream temp_and_humidity in either append mode or complete mode.
Append mode error: [STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE
Complete mode error: Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;
Problem is: I added watermarks, following this guide: https://kb.databricks.com/en_US/streaming/append-output-not-supported-no-watermark so I think the append mode error should not happen.
My code looks roughly like this (I omitted some details):
import dlt
import pyspark.sql.functions as F

@dlt.table
def temp_and_humidity():
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations = spark.read.table("LIVE.sensor_associations")  # note that this is a VIEW

    # Basically, we want to do an AS OF JOIN here. The algorithm works like this:
    #   1. do a UNION BY NAME between the two data frames
    #   2. GROUP BY a small window  <- here we need a watermark, as far as I understand

    # Making sure that the schemas are compatible
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumnRenamed("sensor_id", "sensor_group_id")

    # I think this join is not allowed in complete mode (which I do not want anyway!)
    temp = temp.join(associations, ...).drop(...)  # Some column name manipulations, adding the "sensor_group_id" column

    # Merging the data
    temp_and_humidity = temp.unionByName(humidity)

    # Finally, we can aggregate, so that measurements "from the same time" end up in one row.
    # We have prior knowledge that each sensor sends data once per minute.
    return (
        temp_and_humidity
        .withColumn("time_window", F.window("timestamp", "1 minute"))
        .withWatermark("time_window", "1 minute")  # <- we add the watermark!
        .groupBy("time_window", "sensor_group_id")
        .agg(
            F.first("humidity", ignorenulls=True),
            F.first("temp", ignorenulls=True),
        )
    )

I have tried different variants (like setting the watermark on the "timestamp" column instead of "time_window"), with no success. Can anyone help me out? Any suggestions on what I am missing here?
Greetings, Daniel
yesterday
To resolve this DLT streaming error about unsupported output modes, set the watermark on the original event timestamp rather than on a computed column like "time_window", and keep the join semantics between streaming and static datasets in mind. Key points:
Watermarks: the watermark must be set on a column of TimestampType that directly represents event time, usually "timestamp". "time_window" is a derived struct column produced by F.window, so setting the watermark there does not give Spark the event-time information it needs for the streaming aggregation or for output mode resolution.
Streaming joins: a stream-static join (your case) works fine in append mode. Joining two streaming DataFrames is also only supported in append mode and requires watermarks on the event-time columns of both sides; complete output mode is not possible for streaming joins, which is exactly what your second error message says.
View vs. table: even if associations is a VIEW, as long as its source is static (not itself a stream), it is fine to join against.
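To make the first point concrete, here is a tiny pure-Python model of the idea behind a watermark (a deliberate simplification, not Spark's implementation): it is essentially the maximum observed event time minus an allowed lateness, and rows older than it are dropped. This only makes sense on a real event-time value, which is why it belongs on "timestamp":

```python
from datetime import datetime, timedelta

def filter_late_rows(rows, delay):
    """Simplified watermark model. rows is a list of (event_time, value);
    the watermark is max(event_time) - delay, and older rows are dropped."""
    watermark = max(t for t, _ in rows) - delay
    kept = [(t, v) for t, v in rows if t >= watermark]
    return kept, watermark

rows = [
    (datetime(2025, 3, 19, 10, 0), 21.5),
    (datetime(2025, 3, 19, 10, 1), 21.7),
    (datetime(2025, 3, 19, 9, 50), 20.9),  # very late reading
]
kept, wm = filter_late_rows(rows, timedelta(minutes=2))
# The 9:50 reading falls behind the 9:59 watermark and is dropped.
```

This is only a mental model; Spark tracks the watermark across micro-batches, but the event-time arithmetic is the same.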
Here's an approach that follows Databricks' recommendations:
import dlt
import pyspark.sql.functions as F

@dlt.table
def temp_and_humidity():
    # Load the event tables as streams
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations = spark.read.table("LIVE.sensor_associations")  # static view/table

    # Ensure the schemas are compatible before the union
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumnRenamed("sensor_id", "sensor_group_id")

    # Add the watermark ON THE EVENT-TIME COLUMN, before any aggregation
    temp = temp.withWatermark("timestamp", "2 minutes")  # use a reasonable delay
    humidity = humidity.withWatermark("timestamp", "2 minutes")

    # Join the temp stream with the static associations
    temp = temp.join(associations, temp["sensor_id"] == associations["temp_sensor_id"], "left")
    # (drop/rename the association columns here, as in your original code,
    #  so both sides have identical schemas before the union)

    # Union the streams
    temp_and_humidity = temp.unionByName(humidity)

    # Group by time window and sensor group.
    # The window is derived, but the watermark is set on 'timestamp'.
    return (
        temp_and_humidity
        .withColumn("time_window", F.window("timestamp", "1 minute"))
        .groupBy("time_window", "sensor_group_id")
        .agg(
            F.first("humidity", ignorenulls=True).alias("humidity"),
            F.first("temp", ignorenulls=True).alias("temp"),
        )
    )
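As a sanity check of the merge logic itself, independent of Spark, here is a pure-Python sketch of the aggregation step: floor each timestamp to its 1-minute window, then keep the first non-null value per column within each (window, sensor_group_id) key. This mirrors what the groupBy/F.first combination above is meant to do:

```python
from datetime import datetime

def merge_readings(rows):
    """rows: list of dicts with keys timestamp, sensor_group_id, temp, humidity
    (temp or humidity may be None). Returns one merged dict per
    (1-minute window, sensor_group_id), keeping the first non-null value."""
    merged = {}
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        window = row["timestamp"].replace(second=0, microsecond=0)
        key = (window, row["sensor_group_id"])
        out = merged.setdefault(key, {"temp": None, "humidity": None})
        for col in ("temp", "humidity"):
            if out[col] is None and row[col] is not None:
                out[col] = row[col]
    return merged

rows = [
    {"timestamp": datetime(2025, 3, 19, 10, 0, 5), "sensor_group_id": "g1", "temp": 21.5, "humidity": None},
    {"timestamp": datetime(2025, 3, 19, 10, 0, 40), "sensor_group_id": "g1", "temp": None, "humidity": 48.0},
]
result = merge_readings(rows)
# Both readings land in the 10:00 window of group g1 and merge into one row.
```

One caveat: unlike this batch sketch, F.first in a streaming aggregation gives no ordering guarantee within a window unless the data arrives in order.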
In short:
The watermark must be set on "timestamp" before the aggregation, not on "time_window".
Join static with stream: when joining, having one static side (your associations) keeps things simple.
Avoid joining two streams before aggregation: if you need to join the results of stream aggregations, consider changing your logic or introducing a static reference.
Check that the "timestamp" columns have the correct type (TimestampType, not string).
Ensure watermarks are set before the aggregation (groupBy).
In append mode, join streams only with static tables, or add watermarks on both sides for a stream-stream join.
For more details and troubleshooting, Databricks provides an official guide on watermarks and streaming aggregation. Double-check their advice and make sure you are not setting watermarks on derived window columns.
With these changes, your pipeline should conform to Databricks' streaming requirements, and the exceptions about unsupported output modes should be resolved.
yesterday
Hi mark_ott,
thanks for the reply (even after 6 months, this is appreciated). Since I made this post, I found a different solution and was able to move on (I should have posted it here, but I no longer remember how I solved the problem).
I am pretty sure that I tried your suggestions back then.
I guess the point is to put the watermarks on the two source tables before applying the UNION.
Anyway, thanks for the reply. It is certainly appreciated.
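For future readers: the per-source-watermark-then-union pattern matters because, when an operator has multiple streaming inputs, Spark combines their watermarks into one global watermark (by default the minimum across sources, controlled by spark.sql.streaming.multipleWatermarkPolicy). A rough pure-Python model of that rule, with illustrative times:

```python
from datetime import datetime, timedelta

def global_watermark(source_max_times, delay, policy="min"):
    """Rough model of Spark's multiple-watermark policy for a union:
    each source's watermark is its max seen event time minus the delay;
    the global watermark is the min (default) or max across sources."""
    per_source = [t - delay for t in source_max_times]
    return min(per_source) if policy == "min" else max(per_source)

temp_max = datetime(2025, 3, 19, 10, 5)      # temp stream has caught up to 10:05
humidity_max = datetime(2025, 3, 19, 10, 2)  # humidity stream lags at 10:02
wm = global_watermark([temp_max, humidity_max], timedelta(minutes=2))
# The slower humidity stream holds the global watermark back to 10:00.
```

So giving each source its own watermark before the union lets Spark advance conservatively with the slowest stream instead of failing the aggregation.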