Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Showing results for 
Search instead for 
Did you mean: 

DLT Streaming With Watermark fails, suggesting I should add watermarks

New Contributor II

Hi all,

I have the following Problem: I have two streaming tables containing time-series measurements from different sensor data, each feed by multiple sensors. (Imagine: Multiple Temperature Sensors for the first table, and multiple humidity sensors for the second table). Each sensor has an sensor_id. In addition, I have an table, which sensors should be merged together. I want to use DLT streaming tables for that. However, I get the following error:

com.databricks.pipelines.common.errors.DLTAnalysisException: Failed to start stream temp_and_humidity in either append mode or complete mode.
Append mode error: [STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE
Complete mode error: Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;


 Problem is: I added Watermarks, following this guide: so I think the Append mode error should not happen.

My code looks roughly like this (I omitted some details here):

def temp_and_humidity():
    temp = spark.readStream.table("LIVE.temp")
    humidity = spark.readStream.table("LIVE.humidity")
    associations ="LIVE.senosr_associations") # note, that this is a VIEW
    # Basically, we want to do an AS OF JOIN here
    # The algorithm works like this:
    #.  1. do a UNION BY NAME between the two data frames
    #.  2. GROUP BY a small window <- here we need a watermark as far as I understand
    # Making sure, that the schemas are compatible
    temp = temp.withColumn("humidity", F.lit(None))
    humidity = humidity.withColumn("temp", F.lit(None)).withColumn("sensor_group_id", "sensor_id")

    # I think, this join is not allowed in complete mode (which I do not want anyway!)
    temp = temp.join(associations, ...).drop(...) # Some column name manipulations, adding the "sensor_group_id" column
    # Merging the data
    temp_and_humidity = temp.unionByName(humidity)

    # Finally, we can aggregate, so that measurements "from the same time" are in one row
    # We have prior knowledge, that each sensor sends data once per minute
    returm (
            .withColumn("time_window", F.window("timestamp", "1 minute"))
            .withWatermark("time_window", "1 minute").    # <- we add the watermark!
            .groupBy("time_window", "sensor_group_id")
                F.first("humidity", ignorenulls=True), 
                F.first("temp", ignorenulls=True)

I have tried different variants (like setting the Watermark on the "timestamp" column instead of the "time_window"), with no success. Can anyone help me out? Some suggestions, what am I missing here?

Greetings, Daniel


Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group