Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Traversing to previous rows and getting the data based on condition

RajNath
New Contributor II

Sample input data set:

ClusterId | Event | EventTime
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T11:38:30.168+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T13:43:33.933+00:00
1212-18-r9u1kzn1 | STARTING | 2024-02-02T15:50:05.174+00:00
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T15:54:21.510+00:00
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T16:09:20.576+00:00
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T16:19:58.744+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T17:18:33.863+00:00
1212-18-r9u1kzn1 | STARTING | 2024-02-02T17:22:38.635+00:00
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T17:23:40.781+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T18:03:33.953+00:00
1212-18-r9u1kzn1 | STARTING | 2024-02-02T21:10:21.651+00:00
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T21:13:59.842+00:00
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T22:43:34.022+00:00

Below is a sample of the expected output. For each "TERMINATING" event, the RunningEventTime column should show the event time of the previous "RUNNING" event. If a "STARTING" event is present before the "TERMINATING" event, its event time should be shown in the StartingEventTime column instead.

ClusterId | Event | EventTime | RunningEventTime | StartingEventTime
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T11:38:30.168+00:00 | |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T13:43:33.933+00:00 | 2024-02-02T11:38:30.168+00:00 |
1212-18-r9u1kzn1 | STARTING | 2024-02-02T15:50:05.174+00:00 | |
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T15:54:21.510+00:00 | |
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T16:09:20.576+00:00 | |
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T16:19:58.744+00:00 | |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T17:18:33.863+00:00 | | 2024-02-02T15:50:05.174+00:00
1212-18-r9u1kzn1 | STARTING | 2024-02-02T17:22:38.635+00:00 | |
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T17:23:40.781+00:00 | |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T18:03:33.953+00:00 | | 2024-02-02T17:22:38.635+00:00
1212-18-r9u1kzn1 | STARTING | 2024-02-02T21:10:21.651+00:00 | |
1212-18-r9u1kzn1 | RUNNING | 2024-02-02T21:13:59.842+00:00 | |
1212-18-r9u1kzn1 | TERMINATING | 2024-02-02T22:43:34.022+00:00 | | 2024-02-02T21:10:21.651+00:00

I tried a few options, such as a self join, but that is not ideal when the data set is large. Another option I tried is looping, but it has the same problem and will not scale to large data sets. I also tried the window function "lag" but could not make it work. Any suggestion or hint would be really helpful.

2 REPLIES

Kaniz_Fatma
Community Manager

Hi @RajNath, handling event times and aggregations in large datasets can be challenging, but Structured Streaming in Databricks provides powerful tools to address this.

Let’s break down your requirements and explore how you can achieve them:

  1. RunningEventTime for TERMINATING Events:

    • You want to find the event time of the previous “RUNNING” event for each “TERMINATING” event.

    • To achieve this, you can use the window function to create time-based windows over the event time column. Specifically, you’ll create tumbling windows of a certain duration (e.g., 5 minutes) and aggregate within those windows.

    • Here’s an example of how you can calculate the average signal strength for each device over 5-minute windows:

      from pyspark.sql.functions import window, avg

      # Assuming a streaming DataFrame 'eventsDF' with schema [eventTime: timestamp, deviceId: string, signal: bigint]
      windowedAvgSignalDF = (
          eventsDF.groupBy(window("eventTime", "5 minutes"), "deviceId")  # tumbling 5-minute windows per device
          .agg(avg("signal").alias("avg_signal"))
      )
      
    • In your case, instead of calculating the average signal strength, you’ll need to adjust the aggregation to capture the event time of the previous “RUNNING” event for each “TERMINATING” row (see the sketch after this list).

  2. Handling “STARTING” Events:

    • If a “STARTING” event is present, you want to show its event time in the “StartingEventTime” column.
    • You can achieve this by using conditional expressions (such as when and otherwise) to compute the “StartingEventTime” column based on the presence of “STARTING” events; the sketch after this list combines both columns.
  3. Watermarking and Late Data Handling:

    • Watermarking is crucial for handling late data in event-time processing. It allows you to specify a threshold beyond which late events are dropped.
    • Set an appropriate watermark duration based on your data characteristics. For example, if your data tends to arrive late by up to 10 minutes, set the watermark to 10 minutes (see the snippet after the sketch below).
    • Late events (those arriving after the watermark) won’t be considered for aggregation, ensuring correctness.
  4. Avoiding Self-Joins and Loops:

    • As you mentioned, self-joins and loops can be inefficient for large datasets.
    • Structured Streaming’s windowing functions provide a more efficient way to handle event-time aggregations without explicit self-joins.
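
Putting points 1, 2, and 4 together, here is a rough sketch (untested) of how both columns could be computed in a single pass with ordinary window functions over the static table, i.e. without a self join or loop. It assumes the sample data is loaded into a DataFrame named events_df with columns ClusterId, Event, and EventTime (a timestamp), and it infers the rule for choosing between the RUNNING and the STARTING time from the expected output above, so adjust the conditions as needed:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # All rows of the same cluster that occurred strictly before the current row
    w = (Window.partitionBy("ClusterId")
               .orderBy("EventTime")
               .rowsBetween(Window.unboundedPreceding, -1))

    # Event time of the most recent prior RUNNING / STARTING event (nulls are skipped)
    prev_running = F.last(
        F.when(F.col("Event") == "RUNNING", F.col("EventTime")), ignorenulls=True
    ).over(w)
    prev_starting = F.last(
        F.when(F.col("Event") == "STARTING", F.col("EventTime")), ignorenulls=True
    ).over(w)

    result_df = (
        events_df
        # Show the previous RUNNING time only when no STARTING event has occurred yet
        .withColumn(
            "RunningEventTime",
            F.when((F.col("Event") == "TERMINATING") & prev_starting.isNull(), prev_running),
        )
        # Otherwise show the most recent STARTING time for TERMINATING rows
        .withColumn(
            "StartingEventTime",
            F.when(F.col("Event") == "TERMINATING", prev_starting),
        )
    )

    result_df.orderBy("ClusterId", "EventTime").show(truncate=False)

The key pieces are the frame ending at -1, which makes each TERMINATING row look only at earlier rows within its own cluster, and F.last(..., ignorenulls=True), which picks up the most recent matching event time. Spark evaluates this with a shuffle and sort per cluster rather than a join, so it should scale much better than a self join or a loop.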
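
On point 3, the watermark is set directly on the event-time column before the aggregation. A minimal sketch, assuming a streaming DataFrame named events_stream_df with an EventTime timestamp column and data that can arrive up to 10 minutes late:

    from pyspark.sql import functions as F

    windowed_counts_df = (
        events_stream_df
        # State for events arriving more than 10 minutes behind the latest EventTime is dropped
        .withWatermark("EventTime", "10 minutes")
        .groupBy(F.window("EventTime", "5 minutes"), "ClusterId")
        .count()
    )

This only applies if you end up processing the event log as a stream; for a one-off batch query the window-function approach above is enough.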

If you encounter any issues or need further assistance, feel free to ask! 😊

For more detailed information, you can refer to the Databricks blog post on event-time aggregation and watermarking.

 

RajNath
New Contributor II

This event data does not follow a specific pattern, so I cannot group it based on a time interval. The only options I see are a self join or looping, but I want to avoid those. Is there any other option for the given data set?
