Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
windowing/Join function on streaming data failing in DLT

hnnhhnnh
New Contributor II

I have a DLT pipeline and want to update a particular column's value from other records in the same group. Trying it with a windowed first_value, I get the error below.
approach #1: join
I tried a self-join initially; it fails because this kind of join isn't possible in DLT without watermarks, with the following error:
org.apache.spark.sql.catalyst.ExtendedAnalysisException: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;

approach#2: windowing
org.apache.spark.sql.AnalysisException: [NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING] Window function is not supported in FIRST(s_featureno#89808, TRUE) (as column `s_featureno`) on streaming DataFrames/Datasets.

The code snippet from my DLT pipeline is below. Please help me figure out how to solve this. I initially ran it without a watermark, then added one, but I'm still unable to troubleshoot this error.

from pyspark.sql.functions import col, first, when, window
from pyspark.sql.window import Window

final_df = df.withColumn(
    "time_window",
    window("kafka_ts", time_window)
).withColumn(
    "s_featureno",
    when(
        col("event_type") == "WAYB",
        col("s_featureno")
    ).otherwise(
        first("s_featureno", ignorenulls=True).over(
            Window.partitionBy("sno", "s_nr", "SAKey").orderBy("kafka_ts")
        )
    )
)

Let's say the sample input is:
columns = ['sno', 's_nr', 'SAKey', 's_type', 's_featureno']

data = [
    ('s1', 11, 55, 'ichd', None),
    ('s1', 11, 55, 'wayb', 9999),
    ('s2', 12, 66, 'xyza', None),
    ('s2', 12, 77, 'xyza', None),
    ('s2', 12, 66, 'wayb', 1111)
]

expected output:
s1 11 55 ichd 9999
s1 11 55 wayb 9999
s2 12 66 xyza 1111
s2 12 66 wayb 1111
s2 12 77 xyza null
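To make the intended rule concrete, here is a plain-Python sketch (not DLT/Spark code, just an illustration) of the backfill implied by the sample: within each (sno, s_nr, SAKey) group, null s_featureno values are filled from the group's non-null value, and rows with no such value stay null.

```python
# Plain-Python model of the intended backfill, run on the sample rows above.
# Illustrative only -- the real pipeline operates on a streaming DataFrame.
data = [
    ('s1', 11, 55, 'ichd', None),
    ('s1', 11, 55, 'wayb', 9999),
    ('s2', 12, 66, 'xyza', None),
    ('s2', 12, 77, 'xyza', None),
    ('s2', 12, 66, 'wayb', 1111),
]

# First pass: remember the non-null s_featureno for each key group.
fill = {}
for sno, s_nr, sakey, s_type, feat in data:
    if feat is not None:
        fill[(sno, s_nr, sakey)] = feat

# Second pass: fill nulls from the group's known value (stays None if absent).
result = [
    (sno, s_nr, sakey, s_type,
     feat if feat is not None else fill.get((sno, s_nr, sakey)))
    for sno, s_nr, sakey, s_type, feat in data
]
```

This produces the same rows as the expected output above (ordering aside): the (s2, 12, 77) row keeps null because its group has no non-null value.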

jhonm_839
New Contributor III

Hi hnnhhnnh,

It looks like you're running into two key issues:

  1. Joins in DLT – Stream-stream joins require a watermark on the nullable side and a proper time-based condition. Without this, DLT won’t allow the join. Try adding a watermark and ensuring your join condition includes a time range.

  2. Window Function Limitation – The first() window function isn’t supported in streaming data unless it's within a time-based window. Instead of using first(), try using aggregation methods like last() to get the most recent non-null value.

A better approach would be to use grouping and aggregation rather than a window function. This way, you can retain the latest s_featureno value where available, without breaking DLT's streaming constraints. If your issue persists, check that your watermarking and event ordering are properly set. Hope this helps!
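One way to picture the grouping-and-aggregation approach: bucket events into time windows (what window() plus a watermark gives you in Spark), aggregate the last non-null value per (window, key), then join that back to the rows. The plain-Python model below is illustrative only and assumes hour-long buckets; in PySpark the corresponding shape would be a watermarked groupBy over a time window with last(..., ignorenulls=True), followed by a join, rather than a row-ordered window function.

```python
from datetime import datetime, timedelta

# Plain-Python model of "bucket by time window, aggregate, join back".
# Column names and hour-long buckets mirror the thread's example but are
# assumptions for illustration; this is not Spark code.

def bucket(ts: datetime, width: timedelta = timedelta(hours=1)) -> datetime:
    """Floor a timestamp to the start of its window (like Spark's window())."""
    return datetime.min + (ts - datetime.min) // width * width

events = [
    # (kafka_ts, (sno, s_nr, SAKey), s_featureno)
    (datetime(2024, 1, 1, 10, 5),  ('s1', 11, 55), None),
    (datetime(2024, 1, 1, 10, 20), ('s1', 11, 55), 9999),
    (datetime(2024, 1, 1, 11, 2),  ('s2', 12, 66), 1111),
]

# Aggregate: last non-null value per (window, key). A time-based window like
# this is what streaming supports, because the watermark bounds its state.
agg = {}
for ts, key, feat in events:
    if feat is not None:
        agg[(bucket(ts), key)] = feat

# Join back: fill each event's null from its (window, key) aggregate.
filled = [
    (ts, key, feat if feat is not None else agg.get((bucket(ts), key)))
    for ts, key, feat in events
]
```

Note the limitation this model makes visible: a null event can only be filled from values that land in the same time bucket, which is the trade-off of replacing a row-ordered window with a watermarked time-window aggregation.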

mlivshutz
New Contributor II

Could you recommend a post online with more detail on using grouping and aggregation within a DLT pipeline?

hnnhhnnh
New Contributor II

hi @jhonm_839 
thanks for your response to the post. I appreciate your time.
I have a watermark on the DataFrame in place and have tried last(), min(), and other aggregate functions, but it's still failing.


final_df = df.withWatermark("kafka_ts", "1 hour").withColumn(
    "s_featureno",
    when(
        col("event_type") == "WAYB",
        col("s_featureno")
    ).otherwise(
        last("s_featureno", ignorenulls=True).over(
            Window.partitionBy("sno", "s_nr", "SAKey").orderBy("kafka_ts")
        )
    )
)

I'm still getting the same error. Can you please help me here?