DLT streaming with sliding window missing last windows interval

exilon
New Contributor

Hello, 

I have a DLT pipeline where I want to calculate the rolling average of a column for the last 24 hours which is updated every hour.

I'm using the code below to achieve this:

import dlt
from pyspark.sql import functions as F

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")

    # Define window for 24 hours with 1-hour slide
    window_spec_24h = F.window("fetch_ts", "24 hours", "1 hour")

    # Average "foo" per Id and window, with a 10-minute watermark on fetch_ts
    result_df = df.withWatermark("fetch_ts", "10 minutes") \
                  .groupBy(df.Id, window_spec_24h) \
                  .agg(F.avg("foo").alias("average_foo_24h"))

    return result_df

My issue is, I'm always missing the last window in my result df. For instance, if my input df has the following fetch_ts values:

2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000

the output df has the following windows:

{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02-23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}

which means that my last row with the "2024-02-23T18:54:00.000" fetch_ts is getting excluded in the calculation.
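
For reference, the window arithmetic can be reproduced outside Spark. The following is a minimal sketch in plain Python, not the DLT implementation. The helper name `sliding_windows` is hypothetical, windows are assumed to be hour-aligned as in the output above, and it assumes the stream is written in append mode, where a window's aggregate is emitted only once the watermark (latest `fetch_ts` minus the 10-minute delay) has passed the window end. Under those assumptions the window ending at 19:00 exists but is still open, which would match the three windows listed.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=24)             # window length from the pipeline
SLIDE = timedelta(hours=1)               # slide interval from the pipeline
WATERMARK_DELAY = timedelta(minutes=10)  # watermark delay from the pipeline


def sliding_windows(ts):
    """Return (start, end) for every hour-aligned 24h window containing ts."""
    latest_start = ts.replace(minute=0, second=0, microsecond=0)
    count = WINDOW // SLIDE  # 24 overlapping windows per timestamp
    return [
        (latest_start - i * SLIDE, latest_start - i * SLIDE + WINDOW)
        for i in range(count)
    ]


fetch_ts = [
    datetime.fromisoformat(s)
    for s in [
        "2024-02-23T15:04:00.000",
        "2024-02-23T16:04:00.000",
        "2024-02-23T16:05:00.000",
        "2024-02-23T16:54:00.000",
        "2024-02-23T17:06:00.000",
        "2024-02-23T18:54:00.000",
    ]
]

# Assumption: the watermark trails the latest event time by the configured delay.
watermark = max(fetch_ts) - WATERMARK_DELAY

# Every window end that holds at least one input row.
all_ends = sorted({end for ts in fetch_ts for _, end in sliding_windows(ts)})

# Assumption: append mode only emits windows whose end is at or before the watermark.
closed_ends = [end for end in all_ends if end <= watermark]
open_ends = [end for end in all_ends if end > watermark]

print("watermark:", watermark.isoformat())
print("closed window ends:", [e.isoformat() for e in closed_ends])
print("first still-open window end:", open_ends[0].isoformat())
```

If this assumption holds, the 19:00 window would only appear after a later row moves the watermark past 19:00, that is, a row with `fetch_ts` of 19:10 or later.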

Any idea why this is happening? Or is it by design and I'm missing something? Is there a way to add the last window ({"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}) as well so that I can include the last row in my calculation?

Thanks and regards,

1 REPLY

Kaniz
Community Manager

Hi @exilon, it seems like you're trying to calculate a rolling average over a specific time window in your DLT pipeline. Let's address the issue you're facing.

The behavior you’re observing is due to the way the window specification is defined. When you create a window with a sliding interval (in your case, 1 hour), the last window might not cover the entire 24-hour period. This is why the last row with the “2024-02-23T18:54:00.000” fetch_ts is excluded from the calculation.

To include the last window and ensure that all data points are considered, you can adjust the window specification. Instead of using a sliding interval, create a fixed-size window that covers exactly 24 hours. Here’s how you can modify your code:

import dlt
from pyspark.sql import functions as F

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")

    # Define fixed-size (tumbling) window for 24 hours
    window_spec_24h = F.window("fetch_ts", "24 hours")

    # Calculate the average of the "foo" column within the window
    result_df = df.withWatermark("fetch_ts", "10 minutes") \
                  .groupBy(df.Id, window_spec_24h) \
                  .agg(F.avg("foo").alias("average_foo_24h"))

    return result_df

By using a fixed-size window, you’ll ensure that the last window covers the entire 24-hour period, including the data point with the “2024-02-23T18:54:00.000” fetch_ts. This should resolve the issue you’re facing.
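
Before switching, it may help to check which fixed-size window the sample rows would land in. The sketch below is plain Python rather than Spark, and the helper name `tumbling_window` is hypothetical. It assumes the timestamps are UTC and that 24-hour windows are aligned to the Unix epoch (no custom start offset). It also assumes append-mode output, where a window is emitted only after the watermark passes its end.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=24)             # fixed-size window from the suggestion
WATERMARK_DELAY = timedelta(minutes=10)  # watermark delay from the pipeline
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts):
    """Return (start, end) of the epoch-aligned 24h window containing ts."""
    offset = (ts - EPOCH) % WINDOW  # time elapsed since the window started
    start = ts - offset
    return start, start + WINDOW


# Assumption: the sample fetch_ts values are in UTC.
fetch_ts = [
    datetime.fromisoformat(s).replace(tzinfo=timezone.utc)
    for s in [
        "2024-02-23T15:04:00.000",
        "2024-02-23T16:04:00.000",
        "2024-02-23T16:05:00.000",
        "2024-02-23T16:54:00.000",
        "2024-02-23T17:06:00.000",
        "2024-02-23T18:54:00.000",
    ]
]

windows = {tumbling_window(ts) for ts in fetch_ts}
watermark = max(fetch_ts) - WATERMARK_DELAY

for start, end in sorted(windows):
    # A window counts as closed here only if its end is at or before the watermark.
    state = "closed" if end <= watermark else "still open"
    print(start.isoformat(), "->", end.isoformat(), state)
```

Under these assumptions all six rows fall into a single window from 2024-02-23T00:00 to 2024-02-24T00:00, which ends after the watermark, so it is worth verifying in the pipeline whether that row is actually emitted. A fixed-size window also yields one average per 24-hour block instead of a rolling 24-hour average refreshed every hour.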

Remember to adjust the window specification according to your specific requirements, such as the desired aggregation interval and watermark duration. If you need further assistance, feel free to ask! 😊

 