Greetings @Galih, I did a little research to help you along.
Here's a pragmatic way to design this so you only emit changed aggregates per (profile_id, gti) across multiple rolling windows, without trying to join multiple streaming aggregations.
TL;DR recommended approach
Build one stateful streaming pipeline that:
- Pre-aggregates raw events into 1-minute buckets by event time (keeps cardinality sane, reduces per-event state churn).
- Maintains rolling counts for 1h, 12h, and 24h per key using flatMapGroupsWithState (or mapGroupsWithState) plus event-time watermarks for bounded state.
- Emits a row only when any of the nine counts changed for that key (compare against last emitted values stored in state).
- Upserts only those changed rows into your feature table using foreachBatch + MERGE, so unchanged features never get rewritten.
Net effect: no streaming-streaming joins, exact rolling windows, late-data handling, bounded state, and change-only emission.
Why joining three streaming windows is a headache
- Streaming-to-streaming outer joins are heavily restricted; even with watermarks, they don't really fit this use case cleanly.
- Three separate windowed aggregations plus a join typically pushes you into unsupported join types/modes, or brittle workarounds.
- Memory sink + snapshotting doesn't give you "changed-only" output; complete-mode snapshots don't inherently track deltas since the last trigger.
Design: one pipeline with stateful rolling windows
The trick is to store compact per-minute counts in state, then update rolling sums incrementally.
1) Read and pre-aggregate to 1-minute buckets
- Read from Kinesis on a 60s trigger.
- Apply a watermark on event time slightly larger than your largest window (for example, 25 hours) to allow late events while bounding state.
- Group into 1-minute buckets per (profile_id, gti, event_type) to reduce skew and avoid per-event state updates.
PySpark-ish sketch:
from pyspark.sql.functions import col, window

events = (
    spark.readStream.format("kinesis")
    # .option(...)  # your Kinesis options
    .load()
    .select("profile_id", "gti", "event_timestamp", "event_type")
    # Watermark slightly larger than the largest (24h) window: bounds state while still admitting late events.
    .withWatermark("event_timestamp", "25 hours")
)

minute_counts = (
    events
    .groupBy(
        "profile_id", "gti", "event_type",
        window(col("event_timestamp"), "1 minute"),
    )
    .count()
    .select(
        "profile_id", "gti", "event_type",
        col("window.start").alias("bucket_minute"),
        "count",
    )
)
2) Maintain rolling sums with flatMapGroupsWithState
For each (profile_id, gti):
- Keep per event_type a deque (or equivalent compact structure) of (bucket_minute, count) covering the last 24h.
- Maintain three running sums per event_type: 1h, 12h, 24h.
For each incoming minute bucket:
- Append to that event_type's structure.
- Evict anything older than 24h (driven by event time + watermark + event-time timeout).
- Update the three sums incrementally (add new minute, subtract evicted minutes that fall out of each horizon).
Then, once the key's buckets for this trigger are processed:
- Build the 9-field output row (3 windows × 3 event types).
- Compare to lastEmitted in state; emit only if different; otherwise emit nothing.
Scala sketch (often clearer with typed state):
import java.sql.Timestamp
import scala.collection.mutable.ArrayDeque
import org.apache.spark.sql.streaming._

case class Key(profile_id: String, gti: String)
case class MinuteCount(event_type: String, bucket_minute: Timestamp, count: Long)

case class OutputRow(
  profile_id: String, gti: String,
  select_count_1d: Long, select_count_12h: Long, select_count_1h: Long,
  highlight_count_1d: Long, highlight_count_12h: Long, highlight_count_1h: Long,
  view_count_1d: Long, view_count_12h: Long, view_count_1h: Long
)

case class RollingState(
  buckets: Map[String, ArrayDeque[(Long, Long)]],  // event_type -> (bucket_minute_ms, count), oldest first
  sum1h: Map[String, Long],
  sum12h: Map[String, Long],
  sum24h: Map[String, Long],
  lastEmitted: Option[OutputRow],
  lastEventTimeMs: Long
)
def updatePerKey(
  key: Key,
  values: Iterator[MinuteCount],
  state: GroupState[RollingState]
): Iterator[OutputRow] = {
  val horizon24hMs = 24L * 60L * 60L * 1000L
  val horizon12hMs = 12L * 60L * 60L * 1000L
  val horizon1hMs  = 1L * 60L * 60L * 1000L

  // Event-time timeout fired for a quiet key: everything is older than 24h, so drop the state.
  if (state.hasTimedOut) {
    state.remove()
    return Iterator.empty
  }

  val st0 =
    if (state.exists) state.get
    else RollingState(
      buckets = Map("select" -> ArrayDeque.empty, "highlight" -> ArrayDeque.empty, "view" -> ArrayDeque.empty),
      sum1h   = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum12h  = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      sum24h  = Map("select" -> 0L, "highlight" -> 0L, "view" -> 0L),
      lastEmitted = None,
      lastEventTimeMs = 0L
    )

  // NOTE: this is illustrative; for clarity the sums are recomputed from the retained buckets
  // (at most ~1440 per event_type) on each trigger instead of being maintained incrementally.
  var st = st0

  // Ingest the new minute buckets (assumes event_type is one of select/highlight/view)
  // and advance this key's latest event time.
  values.foreach { v =>
    st.buckets(v.event_type).append((v.bucket_minute.getTime, v.count))
    st = st.copy(lastEventTimeMs = math.max(st.lastEventTimeMs, v.bucket_minute.getTime))
  }

  val now      = st.lastEventTimeMs
  val cutoff24 = now - horizon24hMs

  st.buckets.foreach { case (et, deque) =>
    // Evict buckets that have aged out of the 24h horizon
    // (assumes buckets arrive in roughly non-decreasing event-time order).
    while (deque.nonEmpty && deque.head._1 < cutoff24) deque.removeHead()
    // Recompute the three rolling sums for this event_type.
    st = st.copy(
      sum24h = st.sum24h.updated(et, deque.iterator.map(_._2).sum),
      sum12h = st.sum12h.updated(et, deque.iterator.filter(_._1 >= now - horizon12hMs).map(_._2).sum),
      sum1h  = st.sum1h.updated(et, deque.iterator.filter(_._1 >= now - horizon1hMs).map(_._2).sum)
    )
  }

  val out = OutputRow(
    key.profile_id, key.gti,
    st.sum24h("select"), st.sum12h("select"), st.sum1h("select"),
    st.sum24h("highlight"), st.sum12h("highlight"), st.sum1h("highlight"),
    st.sum24h("view"), st.sum12h("view"), st.sum1h("view")
  )

  // Emit only if the nine counts differ from what was last emitted for this key.
  val changed = st.lastEmitted.forall(_ != out)
  state.update(st.copy(lastEmitted = Some(out)))
  state.setTimeoutTimestamp(st.lastEventTimeMs + horizon24hMs) // event-time cleanup (with watermark)
  if (changed) Iterator(out) else Iterator.empty
}
Notes worth calling out:
- The logic above is intentionally illustrative: the sketch recomputes the three sums from the retained buckets (at most 24h per event_type) on every trigger. A production version can maintain them incrementally, but then you need to be very precise about which buckets fall out of each horizon and when you subtract them.
- Pair GroupStateTimeout.EventTimeTimeout with a proper watermark so Spark can actually garbage-collect state for quiet keys (see the wiring sketch below).
- For speed, many teams move from deques to compact fixed-size ring buffers indexed by minute offset.
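Wiring sketch (Scala): a minimal sketch assuming the 1-minute pre-aggregation is exposed to Scala as a streaming Dataset called minuteCounts (same columns as minute_counts above), a SparkSession named spark is in scope, and MinuteRow is a hypothetical row type added here just for typing. Depending on your Spark/DBR version you may need to run the pre-aggregation and the stateful step as separate queries (for example via an intermediate Delta table) rather than chaining them in one query.

import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import spark.implicits._

// Hypothetical row type matching the minute_counts columns from step 1.
case class MinuteRow(profile_id: String, gti: String, event_type: String,
                     bucket_minute: Timestamp, count: Long)

// RollingState carries a mutable ArrayDeque, which the default product encoders may not handle,
// so this sketch falls back to a Kryo encoder for the state.
implicit val rollingStateEncoder: Encoder[RollingState] = Encoders.kryo[RollingState]

val changedRows: Dataset[OutputRow] =
  minuteCounts.as[MinuteRow]
    .groupByKey(r => Key(r.profile_id, r.gti))
    .mapValues(r => MinuteCount(r.event_type, r.bucket_minute, r.count))
    .flatMapGroupsWithState(
      OutputMode.Update,                  // emit rows only for keys touched in this trigger
      GroupStateTimeout.EventTimeTimeout  // pairs with the 25h watermark for state cleanup
    )(updatePerKey)

changedRows is then the stream you hand to the foreachBatch sink in step 3.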
3) Write changed rows to your feature store via foreachBatch + MERGE
Because you're emitting only changed keys, each microbatch is naturally small.
PySpark MERGE sketch:
def upsert_features(microbatch_df, batch_id: int):
    # Each microbatch contains only keys whose rolling counts changed.
    microbatch_df.createOrReplaceTempView("changed_features")
    spark.sql("""
        MERGE INTO feature_store f
        USING changed_features s
        ON f.profile_id = s.profile_id AND f.gti = s.gti
        WHEN MATCHED AND (
            f.select_count_1d <> s.select_count_1d OR
            f.select_count_12h <> s.select_count_12h OR
            f.select_count_1h <> s.select_count_1h OR
            f.highlight_count_1d <> s.highlight_count_1d OR
            f.highlight_count_12h <> s.highlight_count_12h OR
            f.highlight_count_1h <> s.highlight_count_1h OR
            f.view_count_1d <> s.view_count_1d OR
            f.view_count_12h <> s.view_count_12h OR
            f.view_count_1h <> s.view_count_1h
        )
        THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
(
    changedRows  # the stream of changed feature rows from the stateful step above
    .writeStream
    .trigger(processingTime="60 seconds")
    .outputMode("update")  # matches the update-mode stateful operator
    .option("checkpointLocation", "dbfs:/checkpoints/your_pipeline")
    .foreachBatch(upsert_features)
    .start()
)
How this hits your requirements
- Per-key output: state is keyed by (profile_id, gti), emission is per key.
- Change-only output: lastEmitted in state gates emission.
- True rolling windows: 1h/12h/24h are maintained incrementally from minute buckets.
- No rewriting unchanged rows: the stream emits only changed keys; MERGE updates only when values differ.
- Late data: watermark + event-time timeout keeps state bounded and allows late updates within tolerance.
If stateful APIs feel too heavy
You can do a "batch inside foreachBatch" pattern (a sketch follows the pros/cons below):
- The stream writes minute-bucket counts to an append-only Delta "buckets" table.
- In the same foreachBatch, read only the last 24h of buckets, compute the 1h/12h/24h aggregates in batch, compare to the feature table, and MERGE only changed keys.
Pros: no custom state code.
Cons: more IO per trigger (still bounded to 24h), but often very workable with good partitioning.
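For concreteness, here is a minimal Scala sketch of that pattern. It assumes hypothetical table names minute_buckets and feature_store, the same column names as above, and that the incoming microbatch contains finalized minute buckets (if the bucket stream runs in update mode, upsert the buckets instead of appending them).

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def upsertFromBuckets(microbatch: DataFrame, batchId: Long): Unit = {
  val spark = microbatch.sparkSession

  // 1) Append this trigger's 1-minute buckets to the compact Delta buckets table.
  microbatch.write.format("delta").mode("append").saveAsTable("minute_buckets")

  // 2) Recompute the nine rolling counts from only the last 24h of buckets
  //    (anchored on processing time here for simplicity).
  val buckets = spark.table("minute_buckets")
    .where(expr("bucket_minute >= current_timestamp() - INTERVAL 24 HOURS"))

  def cnt(et: String, hours: Int) =
    sum(when(col("event_type") === et &&
             expr(s"bucket_minute >= current_timestamp() - INTERVAL $hours HOURS"),
             col("count")).otherwise(0L))

  val features = buckets.groupBy("profile_id", "gti").agg(
    cnt("select", 24).as("select_count_1d"),
    cnt("select", 12).as("select_count_12h"),
    cnt("select", 1).as("select_count_1h"),
    cnt("highlight", 24).as("highlight_count_1d"),
    cnt("highlight", 12).as("highlight_count_12h"),
    cnt("highlight", 1).as("highlight_count_1h"),
    cnt("view", 24).as("view_count_1d"),
    cnt("view", 12).as("view_count_12h"),
    cnt("view", 1).as("view_count_1h")
  )

  // 3) MERGE, updating only rows whose nine counts actually changed.
  features.createOrReplaceTempView("recomputed_features")
  val featureCols = Seq(
    "select_count_1d", "select_count_12h", "select_count_1h",
    "highlight_count_1d", "highlight_count_12h", "highlight_count_1h",
    "view_count_1d", "view_count_12h", "view_count_1h")
  val changedPredicate = featureCols.map(c => s"f.$c <> s.$c").mkString(" OR ")
  spark.sql(s"""
    MERGE INTO feature_store f
    USING recomputed_features s
    ON f.profile_id = s.profile_id AND f.gti = s.gti
    WHEN MATCHED AND ($changedPredicate) THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

You would register this with the same writeStream/trigger/foreachBatch pattern shown earlier, pointed at the minute-bucket stream instead of the changed-rows stream.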
Practical tips
- Watermark: set slightly larger than your max window (for example, 25h for a 24h feature).
- Trigger: 60 seconds aligns nicely with minute buckets.
- Checkpointing: treat the checkpoint path as immutable for that pipeline.
- Idempotency: MERGE is your friend; keep the update clause deterministic.
- Skew: pre-aggregation helps a lot; for extreme hot keys consider salting or special handling.
Hopefully these tips and tricks get you past your sticking point.
Cheers, Louis.