Hi!
I receive three streams from a Postgres CDC pipeline. The three tables, invoices, users, and products, need to be joined, and I want a left join with respect to the invoices stream. To compute correct results and release old state, I use watermarks and time-range join conditions on both sides of each join, as described in the official documentation:
from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    # First stream-stream join: invoices LEFT JOIN users,
    # with a time-range condition on both event-time columns
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds"),
            )
        ),
        how="left",
    )
    # Second stream-stream join: the previous result LEFT JOIN products,
    # following the same watermark + time-range pattern
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            & F.col("invoices.created_at").between(
                F.expr("products.created_at - interval 2 seconds"),
                F.expr("products.created_at + interval 2 seconds"),
            )
        ),
        how="left",
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)
However, it raises the following error:
org.apache.spark.sql.AnalysisException: More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: (created_at#111102-T4000ms,created_at#111089-T4000ms)
I tested the same code with just two streams and it works (invoices with users, or invoices with products). My question is: when joining multiple tables, are the rules described in the official documentation not applied join by join? When joining three streams, is it impossible to get both correct results and state cleanup?
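For reference, here is a minimal sketch of the two-stream variant that works for me; it is just the pipeline above with the products join removed (the query name here is arbitrary):

from pyspark.sql import functions as F

# Identical to the three-stream pipeline above, minus the products join;
# this version starts without raising the AnalysisException.
(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            & F.col("invoices.created_at").between(
                F.expr("users.created_at - interval 2 seconds"),
                F.expr("users.created_at + interval 2 seconds"),
            )
        ),
        how="left",
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_two_streams")  # hypothetical name
    .start()
)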
Thank you very much!