Hi team,
I'm stuck on a Spark Structured Streaming use case.
Requirement: Read two streaming DataFrames, perform a left join on them, and display the results.
Issue: The resulting DataFrame of the left join contains only the rows where there was a match and discards the unmatched rows.
Ex:
Left table - 200 rows, Right table - 150 rows
Final output - 150 rows (these are the ones with key matches).
Code Snippet:
Since stream-stream joins require defining a watermark column and an event-time range condition, I have applied both in the code below.
The data sources in this example are static tables (table1, table2), but to process only the incremental data I read them with spark.readStream. Kindly note there are no duplicates in either table.
# COMMAND ----------
from pyspark.sql.functions import current_timestamp

bronze_df1 = spark.readStream.table("catalog.schema.table1").withColumn("load_timestamp", current_timestamp())
display(bronze_df1) #200 rows
# COMMAND ----------
bronze_df2 = spark.readStream.table("catalog.schema.table2").withColumn("load_timestamp", current_timestamp())
display(bronze_df2) #150 rows
# COMMAND ----------
bronze_df1 = bronze_df1.withWatermark("load_timestamp","5 minutes")
bronze_df1.createOrReplaceTempView("bronze_df1_view")  # Creating a view from a streaming DataFrame is supported, per the Structured Streaming documentation
display(spark.sql("select * from bronze_df1_view")) #200 rows
# COMMAND ----------
bronze_df2 = bronze_df2.withWatermark("load_timestamp","5 minutes")
bronze_df2.createOrReplaceTempView("bronze_df2_view")
display(spark.sql("select * from bronze_df2_view"))
# COMMAND ----------
# Define the SQL query to left join the two streaming views
final_df = spark.sql("""
select
a.id,
a.asset_type,
a.country_code,
a.state_code,
  b.id as right_id, -- this can be null
a.load_timestamp
FROM bronze_df1_view a
LEFT OUTER JOIN bronze_df2_view b
ON
a.id = b.id and
b.load_timestamp between a.load_timestamp - interval 10 minutes and a.load_timestamp + interval 10 minutes
-- Using a generous event-time range to ensure no rows get ruled out
""")
display(final_df)  # As soon as execution starts, I see the 150 matched rows; the unmatched rows never appear on the console, even after letting the job run for a long time.
P.S.: As mentioned earlier, this is a very simple streaming use case (just processing incremental data). Since the source tables are static, I don't expect any late or out-of-order data. I have tried watermark values ranging from 0 seconds to 1 hour, yet every time the left join results are the same as an inner join's.
P.P.S.: I have been stuck on this issue for over a month now; any pointers will be much appreciated. Thanks!