Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Custom stream-stream join using transformWithState - expanded example

alexbush-mas
Visitor

Hi all,

Just wondering if anyone has more information, or an expanded example, of the "Custom stream-stream join using transformWithState" example on the Stateful applications page: Example stateful applications | Databricks on AWS

I'm looking to implement something similar, mainly for the following:

  • You need to continue to perform joins for late-arriving rows (after watermark expiry).

Effectively, I want to combine the SCD1 and Join applications so I can perform stream-stream joins without time-based event expiry, managing the updating of records myself. Each of the three streams has between 200,000 and 1,000,000 active rows, evenly distributed across keys, so I'm fairly confident RocksDB can handle the size of the state I need to maintain.

What's missing from the example is the stream setup. The example groups by key and 'joins' within a single input stream:

# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
  .transformWithStateInPandas(
      statefulProcessor=CustomStreamJoinProcessor(),
      outputStructType=output_schema,
      outputMode="Append",
      timeMode="ProcessingTime"
  )
  .writeStream...  # Continue with stream writing configuration
)


To combine the streams, are they doing something like the following?


profile_df = spark.readStream.format(...).load()
preferences_df = spark.readStream.format(...).load()
activity_df = spark.readStream.format(...).load()
df = (
    profile_df
      .unionByName(preferences_df, allowMissingColumns=True)
      .unionByName(activity_df, allowMissingColumns=True)
)

You can see the example determines the input row type by checking whether a column is present in the pandas DataFrame. Does that mean the null columns from the union aren't present in the pandas data? Otherwise, I'm not quite sure how each row can have a different schema, since the DataFrame itself will have a fixed schema:

        df = pd.concat(rows, ignore_index=True)
        output_rows = []

        for _, row in df.iterrows():
            user_id = row["user_id"]

            if "event_type" in row:  # User activity event
                self.activity_state.updateValue(user_id, row.to_dict())
                # Set a timer to process this event after a 10-second delay
                self.handle.registerTimer(timerValues.get_current_processing_time_in_ms() + (10 * 1000))

            elif "name" in row:  # Profile update
                self.profile_state.updateValue(user_id, row.to_dict())

            elif "preferred_category" in row:  # Preference update
                self.preferences_state.updateValue(user_id, row.to_dict())

It would be great if there were a complete worked notebook example, like there are for some of the other stateful applications on that page.
