I have a DLT pipeline and want to update a particular column's value from other records in the same group. I tried a windowed first_value, but I'm getting the error below.
Approach #1: join
I tried a self-join first, but it fails because that kind of join isn't supported in DLT; the error is:
org.apache.spark.sql.catalyst.ExtendedAnalysisException: Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;
Approach #2: windowing
org.apache.spark.sql.AnalysisException: [NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING] Window function is not supported in FIRST(s_featureno#89808, TRUE) (as column `s_featureno`) on streaming DataFrames/Datasets.
The code snippet from my DLT pipeline is below. Please help me figure out how to solve this. I initially ran it without a watermark, then added one, but I'm still unable to troubleshoot this error.
from pyspark.sql.functions import col, first, when, window
from pyspark.sql.window import Window

final_df = df.withColumn(
    "time_window",
    window("kafka_ts", time_window)
).withColumn(
    "s_featureno",
    when(
        col("event_type") == "WAYB",
        col("s_featureno")
    ).otherwise(
        first("s_featureno", ignorenulls=True).over(
            Window.partitionBy("sno", "s_nr", "SAKey").orderBy("kafka_ts")
        )
    )
)
Sample input:
columns = ['sno', 's_nr', 'SAKey', 's_type', 's_featureno']
data = [
    ('s1', 11, 55, 'ichd', None),
    ('s1', 11, 55, 'wayb', 9999),
    ('s2', 12, 66, 'xyza', None),
    ('s2', 12, 77, 'xyza', None),
    ('s2', 12, 66, 'wayb', 1111)
]
Expected output:

sno  s_nr  SAKey  s_type  s_featureno
s1   11    55     ichd    9999
s1   11    55     wayb    9999
s2   12    66     xyza    1111
s2   12    66     wayb    1111
s2   12    77     xyza    null
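To make the intended transformation unambiguous, here is a plain-Python sketch (not DLT/Spark code) of the logic I'm after: for each (sno, s_nr, SAKey) group, take the group's first non-null s_featureno and use it to fill the null rows, leaving groups with no non-null value as null.

```python
# Plain-Python illustration of the intended per-group backfill.
# This is only to clarify the expected output, not the DLT implementation.
columns = ['sno', 's_nr', 'SAKey', 's_type', 's_featureno']
data = [
    ('s1', 11, 55, 'ichd', None),
    ('s1', 11, 55, 'wayb', 9999),
    ('s2', 12, 66, 'xyza', None),
    ('s2', 12, 77, 'xyza', None),
    ('s2', 12, 66, 'wayb', 1111),
]

# First pass: record the first non-null s_featureno per group key.
fill = {}
for sno, s_nr, sakey, s_type, feat in data:
    key = (sno, s_nr, sakey)
    if feat is not None and key not in fill:
        fill[key] = feat

# Second pass: fill null s_featureno values from the group's value, if any.
result = [
    (sno, s_nr, sakey, s_type,
     feat if feat is not None else fill.get((sno, s_nr, sakey)))
    for sno, s_nr, sakey, s_type, feat in data
]

for row in result:
    print(row)
# ('s1', 11, 55, 'ichd', 9999)
# ('s1', 11, 55, 'wayb', 9999)
# ('s2', 12, 66, 'xyza', 1111)
# ('s2', 12, 77, 'xyza', None)
# ('s2', 12, 66, 'wayb', 1111)
```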