06-07-2022 09:16 AM
I'm trying to build a gold-level streaming live table from two streaming silver live tables using a left join.
This attempt fails with the following error:
"Append mode error: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition".
My question is: how can I implement the watermark with the SQL interface? Where can I find examples?
Thanks.
06-10-2022 01:44 AM
Hi @Alexander Plepler, you cannot use append mode on a streaming aggregation without a watermark. This is by design: append mode emits each aggregated row only once, so Spark needs a watermark to know when a window is complete and will not be updated any further.
To use append mode on an aggregated DataFrame:
The aggregation must have an event-time column or a window on the event-time column.
You must apply a watermark with .withWatermark() on the same column as the timestamp column used in the aggregation.
The example code groups the data by window and word and computes the count of each group.
In the examples, words is a streaming DataFrame with the schema { timestamp: Timestamp, word: String }. In Java, a DataFrame is a Dataset<Row>.
JAVA
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
PYTHON
from pyspark.sql.functions import window
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
SCALA
import org.apache.spark.sql.functions.window
import spark.implicits._
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
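To make the append-mode rule concrete, here is a deliberately simplified, pure-Python model of a watermarked count per window and word. It is not Spark code: the function append_mode_window_counts, the minute-based timestamps, the sample batches, and the use of 10-minute tumbling windows (instead of the sliding windows in the examples) are illustrative assumptions. As in Spark, the watermark is the latest event time seen minus the delay threshold and is updated after each micro-batch; in this model a window's count is emitted exactly once, after the watermark has passed the window's end. Spark itself only guarantees to keep data that is less late than the threshold, whereas this model simply drops everything behind the watermark.

```python
# Toy model only: NOT Spark code. Names, data and the late-data rule are hypothetical.
from collections import defaultdict


def window_start(ts, size):
    """Start of the tumbling window of `size` minutes that contains minute `ts`."""
    return ts - ts % size


def append_mode_window_counts(batches, size=10, delay=10):
    """Simulate a watermarked count per (window, word) in append mode.

    batches: list of micro-batches, each a list of (event_minute, word) pairs.
    Returns the rows append mode would emit, each exactly once:
    (window_start, window_end, word, count).
    """
    counts = defaultdict(int)  # open windows: (window_start, word) -> count
    max_event_time = None      # latest event time seen so far
    watermark = None           # max_event_time - delay, updated after each batch
    emitted = []

    for batch in batches:
        for ts, word in batch:
            # Events older than the current watermark are treated as too late.
            if watermark is not None and ts < watermark:
                continue
            counts[(window_start(ts, size), word)] += 1
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts

        if max_event_time is None:
            continue
        watermark = max_event_time - delay

        # A window is final once its end is at or behind the watermark:
        # any event that could still fall into it would now be dropped as late.
        for start, word in sorted(k for k in counts if k[0] + size <= watermark):
            emitted.append((start, start + size, word, counts.pop((start, word))))

    return emitted


batches = [
    [(1, "a"), (3, "b"), (12, "a")],
    [(25, "a"), (2, "a")],
    [(40, "b"), (5, "b")],  # (5, "b") arrives after the watermark has passed 5
]
for row in append_mode_window_counts(batches):
    print(row)
```

With these sample batches the emitted rows are (0, 10, "a", 2), (0, 10, "b", 1), (10, 20, "a", 1) and (20, 30, "a", 1). The window starting at 40 is still open because the watermark has only reached 30, and the late (5, "b") event is ignored. Without a watermark, no window could ever be declared final, which is why append mode refuses such a query.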
You must call .withWatermark() before you perform the aggregation; otherwise the aggregation does not use the watermark. For example,
df.groupBy("time").count().withWatermark("time", "1 min")
is invalid in append output mode: starting such a query raises an exception.
Please refer to the section "Conditions for watermarking to clean aggregation state" in the Apache Spark™ documentation for more information.
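The reply above targets aggregations, but the error in the original question comes from a stream-stream left outer join. There, Spark has to decide when a left row will never find a match so that it can emit it once, padded with nulls, in append mode. That is only possible if the nullable (right) side has a watermark and the join condition bounds how far apart the two event times can be; in the Spark APIs this means calling withWatermark on the inputs and adding a time-range predicate to the join condition. The sketch below is a deliberately simplified, pure-Python model of that bookkeeping, not Spark code: the function left_outer_join_append, the (key, minute) row format, the sample data, and the rule that every row behind the watermark is dropped are hypothetical simplifications. The combined watermark is the minimum of the two input watermarks, which is Spark's default policy when a query has multiple watermarks.

```python
# Toy model only: NOT Spark code. Names, row format and rules are hypothetical.


def left_outer_join_append(batches, left_delay, right_delay, max_gap):
    """Simulate a stream-stream LEFT OUTER join in append mode.

    batches: list of (left_rows, right_rows) micro-batches; each row is (key, minute).
    Join condition (key equality plus an event-time range condition):
        left.key == right.key and left.time <= right.time <= left.time + max_gap
    Returns (key, left_time, right_time) rows, each emitted once; right_time is
    None for a left row that is emitted without a match.
    """
    left_state = []   # buffered left rows: [key, time, matched]
    right_state = []  # buffered right rows: (key, time)
    max_left = max_right = None
    watermark = None
    out = []

    def joins(lk, lt, rk, rt):
        return lk == rk and lt <= rt <= lt + max_gap

    for left_rows, right_rows in batches:
        left_rows, right_rows = list(left_rows), list(right_rows)
        # Rows behind the current watermark are treated as too late and dropped.
        if watermark is not None:
            left_rows = [r for r in left_rows if r[1] >= watermark]
            right_rows = [r for r in right_rows if r[1] >= watermark]

        # Buffered left rows meet the newly arrived right rows.
        for row in left_state:
            for rk, rt in right_rows:
                if joins(row[0], row[1], rk, rt):
                    out.append((row[0], row[1], rt))
                    row[2] = True
        # New left rows meet buffered and newly arrived right rows.
        new_left = [[k, t, False] for k, t in left_rows]
        for row in new_left:
            for rk, rt in right_state + right_rows:
                if joins(row[0], row[1], rk, rt):
                    out.append((row[0], row[1], rt))
                    row[2] = True
        left_state += new_left
        right_state += right_rows

        # Per-input watermark = latest event time seen minus that input's delay.
        for _, t in left_rows:
            max_left = t if max_left is None else max(max_left, t)
        for _, t in right_rows:
            max_right = t if max_right is None else max(max_right, t)
        if max_left is None or max_right is None:
            continue  # no combined watermark until both inputs have data
        watermark = min(max_left - left_delay, max_right - right_delay)

        # Future right rows have time >= watermark, so a left row with
        # time + max_gap < watermark can never match again: emit it with a
        # null right side if it never matched, then drop it from state.
        still_open = []
        for k, t, matched in left_state:
            if t + max_gap < watermark:
                if not matched:
                    out.append((k, t, None))
            else:
                still_open.append([k, t, matched])
        left_state = still_open
        # Future left rows have time >= watermark, so older right rows can be dropped.
        right_state = [(k, t) for k, t in right_state if t >= watermark]

    return out


batches = [
    ([("o1", 0), ("o2", 5)], [("o1", 30)]),
    ([("o3", 100)], [("o2", 200)]),
    ([("o4", 50)], [("o3", 130)]),  # ("o4", 50) arrives behind the watermark (90)
]
for row in left_outer_join_append(batches, left_delay=10, right_delay=20, max_gap=60):
    print(row)
```

With these sample batches the emitted rows are ("o1", 0, 30), ("o2", 5, None) and ("o3", 100, 130). The null-padded ("o2", 5, None) row only appears because the watermark eventually passes 5 + 60; without a watermark on the right side, or without the upper bound on the right event time, that point never comes, which is what the error message in the question is guarding against.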
06-12-2022 01:02 AM
Thanks Fatma,
I do understand the need for watermarks, but I'm just wondering whether this is supported by SQL syntax.
02-24-2023 11:45 AM
I'm wrestling with similar questions. This article (streaming writes) makes it pretty clear that you'll need to use Scala, Python, Java, or R.
02-24-2023 11:49 AM
Hi @Alexander Plepler, this article introduces the basic concepts of watermarking and provides recommendations for using watermarks in common stateful streaming operations.
02-24-2023 02:18 PM