Greetings @IM_01, I did a little research and I have some helpful hints to share.
What you're seeing isn't a bug, and it's not specific to Lakeflow SDP. It's just how Spark Structured Streaming works.
At a high level, Structured Streaming only supports time-based windows built with window() on a timestamp column. Once you move into arbitrary SQL window functions, things like row_number() over (...), min() over (...), or sum() over (...), you're outside what streaming can handle. That's exactly why you're hitting NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING.
So the real question becomes: what are you actually trying to compute? From there, the path usually falls into one of three patterns.
First, if you're really after per-key, per-time-window aggregates, you're in good shape; you just need to express it the "streaming way." That means grouping by a time window and using watermarking to manage late data. Something like this:
```python
from pyspark.sql.functions import window, col, sum as spark_sum, min as spark_min

agg_df = (
    df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("key_col"),
    )
    .agg(
        spark_sum("value").alias("value_sum"),
        spark_min("value").alias("value_min"),
    )
)
```
This keeps everything fully streaming and within the supported model.
Second, if you truly need analytic window functions (ranking, running totals, that kind of thing), streaming isn't the right place to do it directly. You've got two practical options.
The cleanest pattern is a two-step design. Use Lakeflow SDP (or standard streaming) for what it's good at (filtering, deduping, time-windowed aggregations) and land the results in a Delta table. Then run a batch job (or a non-streaming Lakeflow pipeline) on top of that, where you can freely use row_number(), min() over (...), etc. You just schedule that second step based on how fresh the data needs to be.
The other option is foreachBatch. If your logic doesn't need state across micro-batches, you can treat each batch like a static DataFrame and apply window functions there. Just be careful: if your logic depends on historical context, you'll need to pull in existing data (e.g., from your target table) and union it with the current batch before applying the window logic.
Third, a lot of the time row_number() is being used for a simpler goal: "give me the latest record per key." If that's the case, you don't need window functions at all. Streaming already gives you native patterns for this:
- Stateful aggregation (e.g., max_by-style logic)
- Watermarked dedup with .dropDuplicates(key_cols + [time_col])
In other words, the constraint here isn't really a limitation; it's a nudge toward patterns that actually scale in a streaming system.
Hope this helps, Louis.