Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
DLT streaming table and LEFT JOIN

Daba
New Contributor III

I'm trying to build a gold-level streaming live table based on two silver streaming live tables combined with a LEFT JOIN.

This attempt fails with the following error:

"Append mode error: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition".

My question is: how can I implement the watermark with the SQL interface? Where can I find examples?

Thanks.

5 REPLIES

Kaniz_Fatma
Community Manager

Hi @Alexander Plepler, you cannot use append mode on an aggregated DataFrame without a watermark. This is by design.

You must apply a watermark to the DataFrame if you want to use append mode on an aggregated DataFrame.

The aggregation must have an event-time column or a window on the event-time column.

The examples below group the data by a time window and by word, and compute the count of each group.

.withWatermark() must be called on the same column as the timestamp column used in the aggregation.

The example code shows how this can be done.

Replace the value <type> with the kind of element you are processing. For example, you would use Row if you are processing by row.

Replace the value <words> with the streaming DataFrame of schema { timestamp: Timestamp, word: String }.

JAVA

// Needs: import org.apache.spark.sql.Dataset; import org.apache.spark.sql.functions;
Dataset<type> windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

PYTHON

from pyspark.sql.functions import window

windowedCounts = <words> \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

SCALA

import org.apache.spark.sql.functions.window
import spark.implicits._

val windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

You must call .withWatermark() before you perform the aggregation; calling it after the aggregation fails with an error.

For example, 

df.groupBy("time").count().withWatermark("time", "1 min")

throws an exception, because the watermark is applied after the aggregation.

For more information, please refer to the Apache Spark™ documentation on the conditions for watermarking to clean aggregation state.

SOURCE
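
The examples above cover watermarking for aggregations, but the original question is about a stream-stream LEFT JOIN. As a rough sketch of what that error message is asking for, a watermarked left outer join in PySpark could look something like the block below; the table names, column names, and intervals are illustrative assumptions, not taken from this thread.

PYTHON

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Two streaming inputs; names and columns are illustrative only.
impressions = (
    spark.readStream.table("silver_impressions")
    .withWatermark("impression_ts", "10 minutes")
    .alias("i")
)
clicks = (
    spark.readStream.table("silver_clicks")
    .withWatermark("click_ts", "20 minutes")
    .alias("c")
)

# Watermarks on both sides plus a time-range join condition tell Spark
# how long to keep join state and when unmatched left-side rows can be
# emitted with NULLs in append mode.
joined = impressions.join(
    clicks,
    F.expr("""
        i.ad_id = c.ad_id AND
        c.click_ts BETWEEN i.impression_ts AND i.impression_ts + interval 1 hour
    """),
    "leftOuter",
)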

Daba
New Contributor III

Thanks, Fatma.

I do understand the need for watermarks, but I'm just wondering if this is supported by SQL syntax?

TomRenish
New Contributor III

I'm wrestling with similar questions. This article makes it pretty clear that you'll need to use Scala/Python/Java/R: streaming writes
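
To make that concrete, here is a minimal sketch of the Python route inside a DLT pipeline; the @dlt.table function name, source tables, columns, and intervals are illustrative assumptions, not the original pipeline. The join condition follows the same pattern as the sketch above.

PYTHON

import dlt
from pyspark.sql import functions as F

@dlt.table(comment="Gold table built from two watermarked silver streams")
def gold_impressions_with_clicks():
    # Watermark each silver input on its own event-time column.
    impressions = (
        dlt.read_stream("silver_impressions")
        .withWatermark("impression_ts", "10 minutes")
        .alias("i")
    )
    clicks = (
        dlt.read_stream("silver_clicks")
        .withWatermark("click_ts", "20 minutes")
        .alias("c")
    )
    # Left outer join with a time-range condition, as the append-mode error requires.
    return impressions.join(
        clicks,
        F.expr("""
            i.ad_id = c.ad_id AND
            c.click_ts BETWEEN i.impression_ts AND i.impression_ts + interval 1 hour
        """),
        "leftOuter",
    )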

Hi @Alexander Plepler, this article introduces the basic concepts of watermarking and provides recommendations for using watermarks in everyday stateful streaming operations.

NateAnth
Valued Contributor