
Custom stream-stream join using transformWithState - expanded example

alexbush-mas
New Contributor

Hi all,

Just wondering if anyone has more information or an expanded example of the "Custom stream-stream join using transformWithState" example on the Stateful applications page: Example stateful applications | Databricks on AWS

I'm looking to implement something similar, mainly for the following:

  • You need to continue to perform joins for late-arriving rows (after watermark expiry).

Effectively, I want to combine the SCD1 and join applications to perform stream-stream joins without time-based event expiry and to manage the updating of records myself. Each of the three streams has between 200,000 and 1,000,000 active rows, evenly distributed across keys, so I'm fairly confident RocksDB can handle the size of the state I need to maintain.

What's missing from the example is the stream setup. The example groups and 'Joins' within a single stream:


# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
  .transformWithStateInPandas(
      statefulProcessor=CustomStreamJoinProcessor(),
      outputStructType=output_schema,
      outputMode="Append",
      timeMode="ProcessingTime"
  )
  .writeStream...  # Continue with stream writing configuration
)


To combine the streams, are they doing something like the following?


profile_df = spark.readStream.format(...).load()
preferences_df = spark.readStream.format(...).load()
activity_df = spark.readStream.format(...).load()
df = (
    profile_df
      .unionByName(preferences_df, allowMissingColumns=True)
      .unionByName(activity_df, allowMissingColumns=True)
)
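
For what it's worth, a quick batch sanity check (with made-up toy frames, not my real streams) suggests the union produces one fixed merged schema, null-filling whatever columns a source lacks:

# Toy frames just to check unionByName semantics; data and names are made up
profiles = spark.createDataFrame([(1, "Alice")], ["user_id", "name"])
activity = spark.createDataFrame([(1, "click")], ["user_id", "event_type"])

merged = profiles.unionByName(activity, allowMissingColumns=True)
merged.printSchema()
# The schema is the union of both inputs: user_id, name, event_type.
# Rows from `profiles` carry event_type = null; rows from `activity` carry name = null.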

You can see the example determines the input row type by checking whether a column is present in the pandas DataFrame. Does that mean the null columns from the union aren't present in the pandas data? Otherwise, I'm not quite sure how each row can have a different schema, since the DataFrame itself will have a fixed schema (this is from the processor's handleInputRows):

        df = pd.concat(rows, ignore_index=True)
        output_rows = []

        for _, row in df.iterrows():
            user_id = row["user_id"]

            if "event_type" in row:  # User activity event
                self.activity_state.updateValue(user_id, row.to_dict())
                # Set a timer to process this event after a 10-second delay
                self.handle.registerTimer(timerValues.get_current_processing_time_in_ms() + (10 * 1000))

            elif "name" in row:  # Profile update
                self.profile_state.updateValue(user_id, row.to_dict())

            elif "preferred_category" in row:  # Preference update
                self.preferences_state.updateValue(user_id, row.to_dict())
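
If every column is present (just null) after the union, I'd have expected the row-type checks to test for non-null values rather than column membership, something like this (my guess, not from the docs):

# My assumption, not from the docs: discriminate on non-null values,
# since the union gives every row the full merged schema
if pd.notna(row["event_type"]):            # user activity event
    ...
elif pd.notna(row["name"]):                # profile update
    ...
elif pd.notna(row["preferred_category"]):  # preference update
    ...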

It would be great if there were a complete worked notebook example, like there are for some of the other stateful examples on that page.


1 REPLY

Khaja_Zaffer
Contributor

Hello @alexbush-mas 

Good day!! 

Your intuition is correct: unioning the streams is the standard way to feed multiple input streams into a single transformWithStateInPandas operation for a custom stream-stream join. The example in the Databricks documentation (https://learn.microsoft.com/en-gb/azure/databricks/stateful-applications/examples, under Example stateful applications) treats the input as a single DataFrame grouped by the join key (user_id), even though it describes the pattern as a join "across multiple streams"; the union happens upstream, before the groupBy and transform.
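
For reference, here's a minimal sketch of how I understand the wiring. Treat it as an untested outline: the source formats, paths, and output table are placeholders, and CustomStreamJoinProcessor / output_schema are the ones from the docs example.

# Sketch only -- formats, paths, and table names below are placeholders
profile_df = spark.readStream.format("delta").load("/path/to/profiles")
preferences_df = spark.readStream.format("delta").load("/path/to/preferences")
activity_df = spark.readStream.format("delta").load("/path/to/activity")

# allowMissingColumns=True null-fills columns a source stream lacks,
# so all three inputs share one fixed merged schema
combined = (
    profile_df
      .unionByName(preferences_df, allowMissingColumns=True)
      .unionByName(activity_df, allowMissingColumns=True)
)

query = (
    combined
      .groupBy("user_id")
      .transformWithStateInPandas(
          statefulProcessor=CustomStreamJoinProcessor(),
          outputStructType=output_schema,
          outputMode="Append",
          timeMode="ProcessingTime",
      )
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/path/to/checkpoint")  # placeholder
      .outputMode("append")
      .toTable("joined_users")  # placeholder
)

And since the union yields one fixed schema, I believe you're right that the processor's row-type checks would need to test for non-null values (e.g. pd.notna(row["event_type"])) rather than column membership.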

I'm trying to replicate this myself but I'm not sure of the timeline. I'll give it a try by Monday.