Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Streaming With Watermark fails, suggesting I should add watermarks

DaPo
New Contributor III

Hi all,

I have the following problem: I have two streaming tables containing time-series measurements from different sensor data, each fed by multiple sensors. (Imagine multiple temperature sensors for the first table and multiple humidity sensors for the second.) Each sensor has a sensor_id. In addition, I have a table that specifies which sensors should be merged together. I want to use DLT streaming tables for that. However, I get the following error:

com.databricks.pipelines.common.errors.DLTAnalysisException: Failed to start stream temp_and_humidity in either append mode or complete mode.
Append mode error: [STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE
Complete mode error: Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;

 

The problem is: I added watermarks following this guide: https://kb.databricks.com/en_US/streaming/append-output-not-supported-no-watermark so I think the append mode error should not happen.

My code looks roughly like this (I omitted some details here):

@dlt.table
def temp_and_humidity():
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations = spark.read.table("LIVE.sensor_associations")  # note that this is a VIEW
    # Basically, we want to do an AS OF JOIN here
    # The algorithm works like this:
    #   1. do a UNION BY NAME between the two data frames
    #   2. GROUP BY a small window  <- here we need a watermark, as far as I understand
    # Make sure that the schemas are compatible
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumnRenamed("sensor_id", "sensor_group_id")

    # I think this join is not allowed in complete mode (which I do not want anyway!)
    temp = temp.join(associations, ...).drop(...) # Some column name manipulations, adding the "sensor_group_id" column
 
    # Merging the data
    temp_and_humidity = temp.unionByName(humidity)

    # Finally, we can aggregate, so that measurements "from the same time" are in one row
    # We have prior knowledge, that each sensor sends data once per minute
    return (
        temp_and_humidity
            .withColumn("time_window", F.window("timestamp", "1 minute"))
            .withWatermark("time_window", "1 minute")  # <- we add the watermark!
            .groupBy("time_window", "sensor_group_id")
            .agg(
                F.first("humidity", ignorenulls=True), 
                F.first("temp", ignorenulls=True)
            )
    )

I have tried different variants (like setting the watermark on the "timestamp" column instead of "time_window"), with no success. Can anyone help me out? Any suggestions as to what I am missing here?

Greetings, Daniel

1 ACCEPTED SOLUTION

Accepted Solutions

mark_ott
Databricks Employee

To resolve the DLT streaming aggregation error about unsupported output modes and watermarks in Databricks, set the watermark on the original event timestamp rather than on a computed column like "time_window", and consider the join semantics between streaming and static datasets. Here is a breakdown of the key points and the specific steps to correct your code:

Key Concepts

  • Watermark must be set on a column that directly represents event time, usually "timestamp". Setting it on a derived column like "time_window" is not supported for streaming aggregation or output mode resolution.

  • Streaming joins: You can only join a streaming DataFrame with a static one in append mode. Joining two streaming DataFrames with aggregation usually requires watermarks on both sides and only on event columns. Complete output mode is not possible for joins, only append.

  • View vs Table: Even if associations is a VIEW, as long as its source is static (not itself a stream), it's fine for joining.

Corrected Code Steps

Here's an approach that follows Databricks' recommendations:

python
import dlt
import pyspark.sql.functions as F

@dlt.table
def temp_and_humidity():
    # Load streams on event tables
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations = spark.read.table("LIVE.sensor_associations")  # static view/table

    # Ensure schemas are compatible before union
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumnRenamed("sensor_id", "sensor_group_id")

    # Add watermark ON THE EVENT TIME COLUMN before any aggregation
    temp = temp.withWatermark("timestamp", "2 minutes")  # use a reasonable delay
    humidity = humidity.withWatermark("timestamp", "2 minutes")

    # Join temp stream with associations (static)
    temp = temp.join(associations, temp["sensor_id"] == associations["temp_sensor_id"], "left")

    # Union the streams
    temp_and_humidity = temp.unionByName(humidity)

    # Group by time window and sensor group
    # Window is derived, but watermark is set on 'timestamp'
    return (
        temp_and_humidity
            .withColumn("time_window", F.window("timestamp", "1 minute"))
            .groupBy("time_window", "sensor_group_id")
            .agg(
                F.first("humidity", ignorenulls=True),
                F.first("temp", ignorenulls=True)
            )
    )

Critical Adjustments

  • Watermark must be set on "timestamp" before aggregation, not on "time_window".

  • Join static with stream only: When joining, ensure at least one side is static (your associations is OK).

  • Avoid joining two streams before aggregation: If you need to join the results of stream aggregations, consider changing your logic or introducing a static reference.

Troubleshooting Checklist

  • Inspect "timestamp" columns for correct type (should be TimestampType).

  • Ensure watermarks are set before aggregation (groupBy).

  • Only join streams with static tables in append mode.

Reference

For more details and troubleshooting, Databricks provides an official knowledge-base guide on watermarks and streaming aggregation (linked in your post). Double-check its advice and make sure you are not setting watermarks on window columns.


With these changes, your pipeline should conform to Databricks' streaming requirements and the exceptions about unsupported output mode should be resolved.


2 REPLIES 2


DaPo
New Contributor III

Hi mark_ott,
thanks for the reply (even after 6 months, it is appreciated). Since I made this post, I found a different solution and was able to move on (I should have posted it here, but I no longer remember how I solved the problem).

I am pretty sure that I tried your suggestions back then:

  1. Originally, the watermark was on the original timestamp column. Only after trying different options did I put it on the window column, as a last resort.
  2. The associations table was certainly static, so it should be fine to join with a streaming table.

I guess the point is to put the watermarks on the two source tables before applying the UNION.

Anyway, thanks for the reply. It is certainly appreciated.
