<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Join multiple streams with watermarks in Warehousing &amp; Analytics</title>
    <link>https://community.databricks.com/t5/warehousing-analytics/join-multiple-streams-with-watermarks/m-p/65717#M1269</link>
    <description>&lt;P&gt;Thank you very much for your help &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;! However, I'm afraid I don't fully understand the solution, sorry! &lt;span class="lia-unicode-emoji" title=":disappointed_face:"&gt;😞&lt;/span&gt;&lt;/P&gt;&lt;P&gt;I compared your code with mine and I don't see any differences. Each data stream has a column named `created_at`, which is the event time. It works perfectly when joining two streams, but the problem arises when I try to join three (or more) streams. According to the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#policy-for-handling-multiple-watermarks" target="_self"&gt;doc&lt;/A&gt;, I understand that Spark internally computes a single global watermark from the two `created_at` columns in a single join operation. However, adding more JOINs seems to fail.&lt;/P&gt;&lt;P&gt;I tried a workaround: writing the result of the first JOIN with writeStream, then reading it back with readStream and joining the third stream. That works, but it is a bit messy. Am I doing something wrong, or is this the expected behaviour?&lt;/P&gt;&lt;P&gt;Thank you very much!&lt;/P&gt;</description>
    <pubDate>Sun, 07 Apr 2024 09:23:19 GMT</pubDate>
    <dc:creator>jcozar</dc:creator>
    <dc:date>2024-04-07T09:23:19Z</dc:date>
    <item>
      <title>Join multiple streams with watermarks</title>
      <link>https://community.databricks.com/t5/warehousing-analytics/join-multiple-streams-with-watermarks/m-p/65540#M1261</link>
      <description>&lt;P&gt;Hi!&lt;/P&gt;&lt;P&gt;I receive three streams from a Postgres CDC. These three tables (invoices, users, and products) need to be joined, using a left join with respect to the invoices stream. To compute correct results and release old state, I use watermarks and time-range join conditions on both sides of each join, following the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries" target="_self"&gt;official doc&lt;/A&gt;:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .load(f"{DATA_PATH}/invoices/")
    .alias("invoices")
    .withWatermark("invoices.created_at", "4 seconds")
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/users/")
            .alias("users")
            .withWatermark("users.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.user_id") == F.col("users.id"))
            &amp;amp; (F.col("invoices.created_at").between(F.expr("users.created_at - interval 2 seconds"), F.expr(f"users.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .join(
        (
            spark
            .readStream
            .format("delta")
            .load(f"{DATA_PATH}/products/")
            .alias("products")
            .withWatermark("products.created_at", "4 seconds")
        ),
        on=(
            (F.col("invoices.product_id") == F.col("products.id"))
            &amp;amp; (F.col("invoices.created_at").between(F.expr("products.created_at - interval 2 seconds"), F.expr(f"products.created_at + interval 2 seconds")))
        ),
        how="left"
    )
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_3_1_1")
    .start()
)&lt;/LI-CODE&gt;&lt;P&gt;However, it raises the following error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;org.apache.spark.sql.AnalysisException: More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: (created_at#111102-T4000ms,created_at#111089-T4000ms)&lt;/LI-CODE&gt;&lt;P&gt;I tested the same code with just two streams (invoices and users, or invoices and products) and it works. My question is: when joining multiple tables, aren't the rules described in the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries" target="_self"&gt;official doc&lt;/A&gt; applied JOIN by JOIN? When joining three streams, is it impossible to get correct results and also perform state cleanup?&lt;/P&gt;&lt;P&gt;Thank you very much!&lt;/P&gt;</description>
      <pubDate>Thu, 04 Apr 2024 22:14:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/warehousing-analytics/join-multiple-streams-with-watermarks/m-p/65540#M1261</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-04-04T22:14:13Z</dc:date>
    </item>
    <item>
      <title>Re: Join multiple streams with watermarks</title>
      <link>https://community.databricks.com/t5/warehousing-analytics/join-multiple-streams-with-watermarks/m-p/65717#M1269</link>
      <description>&lt;P&gt;Thank you very much for your help &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;! However, I'm afraid I don't fully understand the solution, sorry! &lt;span class="lia-unicode-emoji" title=":disappointed_face:"&gt;😞&lt;/span&gt;&lt;/P&gt;&lt;P&gt;I compared your code with mine and I don't see any differences. Each data stream has a column named `created_at`, which is the event time. It works perfectly when joining two streams, but the problem arises when I try to join three (or more) streams. According to the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#policy-for-handling-multiple-watermarks" target="_self"&gt;doc&lt;/A&gt;, I understand that Spark internally computes a single global watermark from the two `created_at` columns in a single join operation. However, adding more JOINs seems to fail.&lt;/P&gt;&lt;P&gt;I tried a workaround: writing the result of the first JOIN with writeStream, then reading it back with readStream and joining the third stream. That works, but it is a bit messy. Am I doing something wrong, or is this the expected behaviour?&lt;/P&gt;&lt;P&gt;Thank you very much!&lt;/P&gt;</description>
      <pubDate>Sun, 07 Apr 2024 09:23:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/warehousing-analytics/join-multiple-streams-with-watermarks/m-p/65717#M1269</guid>
      <dc:creator>jcozar</dc:creator>
      <dc:date>2024-04-07T09:23:19Z</dc:date>
    </item>
  </channel>
</rss>

