DLT streaming table and LEFT JOIN

Daba
New Contributor III

I'm trying to build a gold-level streaming live table based on two silver streaming live tables combined with a left join.

This attempt fails with the following error:

"Append mode error: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition".

My question is: how can I implement the watermark with the SQL interface? Where can I find examples?

Thanks.

5 REPLIES

Kaniz
Community Manager

Hi @Alexander Plepler, you cannot use append mode on an aggregated DataFrame without a watermark. This is by design.

You must apply a watermark to the DataFrame if you want to use append mode on an aggregated DataFrame.

The aggregation must have an event-time column or a window on the event-time column.

The example below groups the data by window and word and computes the count of each group.

.withWatermark() must be called on the same column as the timestamp column used in the aggregation.

The example code shows how this can be done.

Replace the value <type> with the kind of element you are processing. For example, you would use Row if you are processing by row.

Replace the value <words> with the streaming DataFrame of schema { timestamp: Timestamp, word: String }.

JAVA

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.functions;

// <words> is your streaming DataFrame, referenced as "words" after substitution.
Dataset<type> windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")   // watermark on the event-time column
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

PYTHON

from pyspark.sql.functions import window

# <words> is your streaming DataFrame, referenced as "words" after substitution.
windowedCounts = <words> \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

SCALA

import org.apache.spark.sql.functions.window
import spark.implicits._

// <words> is your streaming DataFrame.
val windowedCounts = <words>
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

You must call .withWatermark() before you perform the aggregation; calling it afterwards fails with an error. For example,

df.groupBy("time").count().withWatermark("time", "1 min")

throws an exception.

Please refer to the Apache Spark™ documentation on the conditions for watermarking to clean the aggregation state for more information.
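
Note that the error in the original question comes from a stream-stream left outer join rather than an aggregation, but the same requirement applies: both input streams need watermarks, and the join condition needs an event-time range so Spark knows when an unmatched left row can be emitted. A minimal PySpark sketch modeled on the Spark Structured Streaming docs; the table names, column names, and intervals are placeholders for illustration:

PYTHON

from pyspark.sql.functions import expr

# Hypothetical silver streaming sources; table and column names are assumptions.
impressions = spark.readStream.table("silver_impressions") \
    .withWatermark("impressionTime", "2 hours")
clicks = spark.readStream.table("silver_clicks") \
    .withWatermark("clickTime", "3 hours")

# Watermarks on both sides plus an event-time range condition let Spark
# decide when an unmatched left row can safely be emitted with NULLs.
joined = impressions.join(
    clicks,
    expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter")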

SOURCE

Daba
New Contributor III

Thanks Fatma,

I do understand the need for watermarks, but I'm just wondering if this is supported by SQL syntax?

TomRenish
New Contributor III

I'm wrestling with similar questions. This article makes it pretty clear that you'll need to use Scala/Python/Java/R: streaming writes
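
For what it's worth, here is a rough sketch of what such a gold table could look like in a Delta Live Tables Python notebook, assuming two hypothetical silver streaming tables; the table names, column names, watermark delays, and the one-hour range condition are all illustrative assumptions, not tested code:

PYTHON

import dlt
from pyspark.sql.functions import expr

@dlt.table(name="gold_orders_payments")
def gold_orders_payments():
    # Hypothetical silver streaming tables; names and columns are assumptions.
    orders = dlt.read_stream("silver_orders").withWatermark("order_time", "10 minutes")
    payments = dlt.read_stream("silver_payments").withWatermark("payment_time", "30 minutes")

    # Watermarks on both sides plus an event-time range condition make the
    # stream-stream left outer join eligible for append mode.
    return orders.join(
        payments,
        expr("""
            payment_order_id = order_id AND
            payment_time >= order_time AND
            payment_time <= order_time + interval 1 hour
        """),
        "leftOuter")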

Kaniz
Community Manager

Hi @Alexander Plepler, this article introduces the basic concepts of watermarking and provides recommendations for using watermarks in everyday stateful streaming operations.
