Designing Reliable Stream–Stream Joins with Watermarks in Databricks

Divaker_Soni

Stream–stream joins are one of the most powerful features in Databricks Structured Streaming – and also one of the easiest to misconfigure. As soon as you move from simple append-only pipelines to real-time correlations across multiple streams (orders vs payments, clicks vs impressions, IoT readings vs alerts), you run into questions like:

  • How do you prevent unbounded state growth in memory?

  • How do you control how long Spark should wait for late events?

  • What happens when the business wants a left join, but the platform complains about missing watermarks?

This post walks through how to design robust stream–stream joins with watermarks in Databricks, focusing on:

  • Why watermarks are effectively mandatory for non-trivial stream–stream joins.

  • How watermarks interact with join types and time-range conditions.

  • What the common “append mode + outer join” error is really telling you.

  • Practical design patterns that work in production (stream–stream, stream–static, and late-event handling).


1. Why Stream–Stream Joins Need Watermarks

A stream–stream join keeps state: Spark must hold past records from both sides until it is confident there is no more matching data for them. Without a bound, that state can grow without limit.

Watermarks give Spark a time fence on event-time columns. Conceptually, a 10-minute watermark on the column event_time says:

“Assume no new data will arrive with event_time older than max(event_time_seen) – 10 minutes; anything older may be treated as late and dropped.”

This allows Spark to safely drop old state from the join buffer once rows fall behind the watermark (plus any join window you define).

Example (Scala):

 
scala
import org.apache.spark.sql.functions.col

val orders = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/orders")
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withWatermark("event_time", "10 minutes")

val payments = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/payments")
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withWatermark("event_time", "20 minutes")

Here Spark:

  • Tracks the maximum event_time seen for each stream.

  • Periodically evicts join state that is older than the corresponding watermark plus the configured join window.

Without this contract, Spark has no guarantee that data will not arrive infinitely late, so it cannot safely clear state for a stream–stream join.


2. Understanding the Common Outer-Join Error

Many people eventually hit an error along the lines of:

Append mode is not supported for stream-stream left outer join between two streaming DataFrames/Datasets without watermark in the join keys, or watermark on the nullable side and an appropriate range condition.

Translated into practical terms:

  • You are doing a left (or right/full) stream–stream join in append mode.

  • Spark does not see a watermark on the time column involved in the join condition, or there is no time-range condition that uses that column.

For outer joins in append mode, Spark must know when a row on the preserved side is “final” – meaning no more matches can arrive. “Final” is defined using:

  • A watermark on the nullable side (the side that can produce nulls after the join).

  • A time-range condition in the join predicate that constrains how late a matching row may arrive.

If either of these is missing, Spark cannot safely decide when to emit a final result and drop state, so it rejects the query in append mode.
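To make this concrete, here is a minimal sketch of the query shape that typically triggers the error, assuming two hypothetical streaming tables (raw.orders and raw.shipments) with no watermarks defined:

 
scala
import org.apache.spark.sql.functions.expr

// Hypothetical streaming sources with NO watermark on either side.
val ordersStream    = spark.readStream.table("raw.orders")
val shipmentsStream = spark.readStream.table("raw.shipments")

// Left outer stream-stream join with only an equality predicate:
// no watermark on the nullable side and no time-range condition.
val badJoin = ordersStream.alias("o").join(
  shipmentsStream.alias("s"),
  expr("o.order_id = s.order_id"),
  "leftOuter"
)

// Starting this query in append mode is what raises the error above:
// badJoin.writeStream.outputMode("append").format("delta").start("/out/bad_join")

The fix, shown in section 3.2, is to add watermarks and a time-range predicate to this same shape.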


3. Building a Correct Stream–Stream Join

Consider a classic pattern:

  • orders_stream: main business events (purchase orders).

  • shipments_stream: related events (shipment confirmations, tracking updates).

You want a joined stream of (order, shipment) records with:

  • All orders.

  • Matching shipments that arrive within a reasonable time window.

  • A clear strategy for shipments that arrive too late.

3.1 Inner join with watermarks

Start with a time-bounded inner join where both sides are streaming:

 
scala
import org.apache.spark.sql.functions._

val orders = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/orders")
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withWatermark("event_time", "10 minutes")
  .alias("orders")

val shipments = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/shipments")
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withWatermark("event_time", "20 minutes")
  .alias("shipments")

val joined = orders.join(
  shipments,
  expr("""
    orders.order_id = shipments.order_id AND
    shipments.event_time BETWEEN orders.event_time AND orders.event_time + INTERVAL 10 MINUTES
  """),
  joinType = "inner"
)

Key points:

  • Both sides have watermarks (10 and 20 minutes).

  • The join condition includes a time-range predicate on the event-time column.

This defines a finite window where matches are allowed:

  • Shipments may arrive after orders, but only within 10 minutes of orders.event_time.

  • Once orders are older than the effective watermark for shipments minus the allowed window, Spark can safely evict them from state.

3.2 Left outer join with watermark on the nullable side

Now imagine the requirement changes:

“Keep all orders even if there is no shipment (yet).”

This requires a left outer join. Spark only supports this in append mode if:

  • The nullable side (here, shipments) has a watermark on an event-time column.

  • The join condition includes a corresponding time-range predicate.

Example:

 
scala
val joinedLeft = orders.join(
  shipments,
  expr("""
    orders.order_id = shipments.order_id AND
    shipments.event_time BETWEEN orders.event_time AND orders.event_time + INTERVAL 10 MINUTES
  """),
  joinType = "leftOuter"
)

Logically:

  • Spark holds orders in state and waits for shipments within the configured window.

  • After the watermark window for shipments.event_time has passed, Spark considers that order “final” and emits the row (with null shipment columns if no match arrived).

  • State for that order is then dropped.

If the watermark or the time-range condition is missing, you get the outer-join append-mode error described above.
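For completeness, here is a minimal sketch of starting this left outer join in append mode and writing to Delta; the checkpoint and output paths are placeholders:

 
scala
// Append mode: each (order, shipment) row is emitted exactly once, after the
// watermark guarantees no further shipment match can arrive for that order.
val query = joinedLeft.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/chk/orders_shipments_join") // placeholder path
  .start("/curated/orders_with_shipments")                    // placeholder path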


4. When You Need “Full History” on One Side

A very common real-world constraint is:

“I cannot apply a watermark on the shipments stream; I need full historical shipment data for lookups.”

In other words, you want to enrich a streaming orders flow with a full, ever-growing reference dataset (for example, a catalog of shipment codes or carrier details). With two truly streaming sides, you cannot have “infinite history, no watermark, and left outer join in append mode” at the same time; state must be bounded.

Patterns that work in practice:

4.1 Convert the reference side to a static table (stream–static join)

If one side can be materialized:

  1. Ingest/update the reference stream into a Delta table (possibly with merge / SCD logic).

  2. Use that table as a static side in a streaming job:

 
scala
val shipmentCatalog = spark.read.table("reference.shipment_codes")

val orders = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/orders")
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withWatermark("event_time", "10 minutes")

val joined = orders.join(
  shipmentCatalog,
  Seq("shipment_code"),
  "leftOuter" // stream-static left join
)

Benefits:

  • No watermark needed on the static side.

  • No time-range requirement in the join condition.

  • State growth is bounded by the streaming side only.

This is usually the best choice when you truly need the full history from one side.

4.2 Two-stage architecture for “full history + streaming”

If the reference source is naturally streaming, but you still need its full history:

  • Stage 1: Ingest the reference stream into a Delta table via a simple streaming or DLT pipeline.

  • Stage 2: Use that Delta table as the static side in a stream–static join with the main streaming flow (orders).

This separates “stateful ingestion” from “stateful joining,” which typically simplifies operations, monitoring, and recovery.
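A minimal sketch of the two stages, with hypothetical paths and table names:

 
scala
import org.apache.spark.sql.streaming.Trigger

// Stage 1: ingest the reference stream into a Delta table (its own job or DLT flow).
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/raw/shipment_codes")                                // placeholder path
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/chk/shipment_codes_ingest") // placeholder path
  .trigger(Trigger.AvailableNow())
  .toTable("reference.shipment_codes")                        // placeholder table

// Stage 2: use the Delta table as the static side, exactly as in section 4.1.
val shipmentCatalog = spark.read.table("reference.shipment_codes")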


5. Handling Late Arrivals: Side Outputs and Reprocessing

Even with carefully chosen watermarks, late events are inevitable (replays, network delays, upstream backlog, etc.). Production-grade designs rarely just drop them silently.

A robust strategy:

  • Define a main join window where “on-time” events land in the primary output.

  • Route late events to a dedicated “late_events” table for offline reconciliation.

Downstream, a batch job can:

  • Try to reconcile late events with existing orders.

  • Fix the main table via updates/merges.

  • Mark late records as processed and, optionally, generate alerts if late volume spikes.
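As an illustration, here is a hedged sketch of that reconciliation step using a Delta MERGE; the table and column names are hypothetical:

 
scala
import io.delta.tables.DeltaTable

// Batch reconciliation: fold late shipment events back into the curated table.
val lateShipments = spark.read.table("ops.late_shipments")    // hypothetical table
  .filter("processed = false")                                // hypothetical flag column

DeltaTable.forName(spark, "curated.orders_with_shipments")    // hypothetical table
  .as("t")
  .merge(lateShipments.as("l"), "t.order_id = l.order_id")
  .whenMatched()
  .updateExpr(Map(
    "shipment_id"   -> "l.shipment_id",                       // hypothetical columns
    "shipment_time" -> "l.event_time"
  ))
  .execute()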

One common pattern is:

  • Main stream: events that satisfy the join’s time-window predicate.

  • Late stream: events that do not satisfy it or arrive after the watermark, often modeled with filters or left-anti joins.

In DLT or PySpark, pipelines often add flags such as valid_flag, late_flag, and reason_code, plus a dedicated late_events table to make downstream operations explicit.
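As a hedged sketch of that tagging, assuming the aliased orders/shipments streams and the joinedLeft output from section 3.2 (flag and reason values are illustrative):

 
scala
import org.apache.spark.sql.functions._

// Flag each joined row instead of silently dropping information.
val flagged = joinedLeft
  .withColumn("valid_flag", col("shipments.order_id").isNotNull) // matched in window?
  .withColumn("reason_code",
    when(col("shipments.order_id").isNull, lit("NO_SHIPMENT_IN_WINDOW"))
      .otherwise(lit("MATCHED")))

// Separately, route shipment events that lag far behind processing time to a
// dedicated late_events table; the 20-minute threshold mirrors the watermark.
val lateShipmentEvents = shipments
  .withColumn("late_flag",
    col("event_time") < current_timestamp() - expr("INTERVAL 20 MINUTES"))
  .filter(col("late_flag"))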


6. Practical Checklist for Stream–Stream Joins

When designing your next stream–stream join in Databricks, walk through this checklist:

  • Do both sides really need to be streaming?

    • If one side can be materialized as a Delta table, prefer a stream–static join.

  • Which join type do you actually need?

    • Inner joins are simpler and more forgiving.

    • Outer joins in append mode require watermarks and time-range predicates.

  • What is your event-time column?

    • Ensure it is a true timestamp and normalize time zones early (UTC is safest).

  • Where are the watermarks?

    • At least one side for inner joins.

    • On the nullable side for outer joins, plus a time-range condition.

  • What is the acceptable late-arrival window?

    • Express it explicitly in the watermark definition and in the join condition.

    • Align it with business SLAs (for example, “shipments may arrive up to 15 minutes late”).

  • What is the plan for late events?

    • Separate table for late records.

    • Reconciliation/merge process.

    • Alerting when late volume exceeds a threshold.

  • How will you monitor the state and performance?

    • Use the Structured Streaming UI and metrics.

  • Track state-store memory, processing latency, and dropped-late counts (a small sketch follows this checklist).

    • Adjust watermarks and windows using real traffic patterns.
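As a minimal sketch of reading those numbers from a running query (assuming query is the StreamingQuery handle returned by start(); exact field names can vary slightly across runtime versions):

 
scala
// Inspect join state size and watermark behaviour from the last micro-batch.
val progress = query.lastProgress
if (progress != null) {
  progress.stateOperators.foreach { op =>
    println(s"state rows: ${op.numRowsTotal}, " +
      s"memory: ${op.memoryUsedBytes} bytes, " +
      s"dropped by watermark: ${op.numRowsDroppedByWatermark}")
  }
  println(s"current event-time watermark: ${progress.eventTime.get("watermark")}")
}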


7. Closing Thoughts

Reliable stream–stream joins in Databricks are less about clever code and more about clear contracts around time:

  • How out-of-order is your data allowed to be?

  • How late can events arrive and still be considered valid?

  • When is a join result considered final?

Watermarks and time-range conditions encode these contracts into the platform so that:

  • State remains bounded and predictable.

  • Join outputs are logically consistent.

  • Late data is handled deliberately instead of being lost by accident.

If you already have a concrete scenario (for example, orders and shipments with late arrivals and reconciliation), you can extend this post with:

  • A full code walkthrough from raw ingestion to curated DLT tables.

  • Diagrams of “before vs after” designs.

  • Metrics snapshots to show the impact of different watermark and window choices on state size and latency.

That level of specificity tends to resonate well with Databricks Community readers who are searching for solutions to real production issues.
