Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.

Join multiple streams with watermarks

jcozar
Contributor

Hi!

I receive three streams from a Postgres CDC. These three tables (invoices, users, and products) need to be joined, with a left join relative to the invoices stream. To compute correct results and release old state, I use watermarks and time-range join conditions on both sides of each join, as described in the official doc:

from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            & (F.col("invoices.created_at").between(
                F.expr("products.created_at - interval 2 seconds"),
                F.expr("products.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)


However, it raises the following error:


org.apache.spark.sql.AnalysisException: More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: (created_at#111102-T4000ms,created_at#111089-T4000ms)


I tested the previous code with just two streams and it works (invoices and users, or invoices and products). My question is: when joining more than two streams, are the rules described in the official doc not applied join by join? When joining three streams, is it impossible to get correct results and still perform state cleanup?

Thank you very much!

1 REPLY 1

Thank you very much for your help @Retired_mod! However, I am afraid I don't fully understand the solution, sorry! 😞

I checked your code and compared it with mine, and I don't see any differences. Each data stream has a column named `created_at`, which is the event time. It works perfectly when joining two streams, but the problem arises when I try to join three (or more) streams. I understand that, in a single join operation, Spark internally derives a single global watermark from the two `created_at` columns, as the doc describes. However, chaining additional joins seems to fail.

I tried a workaround: use writeStream to materialise the first join, then readStream the result and perform the third join. This works, but it is a bit messy. I just wonder whether I am doing something wrong or this is the expected behaviour.
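For reference, a minimal sketch of that workaround, assuming the same `DATA_PATH` layout as above; the intermediate table path, checkpoint location, and the `user_created_at` rename are illustrative choices, not part of the original code. Writing the first join out and reading it back gives the second query a stream with a single event time column again:

```python
from pyspark.sql import functions as F

# First query: join invoices with users and persist the result to an
# intermediate Delta table (path and checkpoint are assumptions).
invoices_users = (
    spark.readStream.format("delta").load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        spark.readStream.format("delta").load(f"{DATA_PATH}/users/")
        .alias("users")
        .withWatermark("users.created_at", "4 seconds"),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & (F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds")))
        ),
        how="left",
    )
    # Rename the users-side event time so the intermediate table has
    # only one created_at column.
    .select("invoices.*", F.col("users.created_at").alias("user_created_at"))
)

(
    invoices_users.writeStream
    .format("delta")
    .option("checkpointLocation", f"{DATA_PATH}/_checkpoints/invoices_users/")
    .outputMode("append")
    .start(f"{DATA_PATH}/invoices_users/")
)

# Second query: read the intermediate table back as a fresh stream
# (one event time column) and join the products stream.
(
    spark.readStream.format("delta").load(f"{DATA_PATH}/invoices_users/")
    .alias("iu")
    .withWatermark("iu.created_at", "4 seconds")
    .join(
        spark.readStream.format("delta").load(f"{DATA_PATH}/products/")
        .alias("products")
        .withWatermark("products.created_at", "4 seconds"),
        on=(
            (F.col("iu.product_id") == F.col("products.id"))
            & (F.col("iu.created_at").between(
                F.expr("products.created_at - interval 2 seconds"),
                F.expr("products.created_at + interval 2 seconds")))
        ),
        how="left",
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_2")
    .start()
)
```

The cost is an extra table, checkpoint, and the end-to-end latency of two queries, which is why it feels messy compared with a single chained pipeline.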

Thank you very much!
