
Spark Structured Streaming - calculate signal, help required! 🙏

Galih
New Contributor

Hello everyone!

I’m very very new to Spark Structured Streaming, and not a data engineer 😅. I would appreciate guidance on how to efficiently process streaming data and emit only changed aggregate results over multiple time windows.

Input Stream:

Source: Amazon Kinesis

Microbatch granularity: every 60 seconds

Schema:

(profile_id, gti, event_timestamp, event_type)

Where:

event_type ∈ { select, highlight, view }

Time Windows:

We need to maintain rolling aggregate counts over the following windows:

1 hour

12 hours

24 hours

Output Requirement:

For each (profile_id, gti) combination, I want to emit only the current counts that changed during the current micro-batch.

The output record should look like this:

{
  "profile_id": "profileid",
  "gti": "amz1.gfgfl",
  "select_count_1d": 5,
  "select_count_12h": 2,
  "select_count_1h": 1,
  "highlight_count_1d": 20,
  "highlight_count_12h": 10,
  "highlight_count_1h": 3,
  "view_count_1d": 40,
  "view_count_12h": 30,
  "view_count_1h": 3
}

Key Requirements:

Per key output: (profile_id, gti)

Emit only changed rows in the current micro-batch

This data is written to a feature store, so we want to avoid rewriting unchanged aggregates

Each emitted record should represent the latest counts for that key

What We Tried:

We implemented sliding window aggregations using groupBy(window()) for each time window. For example:

groupBy(
  profile_id,
  gti,
  window(event_timestamp, windowDuration, "1 minute")
)

Spark didn’t allow joining those three streams; it failed with an error about outer-join limitations between streaming aggregations.

We tried to work around it by writing each stream to the memory sink and taking a snapshot every 60 seconds, but that doesn't output only the changed rows.

How would you go about this problem? Should we maintain three rolling time windows like we tried and find a way to join them, or is there another way you could think of?

Very lost here, any help would be very appreciated!!

1 REPLY

Louis_Frolio
Databricks Employee

Greetings @Galih, I did a little research to help you along.

Here’s a pragmatic way to design this so you only emit changed aggregates per (profile_id, gti) across multiple rolling windows, without trying to join multiple streaming aggregations.

TL;DR recommended approach

Build one stateful streaming pipeline that:

  • Pre-aggregates raw events into 1-minute buckets by event time (keeps cardinality sane, reduces per-event state churn).

  • Maintains rolling counts for 1h, 12h, and 24h per key using flatMapGroupsWithState (or mapGroupsWithState) plus event-time watermarks for bounded state.

  • Emits a row only when any of the nine counts changed for that key (compare against last emitted values stored in state).

  • Upserts only those changed rows into your feature table using foreachBatch + MERGE, so unchanged features never get rewritten.

Net effect: no streaming-streaming joins, exact rolling windows, late-data handling, bounded state, and change-only emission.

Why joining three streaming windows is a headache

  • Streaming-to-streaming outer joins are heavily restricted; even with watermarks, they don’t really fit this use case cleanly.

  • Three separate windowed aggregations plus a join typically pushes you into unsupported join types/modes, or brittle workarounds.

  • Memory sink + snapshotting doesn’t give you “changed-only” output; complete-mode snapshots don’t inherently track deltas since last trigger.
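To make that concrete, here is a minimal PySpark sketch (schema from the question, Kinesis options elided) of the three-windows-then-join shape that the unsupported-operations check rejects:

from pyspark.sql.functions import window

# `events`: a streaming DataFrame with the question's schema
# (profile_id, gti, event_timestamp, event_type), watermarked on event time.
events = (
  spark.readStream.format("kinesis").load()  # Kinesis options elided
    .select("profile_id", "gti", "event_timestamp", "event_type")
    .withWatermark("event_timestamp", "25 hours")
)

agg_1h  = events.groupBy("profile_id", "gti", window("event_timestamp", "1 hour",   "1 minute")).count()
agg_12h = events.groupBy("profile_id", "gti", window("event_timestamp", "12 hours", "1 minute")).count()
agg_24h = events.groupBy("profile_id", "gti", window("event_timestamp", "24 hours", "1 minute")).count()

# Streaming aggregations cannot feed a stream-to-stream join, so Spark rejects
# this plan when the streaming query starts.
joined = (
  agg_1h.join(agg_12h, ["profile_id", "gti"], "full_outer")
        .join(agg_24h, ["profile_id", "gti"], "full_outer")
)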

Design: one pipeline with stateful rolling windows

The trick is to store compact per-minute counts in state, then update rolling sums incrementally.

  1. Read and pre-aggregate to 1-minute buckets

  • Read from Kinesis on a 60s trigger.

  • Apply a watermark on event time slightly larger than your largest window (example: 25 hours) to allow late events while bounding state.

  • Group into 1-minute buckets per (profile_id, gti, event_type) to reduce skew and avoid per-event state updates.

 

PySpark-ish sketch:

from pyspark.sql.functions import col, window

events = (
  spark.readStream.format("kinesis")
    # .option(...)  # your Kinesis options
    .load()
    .select("profile_id", "gti", "event_timestamp", "event_type")
    .withWatermark("event_timestamp", "25 hours")
)

minute_counts = (
  events
  .groupBy(
    "profile_id", "gti", "event_type",
    window(col("event_timestamp"), "1 minute")
  )
  .count()
  .select(
    "profile_id", "gti", "event_type",
    col("window").start.alias("bucket_minute"),
    "count"
  )
)

 

  2. Maintain rolling sums with flatMapGroupsWithState

    For each (profile_id, gti):

  • Keep, per event_type, a deque (or equivalent compact structure) of (bucket_minute, count) covering the last 24h.

  • Maintain three running sums per event_type: 1h, 12h, 24h.

    For each incoming minute bucket:

    • Append it to that event_type’s structure.

    • Evict anything older than 24h (driven by event time + watermark + event-time timeout).

    • Update the three sums incrementally (add the new minute, subtract minutes that fall out of each horizon).

    Then, per trigger:

  • Build the 9-field output row (3 windows × 3 event types).

  • Compare to lastEmitted in state; emit only if different; otherwise emit nothing.

 

Scala sketch (often clearer with typed state):

import java.sql.Timestamp
import scala.collection.mutable.ArrayDeque
import org.apache.spark.sql.streaming._

case class Key(profile_id: String, gti: String)
case class MinuteCount(event_type: String, bucket_minute: Timestamp, count: Long)

case class OutputRow(
  profile_id: String, gti: String,
  select_count_1d: Long, select_count_12h: Long, select_count_1h: Long,
  highlight_count_1d: Long, highlight_count_12h: Long, highlight_count_1h: Long,
  view_count_1d: Long, view_count_12h: Long, view_count_1h: Long
)

case class RollingState(
  buckets: Map[String, ArrayDeque[(Long, Long)]],
  sum1h: Map[String, Long],
  sum12h: Map[String, Long],
  sum24h: Map[String, Long],
  lastEmitted: Option[OutputRow],
  lastEventTimeMs: Long
)

def updatePerKey(
  key: Key,
  values: Iterator[MinuteCount],
  state: GroupState[RollingState]
): Iterator[OutputRow] = {

  val horizon24hMs = 24L * 60L * 60L * 1000L
  val horizon12hMs = 12L * 60L * 60L * 1000L
  val horizon1hMs  =  1L * 60L * 60L * 1000L

  val st0 =
    if (state.exists) state.get
    else RollingState(
      buckets = Map("select" -> ArrayDeque.empty, "highlight" -> ArrayDeque.empty, "view" -> ArrayDeque.empty),
      sum1h = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum12h = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum24h = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      lastEmitted = None,
      lastEventTimeMs = 0L
    )

  // NOTE: this is illustrative; in production you’ll want clean, exact eviction + sum maintenance.
  var st = st0

  values.foreach { v =>
    val et = v.event_type
    val minuteMs = v.bucket_minute.getTime
    val deque = st.buckets(et)

    // append the new bucket, then evict anything older than 24h
    deque.append((minuteMs, v.count))
    val cutoff24 = minuteMs - horizon24hMs
    while (deque.nonEmpty && deque.head._1 < cutoff24) {
      deque.removeHead()
    }

    // recompute the three rolling sums for this event_type from the retained
    // buckets (at most ~1440 per key and type), so counts that age past the
    // 1h/12h horizons but stay within 24h are handled correctly
    val cutoff12 = minuteMs - horizon12hMs
    val cutoff1  = minuteMs - horizon1hMs
    st = st.copy(
      sum24h = st.sum24h.updated(et, deque.iterator.map(_._2).sum),
      sum12h = st.sum12h.updated(et, deque.iterator.collect { case (ts, c) if ts >= cutoff12 => c }.sum),
      sum1h  = st.sum1h.updated(et,  deque.iterator.collect { case (ts, c) if ts >= cutoff1  => c }.sum)
    )

    st = st.copy(lastEventTimeMs = math.max(st.lastEventTimeMs, minuteMs))
  }

  val out = OutputRow(
    key.profile_id, key.gti,
    st.sum24h("select"), st.sum12h("select"), st.sum1h("select"),
    st.sum24h("highlight"), st.sum12h("highlight"), st.sum1h("highlight"),
    st.sum24h("view"), st.sum12h("view"), st.sum1h("view")
  )

  val changed = st.lastEmitted.forall(_ != out)

  state.update(st.copy(lastEmitted = Some(out)))
  state.setTimeoutTimestamp(st.lastEventTimeMs + horizon24hMs) // event-time cleanup (with watermark)

  if (changed) Iterator(out) else Iterator.empty
}

Notes worth calling out:

  • The logic above is intentionally illustrative (it recomputes the per-horizon sums from the retained buckets). In production, you’ll want to be precise about which counts are included in each horizon and consider maintaining the sums incrementally as buckets age out.

  • Pair GroupStateTimeout.EventTimeTimeout with a proper watermark so Spark can actually garbage-collect state for quiet keys.

  • For speed, many teams move from deques to compact fixed-size ring buffers indexed by minute offset (rough sketch below).
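As a plain-Python illustration of that idea (the names and structure here are hypothetical, not part of the pipeline above): one slot per minute of the 24h horizon, indexed by epoch minute modulo 1440.

HORIZON_MIN = 24 * 60  # 1440 one-minute slots covering the 24h horizon

class MinuteRing:
    """Hypothetical per-(key, event_type) ring buffer of minute counts."""

    def __init__(self):
        self.slots = [0] * HORIZON_MIN  # count per minute slot
        self.last_minute = None         # newest epoch minute seen so far

    def add(self, epoch_minute: int, count: int) -> None:
        if self.last_minute is None:
            self.last_minute = epoch_minute
        if epoch_minute <= self.last_minute - HORIZON_MIN:
            return  # older than the 24h horizon; drop
        # zero out slots for any minutes skipped since the last update,
        # so stale counts from ~24h ago don't leak into the new window
        gap = min(max(epoch_minute - self.last_minute, 0), HORIZON_MIN)
        for i in range(1, gap + 1):
            self.slots[(self.last_minute + i) % HORIZON_MIN] = 0
        self.last_minute = max(self.last_minute, epoch_minute)
        self.slots[epoch_minute % HORIZON_MIN] += count

    def rolling_sum(self, minutes: int) -> int:
        # sum of the trailing `minutes` slots ending at last_minute
        if self.last_minute is None:
            return 0
        return sum(self.slots[(self.last_minute - i) % HORIZON_MIN]
                   for i in range(minutes))

# Usage: ring.rolling_sum(60), ring.rolling_sum(720), ring.rolling_sum(1440)
# give the 1h / 12h / 24h counts for that key and event type.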

  3. Write changed rows to your feature store via foreachBatch + MERGE

    Because you’re emitting only changed keys, each micro-batch is naturally small.

 

PySpark MERGE sketch:

def upsert_features(microbatch_df, batch_id: int):
    # register the micro-batch as a temp view on its own SparkSession,
    # then run the MERGE through that same session
    microbatch_df.createOrReplaceTempView("changed_features")

    microbatch_df.sparkSession.sql("""
      MERGE INTO feature_store f
      USING changed_features s
      ON f.profile_id = s.profile_id AND f.gti = s.gti
      WHEN MATCHED AND (
         f.select_count_1d  <> s.select_count_1d  OR
         f.select_count_12h <> s.select_count_12h OR
         f.select_count_1h  <> s.select_count_1h  OR
         f.highlight_count_1d  <> s.highlight_count_1d  OR
         f.highlight_count_12h <> s.highlight_count_12h OR
         f.highlight_count_1h  <> s.highlight_count_1h  OR
         f.view_count_1d  <> s.view_count_1d  OR
         f.view_count_12h <> s.view_count_12h OR
         f.view_count_1h  <> s.view_count_1h
      )
      THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)

(
  changedRows  # the change-only stream produced by the stateful step above
  .writeStream
  .trigger(processingTime="60 seconds")
  .option("checkpointLocation", "dbfs:/checkpoints/your_pipeline")
  .foreachBatch(upsert_features)
  .start()
)

How this hits your requirements

  • Per-key output: state is keyed by (profile_id, gti), emission is per key.

  • Change-only output: lastEmitted in state gates emission.

  • True rolling windows: 1h/12h/24h are maintained incrementally from minute buckets.

  • No rewriting unchanged rows: stream emits only changed keys; MERGE updates only when values differ.

  • Late data: watermark + event-time timeout keeps state bounded and allows late updates within tolerance.

 

If stateful APIs feel too heavy

You can do a “batch inside foreachBatch” pattern (rough sketch after the pros/cons below):

  • Stream writes minute-bucket counts to an append-only Delta “buckets” table.

  • In the same foreachBatch, read only the last 24h of buckets, compute the 1h/12h/24h aggregates in batch, compare to the feature table, and MERGE only changed keys.

Pros: no custom state code.

Cons: more IO per trigger (still bounded to 24h), but often very workable with good partitioning.
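For reference, a rough PySpark sketch of that pattern, assuming a Delta buckets table named event_minute_buckets (hypothetical) and the same feature_store table as above; the change-detection predicate from the earlier MERGE is omitted for brevity:

from pyspark.sql import functions as F

def rebuild_and_merge(microbatch_df, batch_id: int):
    spark = microbatch_df.sparkSession

    # 1) Aggregate this micro-batch of raw events into minute buckets and append
    #    them to the Delta "buckets" table. Partial counts for the same minute
    #    across triggers are fine, because step 2 sums over them.
    #    (On retries this can append duplicates; de-dup by batch_id if needed.)
    buckets = (
        microbatch_df
        .groupBy("profile_id", "gti", "event_type",
                 F.window("event_timestamp", "1 minute"))
        .count()
        .select("profile_id", "gti", "event_type",
                F.col("window.start").alias("bucket_minute"), "count")
    )
    buckets.write.format("delta").mode("append").saveAsTable("event_minute_buckets")

    # 2) Recompute 1h/12h/24h aggregates from the last 24h of buckets,
    #    restricted to the keys touched in this micro-batch.
    touched = microbatch_df.select("profile_id", "gti").distinct()
    now = F.current_timestamp()  # or the max bucket_minute seen, if you prefer event time
    recent = (
        spark.table("event_minute_buckets")
        .where(F.col("bucket_minute") >= now - F.expr("INTERVAL 24 HOURS"))
        .join(touched, ["profile_id", "gti"])
    )

    def cnt(event_type, hours, alias):
        within = F.col("bucket_minute") >= now - F.expr(f"INTERVAL {hours} HOURS")
        return F.sum(
            F.when((F.col("event_type") == event_type) & within, F.col("count")).otherwise(F.lit(0))
        ).alias(alias)

    aggregates = recent.groupBy("profile_id", "gti").agg(
        cnt("select", 24, "select_count_1d"), cnt("select", 12, "select_count_12h"), cnt("select", 1, "select_count_1h"),
        cnt("highlight", 24, "highlight_count_1d"), cnt("highlight", 12, "highlight_count_12h"), cnt("highlight", 1, "highlight_count_1h"),
        cnt("view", 24, "view_count_1d"), cnt("view", 12, "view_count_12h"), cnt("view", 1, "view_count_1h"),
    )

    # 3) MERGE into the feature table; add the same "only when values differ"
    #    predicate as in the MERGE sketch above to avoid rewriting unchanged rows.
    aggregates.createOrReplaceTempView("recomputed_features")
    spark.sql("""
      MERGE INTO feature_store f
      USING recomputed_features s
      ON f.profile_id = s.profile_id AND f.gti = s.gti
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)

Hook it up to the raw events stream with the same 60-second trigger and checkpoint location as the foreachBatch example above.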

Practical tips

  • Watermark: set slightly larger than your max window (example: 25h for a 24h feature).

  • Trigger: 60 seconds aligns nicely with minute buckets.

  • Checkpointing: treat the checkpoint path as immutable for that pipeline.

  • Idempotency: MERGE is your friend; keep the update clause deterministic.

  • Skew: pre-aggregation helps a lot; for extreme hot keys consider salting or special handling.

 

Hopefully these helpful tips/tricks get you past your sticking point.

Cheers, Louis.
