Hello,
I have a DLT pipeline in which I want to calculate a rolling 24-hour average of a column, updated every hour.
I'm using the code below to achieve this:
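```python
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import window

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")
    # Define window for 24 hours with 1-hour slide
    window_spec_24h = window("fetch_ts", "24 hours", "1 hour")
    # Return the aggregated stream; alias the avg column, not the DataFrame
    return (df.withWatermark("fetch_ts", "10 minutes")
            .groupBy(df.Id, window_spec_24h)
            .agg(F.avg("foo").alias("average_foo_24h")))
```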
My issue is that the last window is always missing from my result df. For instance, if my input df has the following fetch_ts values:
2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000
the output df has the following windows:
{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02-23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}
which means that my last row, with fetch_ts "2024-02-23T18:54:00.000", is excluded from the calculation.
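To check which windows each row should land in, here is a small pure-Python sketch (no Spark needed) that mimics the sliding-window assignment of window("fetch_ts", "24 hours", "1 hour"). It assumes the fetch_ts values are UTC and that windows are aligned to the Unix epoch (the default startTime of 0); parse_utc, sliding_windows and rows_per_window are hypothetical helpers, not part of any Spark or DLT API.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# Parameters mirroring window("fetch_ts", "24 hours", "1 hour").
WINDOW = timedelta(hours=24)
SLIDE = timedelta(hours=1)
# Assumption: windows are aligned to the Unix epoch (startTime = 0) and timestamps are UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

SAMPLE_FETCH_TS = [
    "2024-02-23T15:04:00.000",
    "2024-02-23T16:04:00.000",
    "2024-02-23T16:05:00.000",
    "2024-02-23T16:54:00.000",
    "2024-02-23T17:06:00.000",
    "2024-02-23T18:54:00.000",
]


def parse_utc(value):
    """Parse an ISO-8601 string without an offset and tag it as UTC (assumption)."""
    return datetime.fromisoformat(value).replace(tzinfo=timezone.utc)


def sliding_windows(ts):
    """Return every (start, end) window with start <= ts < end, oldest first."""
    start = ts - (ts - EPOCH) % SLIDE  # latest slide-aligned start at or before ts
    windows = []
    while start > ts - WINDOW:         # this window still covers ts
        windows.append((start, start + WINDOW))
        start -= SLIDE
    return sorted(windows)


def rows_per_window(timestamps):
    """Count how many timestamps fall into each sliding window."""
    return Counter(w for ts in timestamps for w in sliding_windows(ts))


if __name__ == "__main__":
    counts = rows_per_window([parse_utc(s) for s in SAMPLE_FETCH_TS])
    # Print the four earliest windows and how many rows each one holds.
    for (start, end), n in sorted(counts.items())[:4]:
        print(f"{start.isoformat()} -> {end.isoformat()}: {n} row(s)")
```

Under these assumptions, the 18:54 row only falls into windows ending at 2024-02-23T19:00Z or later, so it cannot appear in any of the three windows above; the window ending at 19:00 would hold all six rows.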
Any idea why this is happening? Or is it by design and I'm missing something? Is there a way to add the last window ({"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}) as well so that I can include the last row in my calculation?
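One possible explanation, assuming the streaming table writes the aggregation in append output mode: a windowed result is only written out once the watermark (the latest fetch_ts seen minus the 10-minute delay from withWatermark) has reached the window's end. The sketch below is a simplified pure-Python model of that rule; it ignores that Spark advances the watermark between micro-batches, and windows_for and emitted_windows are hypothetical helpers, not Spark APIs.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=24)   # window duration from window("fetch_ts", "24 hours", "1 hour")
SLIDE = timedelta(hours=1)     # slide duration
DELAY = timedelta(minutes=10)  # delay from withWatermark("fetch_ts", "10 minutes")
# Assumption: epoch-aligned windows (startTime = 0) and UTC timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

SAMPLE_FETCH_TS = [
    "2024-02-23T15:04:00.000",
    "2024-02-23T16:04:00.000",
    "2024-02-23T16:05:00.000",
    "2024-02-23T16:54:00.000",
    "2024-02-23T17:06:00.000",
    "2024-02-23T18:54:00.000",
]


def windows_for(ts):
    """All epoch-aligned (start, end) sliding windows with start <= ts < end."""
    start = ts - (ts - EPOCH) % SLIDE
    result = []
    while start > ts - WINDOW:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return result


def emitted_windows(timestamps):
    """Simplified append-mode rule: a window is final once watermark >= window end."""
    watermark = max(timestamps) - DELAY
    candidates = {w for ts in timestamps for w in windows_for(ts)}
    return watermark, sorted(w for w in candidates if w[1] <= watermark)


if __name__ == "__main__":
    rows = [datetime.fromisoformat(s).replace(tzinfo=timezone.utc) for s in SAMPLE_FETCH_TS]
    watermark, ready = emitted_windows(rows)
    print("watermark:", watermark.isoformat())
    for start, end in ready:
        print(start.isoformat(), "->", end.isoformat())
```

With the six sample rows, the modeled watermark is 2024-02-23T18:44Z, so only the windows ending at 16:00, 17:00 and 18:00 qualify, which matches the three windows in the output above. In this simplified model, the 19:00 window would only be emitted after a row with fetch_ts at or after 19:10 arrives.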
Thanks and regards,