Saturday
Hello everyone!
I'm very, very new to Spark Structured Streaming, and not a data engineer. I would appreciate guidance on how to efficiently process streaming data and emit only the changed aggregate results over multiple time windows.
Input Stream:
Source: Amazon Kinesis
Micro-batch granularity: every 60 seconds
Schema:
(profile_id, gti, event_timestamp, event_type)
Where:
event_type ∈ { select, highlight, view }
Time Windows:
We need to maintain rolling counts over the following windows:
1 hour
12 hours
24 hours
Output Requirement:
For each (profile_id, gti) combination, I want to emit only the current counts that changed during the current micro-batch.
The output record should look like this:
{
  "profile_id": "profileid",
  "gti": "amz1.gfgfl",
  "select_count_1d": 5,
  "select_count_12h": 2,
  "select_count_1h": 1,
  "highlight_count_1d": 20,
  "highlight_count_12h": 10,
  "highlight_count_1h": 3,
  "view_count_1d": 40,
  "view_count_12h": 30,
  "view_count_1h": 3
}
Key Requirements:
Per key output: (profile_id, gti)
Emit only changed rows in the current micro-batch
This data is written to a feature store, so we want to avoid rewriting unchanged aggregates
Each emitted record should represent the latest counts for that key
What We Tried:
We implemented sliding window aggregations using groupBy(window()) for each time window. For example:
groupBy(
    profile_id,
    gti,
    window(event_timestamp, windowDuration, "1 minute")
)
Spark didn't allow joining those three streams; we hit an error about outer-join limitations between streaming DataFrames.
We tried to work around it by writing each stream to a memory sink and taking a snapshot every 60 seconds, but that does not emit only the changed rows.
How would you go about this problem? Should we maintain three rolling time windows like we tried and find a way to join them, or is there another approach you would suggest?
Very lost here; any help would be greatly appreciated!
yesterday
Greetings @Galih, I did a little research to help you along.
Here's a pragmatic way to design this so you only emit changed aggregates per (profile_id, gti) across multiple rolling windows, without trying to join multiple streaming aggregations.
TL;DR recommended approach
Build one stateful streaming pipeline that:
Pre-aggregates raw events into 1-minute buckets by event time (keeps cardinality sane, reduces per-event state churn).
Maintains rolling counts for 1h, 12h, and 24h per key using flatMapGroupsWithState (or mapGroupsWithState) plus event-time watermarks for bounded state.
Emits a row only when any of the nine counts changed for that key (compare against last emitted values stored in state).
Upserts only those changed rows into your feature table using foreachBatch + MERGE, so unchanged features never get rewritten.
Net effect: no streaming-streaming joins, exact rolling windows, late-data handling, bounded state, and change-only emission.
Why joining three streaming windows is a headache
Streaming-to-streaming outer joins are heavily restricted; even with watermarks, they don't really fit this use case cleanly.
Three separate windowed aggregations plus a join typically pushes you into unsupported join types/modes, or brittle workarounds.
Memory sink + snapshotting doesn't give you "changed-only" output; complete-mode snapshots don't inherently track deltas since the last trigger.
Design: one pipeline with stateful rolling windows
The trick is to store compact per-minute counts in state, then update rolling sums incrementally.
Read and pre-aggregate to 1-minute buckets
Read from Kinesis on a 60s trigger.
Apply a watermark on event time slightly larger than your largest window (example: 25 hours) to allow late events while bounding state.
Group into 1-minute buckets per (profile_id, gti, event_type) to reduce skew and avoid per-event state updates.
PySpark-ish sketch:
from pyspark.sql.functions import col, window

events = (
    spark.readStream.format("kinesis")
    # .option(...)  # your Kinesis options
    .load()
    .select("profile_id", "gti", "event_timestamp", "event_type")
    .withWatermark("event_timestamp", "25 hours")
)

minute_counts = (
    events
    .groupBy(
        "profile_id", "gti", "event_type",
        window(col("event_timestamp"), "1 minute")
    )
    .count()
    .select(
        "profile_id", "gti", "event_type",
        col("window").start.alias("bucket_minute"),
        "count"
    )
)
Maintain rolling sums with flatMapGroupsWithState
For each (profile_id, gti), keep in state:
Per event_type, a deque (or an equivalent compact structure) of (bucket_minute, count) covering the last 24h.
Three running sums per event_type: 1h, 12h, 24h.
The last emitted output row (lastEmitted).
For each incoming minute bucket:
Append it to that event_type's structure.
Evict anything older than 24h (driven by event time + watermark + event-time timeout).
Update the three sums incrementally (add the new minute, subtract minutes that fall out of each horizon).
Once a key's buckets for the micro-batch are processed:
Build the 9-field output row (3 windows × 3 event types).
Compare it to lastEmitted in state; emit only if it differs, otherwise emit nothing.
Scala sketch (often clearer with typed state):
import java.sql.Timestamp

import scala.collection.mutable.ArrayDeque

import org.apache.spark.sql.streaming._

case class Key(profile_id: String, gti: String)

case class MinuteCount(event_type: String, bucket_minute: Timestamp, count: Long)

case class OutputRow(
  profile_id: String, gti: String,
  select_count_1d: Long, select_count_12h: Long, select_count_1h: Long,
  highlight_count_1d: Long, highlight_count_12h: Long, highlight_count_1h: Long,
  view_count_1d: Long, view_count_12h: Long, view_count_1h: Long
)

case class RollingState(
  buckets: Map[String, ArrayDeque[(Long, Long)]],
  sum1h: Map[String, Long],
  sum12h: Map[String, Long],
  sum24h: Map[String, Long],
  lastEmitted: Option[OutputRow],
  lastEventTimeMs: Long
)

def updatePerKey(
  key: Key,
  values: Iterator[MinuteCount],
  state: GroupState[RollingState]
): Iterator[OutputRow] = {
  val horizon24hMs = 24L * 60L * 60L * 1000L
  val horizon12hMs = 12L * 60L * 60L * 1000L
  val horizon1hMs  = 1L * 60L * 60L * 1000L

  // Keys that stay quiet past the watermark time out here: drop their state, emit nothing.
  if (state.hasTimedOut) {
    state.remove()
    return Iterator.empty
  }

  val st0 =
    if (state.exists) state.get
    else RollingState(
      buckets = Map(
        "select"    -> ArrayDeque.empty[(Long, Long)],
        "highlight" -> ArrayDeque.empty[(Long, Long)],
        "view"      -> ArrayDeque.empty[(Long, Long)]
      ),
      sum1h  = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum12h = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum24h = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      lastEmitted = None,
      lastEventTimeMs = 0L
    )

  // NOTE: this is illustrative. It evicts buckets once they fall out of the 24h horizon, but it does
  // not subtract buckets that have aged out of the 1h/12h horizons while still inside 24h. In
  // production you need exact per-horizon eviction (e.g. per-horizon boundary pointers, or recompute
  // the 1h/12h sums from the retained buckets at emit time).
  var st = st0
  values.foreach { v =>
    val et       = v.event_type
    val minuteMs = v.bucket_minute.getTime
    val deque    = st.buckets(et)

    // Append the new minute bucket.
    deque.append((minuteMs, v.count))

    // Evict buckets older than 24h and subtract them from the 24h sum.
    val cutoff24 = minuteMs - horizon24hMs
    while (deque.nonEmpty && deque.head._1 < cutoff24) {
      val (_, cnt) = deque.removeHead()
      st = st.copy(sum24h = st.sum24h.updated(et, st.sum24h(et) - cnt))
    }

    // The new bucket is, by definition, inside all three horizons relative to its own minute.
    st = st.copy(
      sum24h = st.sum24h.updated(et, st.sum24h(et) + v.count),
      sum12h = st.sum12h.updated(et, st.sum12h(et) + v.count),
      sum1h  = st.sum1h.updated(et, st.sum1h(et) + v.count)
    )
    st = st.copy(lastEventTimeMs = math.max(st.lastEventTimeMs, minuteMs))
  }

  val out = OutputRow(
    key.profile_id, key.gti,
    st.sum24h("select"), st.sum12h("select"), st.sum1h("select"),
    st.sum24h("highlight"), st.sum12h("highlight"), st.sum1h("highlight"),
    st.sum24h("view"), st.sum12h("view"), st.sum1h("view")
  )

  val changed = st.lastEmitted.forall(_ != out)
  state.update(st.copy(lastEmitted = Some(out)))
  state.setTimeoutTimestamp(st.lastEventTimeMs + horizon24hMs) // event-time cleanup (with watermark)
  if (changed) Iterator(out) else Iterator.empty
}
Notes worth calling out:
The eviction logic above is intentionally "conceptual." In production, be precise about which buckets each horizon includes and subtract counts as buckets age out of the 1h and 12h horizons, not only when they fall out of the 24h one.
Pair GroupStateTimeout.EventTimeTimeout with a proper watermark so Spark can actually garbage-collect state for quiet keys (see the wiring sketch right after these notes).
For speed, many teams move from deques to compact fixed-size ring buffers indexed by minute offset.
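For reference, here is a rough sketch of how updatePerKey could be attached to the stream. It assumes a Scala-side Dataset named minuteCounts with the same columns as the minute_counts DataFrame above and a hypothetical MinuteRow helper shape; it also assumes encoders can be derived for RollingState (if the mutable ArrayDeque fields trip that up in practice, switch them to Seq/Vector or bring an implicit Encoders.kryo[RollingState] into scope).

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import spark.implicits._ // spark = your SparkSession

// Hypothetical row shape matching the minute-level aggregation output.
case class MinuteRow(
  profile_id: String, gti: String, event_type: String,
  bucket_minute: java.sql.Timestamp, count: Long
)

val changedRows = minuteCounts // per-minute counts, as built above
  .as[MinuteRow]
  .groupByKey(r => Key(r.profile_id, r.gti))
  .flatMapGroupsWithState[RollingState, OutputRow](
    OutputMode.Append(),
    GroupStateTimeout.EventTimeTimeout // enables hasTimedOut / setTimeoutTimestamp above
  ) { (key, rows, state) =>
    updatePerKey(key, rows.map(r => MinuteCount(r.event_type, r.bucket_minute, r.count)), state)
  }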
Write changed rows to your feature store via foreachBatch + MERGE
Because you're emitting only changed keys, each micro-batch is naturally small.
PySpark MERGE sketch:
def upsert_features(microbatch_df, batch_id: int):
    # Register this trigger's changed rows and MERGE them into the feature table.
    # Use the micro-batch's own session so the temp view resolves inside foreachBatch.
    microbatch_df.createOrReplaceTempView("changed_features")
    microbatch_df.sparkSession.sql("""
        MERGE INTO feature_store f
        USING changed_features s
        ON f.profile_id = s.profile_id AND f.gti = s.gti
        WHEN MATCHED AND (
            f.select_count_1d     <> s.select_count_1d     OR
            f.select_count_12h    <> s.select_count_12h    OR
            f.select_count_1h     <> s.select_count_1h     OR
            f.highlight_count_1d  <> s.highlight_count_1d  OR
            f.highlight_count_12h <> s.highlight_count_12h OR
            f.highlight_count_1h  <> s.highlight_count_1h  OR
            f.view_count_1d       <> s.view_count_1d       OR
            f.view_count_12h      <> s.view_count_12h      OR
            f.view_count_1h       <> s.view_count_1h
        )
        THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
# changedRows is the change-only stream produced by the stateful step above.
(
    changedRows
    .writeStream
    .trigger(processingTime="60 seconds")
    .option("checkpointLocation", "dbfs:/checkpoints/your_pipeline")
    .foreachBatch(upsert_features)
    .start()
)
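For completeness, the MERGE above assumes a target table shaped roughly like this (a sketch; the name feature_store and Delta itself are assumptions, so adjust to whatever backs your feature store):

CREATE TABLE IF NOT EXISTS feature_store (
  profile_id          STRING,
  gti                 STRING,
  select_count_1d     BIGINT,
  select_count_12h    BIGINT,
  select_count_1h     BIGINT,
  highlight_count_1d  BIGINT,
  highlight_count_12h BIGINT,
  highlight_count_1h  BIGINT,
  view_count_1d       BIGINT,
  view_count_12h      BIGINT,
  view_count_1h       BIGINT
) USING DELTA

Note that UPDATE SET * / INSERT * requires the source columns (the OutputRow fields) to line up with these target columns, which they do here.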
How this hits your requirements
Per-key output: state is keyed by (profile_id, gti), emission is per key.
Change-only output: lastEmitted in state gates emission.
True rolling windows: 1h/12h/24h are maintained incrementally from minute buckets.
No rewriting unchanged rows: stream emits only changed keys; MERGE updates only when values differ.
Late data: watermark + event-time timeout keeps state bounded and allows late updates within tolerance.
If stateful APIs feel too heavy
You can do a "batch inside foreachBatch" pattern (see the sketch after this list):
The stream writes minute-bucket counts to an append-only Delta "buckets" table.
In the same foreachBatch, read only the last 24h of buckets, compute the 1h/12h/24h aggregates in batch, compare to the feature table, and MERGE only the changed keys.
Pros: no custom state code.
Cons: more I/O per trigger (still bounded to 24h), but often very workable with good partitioning.
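A minimal sketch of that pattern, assuming Delta, a buckets table named minute_buckets (a placeholder name), the feature_store target and upsert_features function from above, and that the foreachBatch stream carries the raw events for each 60-second trigger:

from pyspark.sql import functions as F

def refresh_changed_features(microbatch_df, batch_id: int):
    # microbatch_df: the raw events that arrived in this trigger.
    spark = microbatch_df.sparkSession

    # 1) Bucket this trigger's events per minute and append them to the buckets table.
    #    The same (key, minute) may be appended by several triggers; the sums below add them up.
    buckets_batch = (
        microbatch_df
        .groupBy("profile_id", "gti", "event_type", F.window("event_timestamp", "1 minute"))
        .count()
        .select("profile_id", "gti", "event_type",
                F.col("window").start.alias("bucket_minute"), "count")
    )
    buckets_batch.write.format("delta").mode("append").saveAsTable("minute_buckets")

    # Rolling sum for one event_type over one horizon, anchored on processing time here;
    # you could anchor on max(bucket_minute) instead for stricter event-time semantics.
    def horizon_sum(event_type, hours, alias):
        in_horizon = F.col("bucket_minute") >= F.expr(f"current_timestamp() - INTERVAL {hours} HOURS")
        return F.sum(
            F.when((F.col("event_type") == event_type) & in_horizon, F.col("count")).otherwise(F.lit(0))
        ).alias(alias)

    # 2) Recompute the 1h/12h/24h aggregates from the last 24h of buckets, but only for
    #    keys that appeared in this micro-batch (only they could have changed).
    changed_keys = microbatch_df.select("profile_id", "gti").distinct()
    last_24h = spark.table("minute_buckets").where(
        F.col("bucket_minute") >= F.expr("current_timestamp() - INTERVAL 24 HOURS")
    )
    aggs = (
        last_24h.join(changed_keys, ["profile_id", "gti"])
        .groupBy("profile_id", "gti")
        .agg(
            horizon_sum("select", 24, "select_count_1d"),
            horizon_sum("select", 12, "select_count_12h"),
            horizon_sum("select", 1, "select_count_1h"),
            horizon_sum("highlight", 24, "highlight_count_1d"),
            horizon_sum("highlight", 12, "highlight_count_12h"),
            horizon_sum("highlight", 1, "highlight_count_1h"),
            horizon_sum("view", 24, "view_count_1d"),
            horizon_sum("view", 12, "view_count_12h"),
            horizon_sum("view", 1, "view_count_1h"),
        )
    )

    # 3) Reuse the MERGE from upsert_features above; its WHEN MATCHED AND (...) condition
    #    keeps unchanged rows from being rewritten.
    upsert_features(aggs, batch_id)

You would wire this up with the same writeStream + foreachBatch call shown earlier, pointing it at refresh_changed_features instead of upsert_features.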
Practical tips
Watermark: set slightly larger than your max window (example: 25h for a 24h feature).
Trigger: 60 seconds aligns nicely with minute buckets.
Checkpointing: treat the checkpoint path as immutable for that pipeline.
Idempotency: MERGE is your friend; keep the update clause deterministic.
Skew: pre-aggregation helps a lot; for extreme hot keys consider salting or special handling.
Hopefully these helpful tips/tricks get you past your sticking point.
Cheers, Louis.
2 hours ago
Thank you so much for the detailed and thoughtful response!
I'm very new to the real-time data processing world, so it will take me some time to fully understand and explore the advanced concepts you mentioned. That said, your explanation and suggestions are extremely helpful, and I really appreciate the effort you put into sharing your knowledge. I may follow up with a few questions once I've had more time to digest everything and implement it in my code.
yesterday
I would implement this with stateful streaming, using transformWithStateInPandas to keep the state and implement the logic there.
I would avoid stream-stream joins.