Traversing to previous rows and getting the data based on condition

RajNath
New Contributor II

Sample Input data set

ClusterId        | Event       | EventTime
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T11:38:30.168+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T13:43:33.933+00:00
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T15:50:05.174+00:00
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T15:54:21.510+00:00
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T16:09:20.576+00:00
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T16:19:58.744+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T17:18:33.863+00:00
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T17:22:38.635+00:00
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T17:23:40.781+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T18:03:33.953+00:00
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T21:10:21.651+00:00
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T21:13:59.842+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T22:43:34.022+00:00

Below is the sample expected output. For each "TERMINATING" event, the RunningEventTime column should show the event time of the most recent preceding RUNNING event. If a "STARTING" event precedes the "TERMINATING" event, its event time should instead appear in the "StartingEventTime" column.

ClusterId        | Event       | EventTime                     | RunningEventTime              | StartingEventTime
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T11:38:30.168+00:00 |                               |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T13:43:33.933+00:00 | 2024-02-02T11:38:30.168+00:00 |
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T15:50:05.174+00:00 |                               |
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T15:54:21.510+00:00 |                               |
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T16:09:20.576+00:00 |                               |
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T16:19:58.744+00:00 |                               |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T17:18:33.863+00:00 |                               | 2024-02-02T15:50:05.174+00:00
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T17:22:38.635+00:00 |                               |
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T17:23:40.781+00:00 |                               |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T18:03:33.953+00:00 |                               | 2024-02-02T17:22:38.635+00:00
1212-18-r9u1kzn1 | STARTING    | 2024-02-02T21:10:21.651+00:00 |                               |
1212-18-r9u1kzn1 | RUNNING     | 2024-02-02T21:13:59.842+00:00 |                               |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T22:43:34.022+00:00 |                               | 2024-02-02T21:10:21.651+00:00

I tried a few options, such as a self join, but that is not ideal when the data set is large. Another option I tried is looping, but it has the same problem and will not scale to large data sets. I also tried the window function "lag" but could not make it work. Any suggestion or hint would be really helpful.
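
To illustrate where "lag" falls short here, a minimal sketch (column names as in the sample above): a fixed-offset lag only reaches a set number of rows back, so by itself it does not return the last RUNNING or STARTING event time when the number of rows in between varies.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window.partitionBy("ClusterId").orderBy("EventTime")
    # lag(1) returns the EventTime of the immediately preceding row only,
    # which is not necessarily the last RUNNING or STARTING event
    prev_event_time = F.lag("EventTime", 1).over(w)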

2 REPLIES

Kaniz
Community Manager

Hi @RajNath, handling event times and aggregations in large datasets can be challenging, but Structured Streaming in Databricks provides powerful tools to address this.

Let’s break down your requirements and explore how you can achieve them:

  1. RunningEventTime for TERMINATING Events:

    • You want to find, for each “TERMINATING” event, the event time of the most recent preceding RUNNING event.

    • To achieve this, you can use the window function to create time-based windows over the event time column. Specifically, you’ll create tumbling windows of a certain duration (e.g., 5 minutes) and aggregate within those windows.

    • Here’s an example of how you can calculate the average signal strength for each device over 5-minute windows:

      from pyspark.sql.functions import window, avg
      
      # Assuming you have a streaming DataFrame 'eventsDF' with schema [eventTime: timestamp, deviceId: string, signal: bigint]
      windowedAvgSignalDF = eventsDF.groupBy(window("eventTime", "5 minutes"), "deviceId").agg(avg("signal"))
      
    • In your case, instead of calculating the average signal strength, you’ll need to adjust the aggregation so that it captures the event time of the most recent RUNNING (or STARTING) event for each “TERMINATING” event (see the sketch after this list).

  2. Handling “STARTING” Events:

    • If a “STARTING” event is present, you want to show its event time in the “StartingEventTime” column.
    • You can achieve this by using conditional expressions (such as when and otherwise) to compute the value for the “StartingEventTime” column based on the presence of “STARTING” events.
  3. Watermarking and Late Data Handling:

    • Watermarking is crucial for handling late data in event-time processing. It allows you to specify a threshold beyond which late events are dropped.
    • Set an appropriate watermark duration based on your data characteristics. For example, if your data tends to arrive late by up to 10 minutes, set the watermark to 10 minutes (a one-line example appears at the end of this reply).
    • Late events (those arriving after the watermark) won’t be considered for aggregation, ensuring correctness.
  4. Avoiding Self-Joins and Loops:

    • As you mentioned, self-joins and loops can be inefficient for large datasets.
    • Structured Streaming’s windowing functions provide a more efficient way to handle event-time aggregations without explicit self-joins.
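
As an illustration of points 1, 2, and 4, here is a minimal sketch. It does not use the tumbling windows from point 1; instead it applies analytic window functions (last with ignorenulls=True over an ordered, per-cluster window) to a static DataFrame, which avoids both self-joins and loops. It assumes the rule implied by the expected output: for each “TERMINATING” row, show the most recent preceding STARTING time in StartingEventTime when one exists, otherwise the most recent preceding RUNNING time in RunningEventTime. The DataFrame name eventsDF and the column names follow the sample data.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # All rows before the current one, for the same cluster, ordered by event time
    w = (Window.partitionBy("ClusterId")
               .orderBy("EventTime")
               .rowsBetween(Window.unboundedPreceding, -1))

    # Most recent STARTING / RUNNING event times seen so far (null if none yet)
    prev_starting = F.last(F.when(F.col("Event") == "STARTING", F.col("EventTime")),
                           ignorenulls=True).over(w)
    prev_running = F.last(F.when(F.col("Event") == "RUNNING", F.col("EventTime")),
                          ignorenulls=True).over(w)

    result = (eventsDF
        # StartingEventTime: filled only for TERMINATING rows preceded by a STARTING event
        .withColumn("StartingEventTime",
                    F.when(F.col("Event") == "TERMINATING", prev_starting))
        # RunningEventTime: filled only for TERMINATING rows with no preceding STARTING event
        .withColumn("RunningEventTime",
                    F.when((F.col("Event") == "TERMINATING") & prev_starting.isNull(),
                           prev_running)))

Because this is a single pass over an ordered window, it does not depend on events arriving at fixed intervals; note that this style of analytic window function applies to batch (static) DataFrames rather than streaming queries.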

If you encounter any issues or need further assistance, feel free to ask! 😊

For more detailed information, you can refer to the Databricks blog post on event-time aggregation and watermarking.
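
As a small illustration of point 3, watermarking in a streaming query is declared with withWatermark; a minimal sketch, assuming the cluster events arrive through a hypothetical streaming table named cluster_events:

    # Streaming only: events older than the watermark may be dropped by stateful aggregations
    events = spark.readStream.table("cluster_events")  # hypothetical source table
    late_tolerant_events = events.withWatermark("EventTime", "10 minutes")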

 

RajNath
New Contributor II

This event data does not have a specific pattern, so I cannot group it based on a fixed interval. The only options I see are a self join or looping, but I want to avoid those. Is there any other option for this data set?
