Hi all,
Just wondering if anyone has more information on, or an expanded version of, the "Custom stream-stream join using transformWithState" example on the Stateful applications page (Example stateful applications | Databricks on AWS)?
I'm looking to implement something similar, mainly for the following:
- You need to continue to perform joins for late-arriving rows (after watermark expiry).
Effectively, I want to combine the SCD1 and Join applications to be able to perform Stream-Stream joins without time-based event expiry and to manage the updating of records myself. There are between 200,000 and 1,000,000 active rows per stream across three streams evenly distributed across keys, so I'm fairly confident RocksDB can handle the size of the state I need to maintain.
What's missing from the example is the stream setup. The example groups and 'Joins' within a single stream:
# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
    .transformWithStateInPandas(
        statefulProcessor=CustomStreamJoinProcessor(),
        outputStructType=output_schema,
        outputMode="Append",
        timeMode="ProcessingTime"
    )
    .writeStream...  # Continue with stream writing configuration
)
To combine the streams, are they doing something like the following?
profile_df = spark.readStream.(...).load()
preferences_df = spark.readStream.(...).load()
activity_df = spark.readStream.(...).load()
df = (
    profile_df
    .unionByName(preferences_df, allowMissingColumns=True)
    .unionByName(activity_df, allowMissingColumns=True)
)
You can see the example determines the input row type by checking whether a column is present in the pandas DataFrame. Does that mean the null columns from the union aren't present in the pandas data? Otherwise, I'm not quite sure how each row can have a different schema, since the DataFrame itself has a fixed schema:
df = pd.concat(rows, ignore_index=True)
output_rows = []
for _, row in df.iterrows():
    user_id = row["user_id"]
    if "event_type" in row:  # User activity event
        self.activity_state.updateValue(user_id, row.to_dict())
        # Set a timer to process this event after a 10-second delay
        self.handle.registerTimer(timerValues.get_current_processing_time_in_ms() + (10 * 1000))
    elif "name" in row:  # Profile update
        self.profile_state.updateValue(user_id, row.to_dict())
    elif "preferred_category" in row:  # Preference update
        self.preferences_state.updateValue(user_id, row.to_dict())
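To make the concern concrete: if the columns from the other streams do arrive as nulls (my assumption, I haven't confirmed what transformWithStateInPandas actually passes in), then a membership test like "event_type" in row would be true for every row, and I'd expect to need a null check instead. Here is a minimal, stdlib-only sketch of what I mean. MARKER_COLUMNS, is_missing and classify_row are my own hypothetical names, and plain dicts stand in for row.to_dict():

```python
import math
from typing import Any, Mapping, Optional

# Hypothetical marker columns: one column that only a given stream populates.
MARKER_COLUMNS = {
    "event_type": "activity",
    "name": "profile",
    "preferred_category": "preferences",
}


def is_missing(value: Any) -> bool:
    """Treat None and float NaN (how nulls usually surface in pandas) as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def classify_row(row: Mapping[str, Any]) -> Optional[str]:
    """Return the source stream of a unioned row, based on its non-null marker column."""
    for column, source in MARKER_COLUMNS.items():
        if column in row and not is_missing(row[column]):
            return source
    return None  # No marker column populated


# Rows as I'd expect them after unionByName(..., allowMissingColumns=True):
# every row carries every column, and the other streams' columns are null.
unioned_rows = [
    {"user_id": "u1", "event_type": "click", "name": None, "preferred_category": None},
    {"user_id": "u1", "event_type": None, "name": "Ada", "preferred_category": float("nan")},
    {"user_id": "u1", "event_type": float("nan"), "name": None, "preferred_category": "books"},
]

# The membership test cannot tell the rows apart: the column exists on all of them.
presence = ["event_type" in row for row in unioned_rows]

# Checking for a non-null marker value does distinguish them.
sources = [classify_row(row) for row in unioned_rows]
```

With pandas rows, pd.isna(value) would probably be the more general null check, since it also covers pd.NA and NaT. Adding an explicit literal "source" column to each stream before the union might be cleaner still, but I don't know whether that's what the example intends.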
It would be great if there was a complete worked notebook example like there are for some of the other stateful examples on that page.