Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT streaming with sliding window missing last window interval

exilon
New Contributor

Hello, 

I have a DLT pipeline where I want to calculate the rolling average of a column over the last 24 hours, updated every hour.

I'm using the code below to achieve this:

import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import window

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")

    # Define window for 24 hours with 1-hour slide
    window_spec_24h = window("fetch_ts", "24 hours", "1 hour")

    # Apply the watermark, then average "foo" per Id within each window
    result_df = (df.withWatermark("fetch_ts", "10 minutes")
                   .groupBy(df.Id, window_spec_24h)
                   .agg(F.avg("foo").alias("average_foo_24h")))

    return result_df

My issue is that I'm always missing the last window in my result df. For instance, if my input df has the following fetch_ts values:

2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000

the output df has the following windows:

{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02-23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}

which means that my last row, with the "2024-02-23T18:54:00.000" fetch_ts, is excluded from the calculation.

Any idea why this is happening? Or is it by design and I'm missing something? Is there a way to add the last window ({"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}) as well so that I can include the last row in my calculation?
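
For reference, here is a minimal standalone batch sketch (outside DLT, with a made-up DataFrame hard-coding the same fetch_ts values; only the column name matches my pipeline) showing which sliding windows a plain batch aggregation produces for these timestamps:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up standalone DataFrame with the sample fetch_ts values from above
sample = (spark.createDataFrame(
              [("2024-02-23T15:04:00",), ("2024-02-23T16:04:00",),
               ("2024-02-23T16:05:00",), ("2024-02-23T16:54:00",),
               ("2024-02-23T17:06:00",), ("2024-02-23T18:54:00",)],
              ["fetch_ts"])
          .withColumn("fetch_ts", F.to_timestamp("fetch_ts")))

# In batch mode, every window that contains at least one row is produced,
# including the window ending at 2024-02-23T19:00
(sample.groupBy(F.window("fetch_ts", "24 hours", "1 hour"))
       .count()
       .orderBy("window.start")
       .show(truncate=False))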

Thanks and regards,

1 REPLY

Kaniz_Fatma
Community Manager

Hi @exilon, it seems like you're trying to calculate a rolling average over a specific time window in your DLT pipeline. Let's address the issue you're facing.

The behavior you’re observing is due to the way the window specification is defined. When you create a window with a sliding interval (in your case, 1 hour), the last window might not cover the entire 24-hour period. This is why the last row with the “2024-02-23T18:54:00.000” fetch_ts is excluded from the calculation.
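
To make the window assignment concrete, here is a small plain-Python sketch (the containing_windows helper is made up for illustration; it mirrors Spark's epoch-aligned window assignment) of which 24-hour/1-hour sliding windows contain the 18:54 event:

from datetime import datetime, timedelta

# Hypothetical helper: list the [start, end) sliding windows that contain
# an event, aligning window starts to the epoch the way Spark does
def containing_windows(ts, length=timedelta(hours=24), slide=timedelta(hours=1)):
    epoch = datetime(1970, 1, 1)
    start = ts - ((ts - epoch) % slide)  # latest slide-aligned start <= ts
    windows = []
    while start + length > ts:
        windows.append((start, start + length))
        start -= slide
    return windows

event = datetime(2024, 2, 23, 18, 54)
wins = containing_windows(event)
print(len(wins))  # 24 overlapping windows
print(min(wins))  # (2024-02-22 19:00, 2024-02-23 19:00) -- the absent window

In the streaming case, a windowed aggregation in append mode only emits a window once the watermark passes the window's end, so the newest of these overlapping windows lags behind the latest events.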

To include the last window and ensure that all data points are considered, you can adjust the window specification. Instead of using a sliding interval, create a fixed-size window that covers exactly 24 hours. Here’s how you can modify your code:

import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import window

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")

    # Define fixed-size window for 24 hours
    window_spec_24h = window("fetch_ts", "24 hours")

    # Calculate the average of the "foo" column within the window
    result_df = df.withWatermark("fetch_ts", "10 minutes") \
                  .groupBy(df.Id, window_spec_24h) \
                  .agg(F.avg("foo").alias("average_foo_24h"))

    return result_df

By using a fixed-size window, you’ll ensure that the last window covers the entire 24-hour period, including the data point with the “2024-02-23T18:54:00.000” fetch_ts. This should resolve the issue you’re facing.
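
For comparison, here is a minimal standalone batch sketch (made-up single-row data; the displayed bounds assume a UTC session time zone) of how the tumbling variant buckets an event into exactly one epoch-aligned 24-hour window:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Made-up single event; a tumbling 24-hour window puts it in exactly one
# bucket, e.g. [2024-02-23 00:00, 2024-02-24 00:00) under UTC
event_df = (spark.createDataFrame([("2024-02-23T18:54:00",)], ["fetch_ts"])
                 .withColumn("fetch_ts", F.to_timestamp("fetch_ts")))

(event_df.groupBy(F.window("fetch_ts", "24 hours"))
         .count()
         .show(truncate=False))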

Remember to adjust the window specification according to your specific requirements, such as the desired aggregation interval and watermark duration. If you need further assistance, feel free to ask! 😊

 
