Join multiple streams with watermarks

jcozar
Contributor

Hi!

I receive three streams from a Postgres CDC. These three tables (invoices, users and products) need to be joined. I want to use a left join with respect to the invoices stream. To compute correct results and release old state, I use watermarks and time join conditions on both sides of each join, as described in the official doc:

from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (
                F.col("invoices.created_at").between(
                    F.expr("users.created_at - interval 2 seconds"),
                    F.expr("users.created_at + interval 2 seconds"),
                )
            )
        ),
        how="left"
    )
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            & (
                F.col("invoices.created_at").between(
                    F.expr("products.created_at - interval 2 seconds"),
                    F.expr("products.created_at + interval 2 seconds"),
                )
            )
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)

However, it raises the following error:

org.apache.spark.sql.AnalysisException: More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: (created_at#111102-T4000ms,created_at#111089-T4000ms)

I tested the previous code with just two streams and it works (invoices and users, or invoices and products). My question is: when I need to JOIN multiple tables, are the rules described in the official doc not applied JOIN by JOIN? When joining three streams, is it not possible to get correct results and also perform state cleanup?

Thank you very much!

2 REPLIES

Kaniz
Community Manager

Hi @jcozar, it seems you're encountering an issue with multiple event time columns in your Spark Structured Streaming join.

Let’s break down the problem and find a solution.

  1. Event Time Columns:

    • In Spark Structured Streaming, event time is crucial for handling streaming data. Each stream should have a single event time column.
    • The error message you received indicates that there are two event time columns available: created_at#111102-T4000ms and created_at#111089-T4000ms.
    • To resolve this, we need to ensure that each stream has only one event time column.
  2. Watermarks and Time Join Conditions:

    • You’ve correctly used watermarks and time join conditions to handle late data and release old states.
    • However, when joining multiple streams, both sides of the join must have a watermark and a time interval clause.
    • The time interval clause helps the streaming engine determine when no further matches can be made.
  3. Solution:

    • Make sure that each stream (invoices, users, and products) has a single event time column.
    • Adjust your code to include watermarks and time join conditions on both sides of each join.
    • Here’s an example of how you can modify your code:
(
    spark.readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark.readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (F.col("invoices.created_at").between(F.expr("users.created_at - interval 2 seconds"), F.expr("users.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .join(
        (
            spark.readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            & (F.col("invoices.created_at").between(F.expr("products.created_at - interval 2 seconds"), F.expr("products.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)

Remember to adjust the column names and paths according to your specific use case. If you encounter any further issues, feel free to ask!

Thank you very much for your help @Kaniz ! However, I am afraid that I don't fully understand the solution, sorry! 😞

I checked your code and compared it with mine, and I don't see any differences. Each data stream has a column named `created_at`, which is the event time. It works perfectly when joining two streams, but the problem arises when I try to join three (or more) streams. I understand that, according to the doc, Spark internally creates a single global watermark from the two created_at columns in a single join operation. However, it seems that chaining more JOINs fails.
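To make the "single global watermark" idea concrete, here is a simplified plain-Python sketch (no Spark involved). It assumes the default `spark.sql.streaming.multipleWatermarkPolicy` of `min`, under which the query-wide watermark is the minimum of the per-stream watermarks. The function names and timestamps are made up for illustration, and Spark's real bookkeeping across micro-batches is more involved than this.

```python
from datetime import datetime, timedelta


def stream_watermark(max_event_time, delay):
    """Watermark of one stream: latest event time seen minus the allowed delay."""
    return max_event_time - delay


def global_watermark(per_stream_watermarks):
    """Under the default 'min' policy, the slowest stream sets the global watermark."""
    return min(per_stream_watermarks)


def within_join_range(invoice_ts, other_ts, tolerance):
    """Mirror of the time condition: invoice_ts BETWEEN other_ts - tol AND other_ts + tol."""
    return other_ts - tolerance <= invoice_ts <= other_ts + tolerance


# Hypothetical latest event times seen on each stream.
delay = timedelta(seconds=4)
invoices_max = datetime(2024, 1, 1, 12, 0, 10)
users_max = datetime(2024, 1, 1, 12, 0, 7)

wm = global_watermark([
    stream_watermark(invoices_max, delay),
    stream_watermark(users_max, delay),
])
print(wm)  # 2024-01-01 12:00:03, driven by the users stream

# An invoice 1 second away from its user row satisfies the 2-second condition.
print(within_join_range(invoices_max, invoices_max - timedelta(seconds=1), timedelta(seconds=2)))
```

Rows older than the global watermark (adjusted by the join's time range) can no longer find a match, which is what lets the engine drop their state.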

I tried a workaround: write the result of the first JOIN with writeStream, then read it back with readStream and JOIN it with the third stream. This works, but it is a bit messy. I just wonder whether I am doing something wrong or whether this is the expected behaviour.
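For reference, the two-stage workaround looks roughly like the sketch below. The function names, `staged_path`, `checkpoint_path`, the query name and the renamed `users_id` / `users_created_at` columns are placeholders I made up for this post; the renaming is only there because the staged Delta table cannot hold two columns with the same name. Treat it as a sketch under those assumptions, not a recommended pattern.

```python
def start_invoices_users_stage(spark, data_path, staged_path, checkpoint_path):
    """Stage 1: join invoices with users and persist the result as a Delta table."""
    # Imported lazily so this sketch can be loaded without a running Spark session.
    from pyspark.sql import functions as F

    invoices = (
        spark.readStream.format("delta").load(f"{data_path}/invoices/")
        .alias("invoices")
        .withWatermark("invoices.created_at", "4 seconds")
    )
    users = (
        spark.readStream.format("delta").load(f"{data_path}/users/")
        .alias("users")
        .withWatermark("users.created_at", "4 seconds")
    )
    joined = invoices.join(
        users,
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds"),
            )
        ),
        how="left",
    )
    # Keep all invoice columns; rename the users columns to avoid name clashes.
    # Add further users columns here as needed.
    staged = joined.select(
        "invoices.*",
        F.col("users.id").alias("users_id"),
        F.col("users.created_at").alias("users_created_at"),
    )
    return (
        staged.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .start(staged_path)
    )


def start_products_stage(spark, data_path, staged_path):
    """Stage 2: read the staged table back as a stream and join it with products."""
    from pyspark.sql import functions as F

    staged = (
        spark.readStream.format("delta").load(staged_path)
        .alias("invoices")
        .withWatermark("invoices.created_at", "4 seconds")
    )
    products = (
        spark.readStream.format("delta").load(f"{data_path}/products/")
        .alias("products")
        .withWatermark("products.created_at", "4 seconds")
    )
    return (
        staged.join(
            products,
            on=(
                (F.col("invoices.product_id") == F.col("products.id"))
                & F.col("invoices.created_at").between(
                    F.expr("products.created_at - interval 2 seconds"),
                    F.expr("products.created_at + interval 2 seconds"),
                )
            ),
            how="left",
        )
        .writeStream.outputMode("append")
        .format("memory")
        .queryName("staged_invoices_products")
        .start()
    )
```

Each stage is then a plain two-stream join with one watermark per side, which is the case that already works for me. The cost is a second query, a second checkpoint, and the extra latency of going through the intermediate table.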

Thank you very much!