Warehousing & Analytics

Join multiple streams with watermarks

jcozar
Contributor

Hi!

I receive three streams from a Postgres CDC. These 3 tables, invoices, users and products, need to be joined. I want to use a left join with respect to the invoices stream. In order to compute correct results and release old state, I use watermarks and time-range join conditions on both sides of each join, according to the official doc:

from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds")
            ))
        ),
        how="left"
    )
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            & (F.col("invoices.created_at").between(
                F.expr("products.created_at - interval 2 seconds"),
                F.expr("products.created_at + interval 2 seconds")
            ))
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)

However, it raises the following error:

org.apache.spark.sql.AnalysisException: More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: (created_at#111102-T4000ms,created_at#111089-T4000ms)

I tested the previous code with just 2 streams and it works (invoices and users, or invoices and products). My question is: if I need to join multiple tables, are the rules described in the official doc not applied join by join? When joining 3 streams, is it not possible to get correct results and also perform state cleanup?

Thank you very much!

1 REPLY

Thank you very much for your help @Retired_mod! However, I am afraid that I don't fully understand the solution, sorry! 😞

I checked your code and compared it with mine and I don't see the differences. Each data stream has a column named `created_at`, which is the event-time column. It works perfectly when joining two streams, but the problem arises when I try to join three (or more) streams. I understand that internally Spark creates a single global watermark from the two created_at columns in a single join operation, according to the doc. However, it seems that adding more joins fails.
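As an aside, if I read the docs right, the way Spark collapses several per-stream watermarks into the single global watermark can be tuned with the spark.sql.streaming.multipleWatermarkPolicy option ("min" by default, "max" for lower latency at the risk of dropping late data). I don't think it changes anything for the AnalysisException above, I only mention it because it is related to the global watermark:

# How Spark combines multiple per-stream watermarks into the global watermark.
# "min" (the default) is the safe choice; "max" advances faster but may drop late rows.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")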

I tried a workaround: writeStream the result of the first join to an intermediate Delta table, then readStream it back and join it with the third stream (rough sketch below). It works this way, but it is a bit messy. I just wonder whether I am doing something wrong or whether this is the expected behaviour?
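This is roughly what I mean; the intermediate path, the checkpoint location and the renamed column are just placeholders for my real schema:

from pyspark.sql import functions as F

# Stage 1: invoices LEFT JOIN users, persisted to an intermediate Delta table.
# The select renames the users column so the intermediate table has no duplicate names.
(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds")
            ))
        ),
        how="left"
    )
    .select("invoices.*", F.col("users.created_at").alias("user_created_at"))
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{DATA_PATH}/_checkpoints/invoices_users/")
    .start(f"{DATA_PATH}/invoices_users/")
)

# Stage 2: once the intermediate table exists, read it back as a stream
# and join it with products.
(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices_users/")
    .alias("iu")
    .withWatermark("iu.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("iu.product_id") == F.col("products.id"))
            & (F.col("iu.created_at").between(
                F.expr("products.created_at - interval 2 seconds"),
                F.expr("products.created_at + interval 2 seconds")
            ))
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)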

Thank you very much!
