Warehousing & Analytics

Stream static join with aggregation

rocky5
New Contributor III

Hi,

I am trying to do a stream-static join with aggregation, with no luck. I have a streaming table that receives events with two nested arrays:

ID   Array1   Array2
1    [1,2]    [3,4]

I need to make two joins to static dictionary tables (which have no timestamps) based on the IDs contained in the arrays, and the second join is based on the first. Once I have the descriptive values, I need to assign and sum scores, so the result should look like this:

ID   Score1   Score2
1    10       12

So, one record per ID. I am using Databricks SQL. Any ideas how I can do this? Exploding the arrays causes cartesian results, so when I join to the scoring table the results are incorrect. I was thinking of joining by array and getting an array back in both cases, but I get errors saying I need to apply a watermark (even to static tables!). I'd appreciate any ideas...
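A minimal plain-Python illustration (not Spark) of the cartesian effect described above, using the example row and hypothetical dictionary scores (4 and 6 for Array1's IDs, 5 and 7 for Array2's) chosen so that the correct totals would be the 10 and 12 from the example:

```python
# Toy illustration (plain Python, not Spark) of why exploding both arrays of
# the same row together produces a cartesian product and inflates the sums.
from itertools import product

# Example row from the question.
row = {"ID": 1, "Array1": [1, 2], "Array2": [3, 4]}

# Hypothetical dictionary tables: element ID -> score.
dictionary1 = {1: 4, 2: 6}
dictionary2 = {3: 5, 4: 7}

# Exploding both arrays at once pairs every Array1 element with every
# Array2 element: 2 x 2 = 4 rows instead of 2 rows per array.
exploded = [(row["ID"], a, b) for a, b in product(row["Array1"], row["Array2"])]

score1 = sum(dictionary1[a] for _, a, _ in exploded)
score2 = sum(dictionary2[b] for _, _, b in exploded)
print(len(exploded), score1, score2)  # 4 20 24, each score counted twice
```

Both totals come out doubled because each element is repeated once for every element of the other array.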

1 REPLY

Kaniz
Community Manager

Hi @rocky5. It sounds like you want to perform a stream-static join with aggregation in Databricks SQL: you have a streaming table with nested arrays, and you need to join it with static dictionary tables based on the IDs contained in those arrays.

Here are the steps to achieve your desired result:

  1. Explode the Arrays:

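    • First, explode each array in its own query so that every element becomes a separate row. Exploding both arrays of the same row together pairs every Array1 element with every Array2 element, which is exactly the cartesian result you describe, and Spark SQL allows only one generator such as EXPLODE per SELECT clause anyway.
    • Use the EXPLODE function once per array. For example:
      SELECT ID, EXPLODE(Array1) AS Array1Element FROM your_streaming_table
      SELECT ID, EXPLODE(Array2) AS Array2Element FROM your_streaming_table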
      
  2. Join with Static Tables:

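    • Next, join each exploded result with its own dictionary table on the element ID. This is a stream-static join, which is stateless, so the join itself needs no watermark.
    • Stream-static joins support INNER and LEFT OUTER joins with the streaming table on the left; RIGHT and FULL OUTER joins are not supported in that case. A LEFT JOIN keeps elements that have no dictionary entry, with a NULL score.
    • For example:
      SELECT s.ID, d1.Score1
      FROM (SELECT ID, EXPLODE(Array1) AS Array1Element FROM your_streaming_table) s
      LEFT JOIN dictionary_table1 d1
        ON s.Array1Element = d1.ID

      SELECT s.ID, d2.Score2
      FROM (SELECT ID, EXPLODE(Array2) AS Array2Element FROM your_streaming_table) s
      LEFT JOIN dictionary_table2 d2
        ON s.Array2Element = d2.ID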
      
  3. Aggregate and Sum Scores:

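    • After joining with the static tables, you'll have multiple rows for each ID, one per array element.
    • Combine the two joined results with UNION ALL, tagging each row with the dictionary it came from, then group by ID and sum each score conditionally. Because each array was exploded separately, every element is counted exactly once:
      SELECT
          ID,
          SUM(CASE WHEN src = 1 THEN score END) AS Score1,
          SUM(CASE WHEN src = 2 THEN score END) AS Score2
      FROM (
          SELECT s.ID, 1 AS src, d1.Score1 AS score
          FROM (SELECT ID, EXPLODE(Array1) AS Array1Element FROM your_streaming_table) s
          LEFT JOIN dictionary_table1 d1 ON s.Array1Element = d1.ID
          UNION ALL
          SELECT s.ID, 2 AS src, d2.Score2 AS score
          FROM (SELECT ID, EXPLODE(Array2) AS Array2Element FROM your_streaming_table) s
          LEFT JOIN dictionary_table2 d2 ON s.Array2Element = d2.ID
      ) t
      GROUP BY
          ID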
      
  4. Result:

    • The final result will have one record per ID with the aggregated scores.
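To check the shape of these steps without a cluster, here is a plain-Python model of the same logic: each array exploded on its own, a LEFT JOIN-style lookup, a UNION ALL with a source tag, and one conditional SUM per ID. It is a sketch, not Spark code; the rows and dictionary scores are hypothetical, and None stands in for SQL NULL.

```python
# Plain-Python model of the query shape: explode each array separately,
# LEFT JOIN-style lookup, UNION ALL with a source tag, one conditional SUM per ID.

# Hypothetical micro-batch of streaming rows (shape taken from the question).
stream_rows = [
    {"ID": 1, "Array1": [1, 2], "Array2": [3, 4]},
    {"ID": 2, "Array1": [2], "Array2": [9]},  # 9 has no dictionary entry
]

# Hypothetical static dictionary tables: element ID -> score.
dictionary1 = {1: 4, 2: 6}
dictionary2 = {3: 5, 4: 7}


def explode_and_lookup(rows, column, dictionary, source):
    """EXPLODE one array column, then LEFT JOIN each element to a dictionary."""
    for row in rows:
        for element in row[column]:
            # A missing key yields None, like the NULL a LEFT JOIN produces.
            yield row["ID"], source, dictionary.get(element)


def score_per_id(rows):
    # UNION ALL of the two independently exploded and joined inputs.
    tagged = [
        *explode_and_lookup(rows, "Array1", dictionary1, "Score1"),
        *explode_and_lookup(rows, "Array2", dictionary2, "Score2"),
    ]
    # GROUP BY ID with SUM(CASE WHEN source = ... THEN score END):
    # SUM skips NULLs and stays NULL (None) if a group has no non-NULL value.
    result = {}
    for id_, source, score in tagged:
        scores = result.setdefault(id_, {"Score1": None, "Score2": None})
        if score is not None:
            previous = scores[source]
            scores[source] = score if previous is None else previous + score
    return result


print(score_per_id(stream_rows))
# {1: {'Score1': 10, 'Score2': 12}, 2: {'Score1': 6, 'Score2': None}}
```

ID 1 gets 10 and 12 because each element contributes once; ID 2 shows the LEFT JOIN behavior, where an unmatched element leaves its score NULL instead of dropping the row.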

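Remember to replace your_streaming_table, dictionary_table1, dictionary_table2, and the Score1/Score2 columns with your actual table and column names. There is no need to preload the static tables: when the static side of a stream-static join is a Delta table, each micro-batch is joined with the latest version of that table.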

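If you encounter issues related to watermarks, note that watermarks are defined on the streaming side only; static tables never need one, and the stream-static join itself is stateless. The requirement most likely comes from the aggregation: in append output mode, Structured Streaming only accepts a streaming aggregation if the stream has a watermark on an event-time column and the GROUP BY includes that column or a window over it. Alternatives may include update or complete output mode where your sink supports them, or computing the final GROUP BY in a batch query or materialized view over the streaming table, for example with Delta Live Tables. Stream-static joins are a good choice when denormalizing continuous streams with static dimension ta...12.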
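Why append mode needs a watermark can be seen in a toy model (plain Python, not Spark's actual implementation): a group's total may be emitted only once no more rows for it can arrive, and a watermark, taken here as the maximum event time seen minus an allowed delay, is what establishes that. This is also why the grouping has to include an event-time column or window; a group keyed by ID alone never becomes final. The window length, delay, and events below are hypothetical.

```python
# Toy model (not Spark's implementation) of append-mode aggregation with a
# watermark. WINDOW and DELAY are hypothetical values chosen for illustration.
WINDOW = 10  # length of each event-time window, in seconds
DELAY = 5    # how late a row may arrive before it is ignored, in seconds


def run(micro_batches):
    """Sum scores per (ID, window start); emit a group once it is final."""
    state = {}        # (ID, window_start) -> running total still open
    emitted = []      # finalized (ID, window_start, total) rows, in output order
    max_event_time = float("-inf")
    watermark = float("-inf")
    for batch in micro_batches:
        for id_, event_time, score in batch:
            window_start = event_time - event_time % WINDOW
            if window_start + WINDOW <= watermark:
                continue  # its window was already emitted, so the row is dropped
            key = (id_, window_start)
            state[key] = state.get(key, 0) + score
            max_event_time = max(max_event_time, event_time)
        # Advance the watermark: max event time seen so far minus the delay.
        watermark = max_event_time - DELAY
        # Windows that end at or before the watermark can no longer change.
        for key in sorted(k for k in state if k[1] + WINDOW <= watermark):
            emitted.append((*key, state.pop(key)))
    return emitted


batches = [
    [(1, 3, 4), (1, 7, 6)],     # window [0, 10) for ID 1
    [(1, 12, 5)],               # window [10, 20)
    [(1, 16, 7)],               # watermark reaches 11, so [0, 10) is final
    [(1, 2, 100), (1, 25, 1)],  # event time 2 is too late and is dropped
]
print(run(batches))  # [(1, 0, 10), (1, 10, 12)]
```

Spark's real bookkeeping, such as exactly when an advanced watermark takes effect, differs in detail, so treat this only as an illustration of the idea.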

I hope this helps! Let me know if you have any further questions or need additional assistance. 😊