Saturday
Hi Team,
For one of my custom logics i went with transformwithState processor. However it is not working for live stream inputs.,
I have a start date filter on my df_base and when I give start date that is not current, the processor computes df_loss properly but as and when the new streams starts coming the processor freezes and df_loss is not emitting anymore.
below is the code and sample data:
start date : 2025-10-19T06:04:00.000Z
df_loss emits upto and freezes: but df_base has steady inputs every minute
OId | time_trunc | pctfwdpacketLoss | pctrtnpacketLoss |
20000147 | 2025-10-19T06:05:00.000+00:00 | 0 | 0 |
20000147 | 2025-10-19T06:06:00.000+00:00 | 0 | 0 |
20000147 | 2025-10-19T06:07:00.000+00:00 | 0 | 0 |
20000147 | 2025-10-19T06:08:00.000+00:00 | 0 | 0 |
20000147 | 2025-10-19T06:09:00.000+00:00 | 0 | 0 |
here is my transformation and my processor class:
df_base = (
add_oid(df_mef,"managedGroupID","terminalID")
.withWatermark("time_trunc", "15 minutes")
.filter(F.col("OId") == 20000147)
)
counters = df_base.select(
"OId", "time_trunc",
"fwdPacketLoss", "fwdPacketsSent",
"rtnPacketLoss", "rtnPacketsSent"
)
class PacketLossProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.handle = handle
state_schema = StructType([
StructField("fwd_sent", DoubleType(), True),
StructField("fwd_loss", DoubleType(), True),
StructField("rtn_sent", DoubleType(), True),
StructField("rtn_loss", DoubleType(), True),
])
self.state = handle.getValueState("packet_loss_state", state_schema)
def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
oid = key[0] if isinstance(key, (list, tuple)) else key
if self.state.exists():
sdf = self.state.get()
if isinstance(sdf, pd.DataFrame) and not sdf.empty:
prev = sdf.iloc[0].to_dict()
else:
prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
else:
prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
out_rows = []
def pct(prev_s, prev_l, cur_s, cur_l):
# Must have all four values and positive progress
if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
return None
if cur_s <= prev_s:
return None
ds, dl = cur_s - prev_s, cur_l - prev_l
if ds <= 0:
return None
val = 100.0 * (dl / ds)
return max(0.0, min(100.0, val))
for pdf in rows:
pdf = pdf.sort_values("time_trunc")
for _, r in pdf.iterrows():
ts = r["time_trunc"]
fs, fl = r["fwdPacketsSent"], r["fwdPacketLoss"]
rs, rl = r["rtnPacketsSent"], r["rtnPacketLoss"]
fwd_pct = pct(prev.get("fwd_sent"), prev.get("fwd_loss"), fs, fl)
rtn_pct = pct(prev.get("rtn_sent"), prev.get("rtn_loss"), rs, rl)
if (fwd_pct is not None) or (rtn_pct is not None):
row = {"OId": str(oid), "time_trunc": ts}
if fwd_pct is not None:
row["fwdpacketLoss"] = fwd_pct
if rtn_pct is not None:
row["rtnpacketLoss"] = rtn_pct
out_rows.append(row) # <-- append once, outside the rtn block
prev["fwd_sent"], prev["fwd_loss"] = fs, fl
prev["rtn_sent"], prev["rtn_loss"] = rs, rl
self.state.update(pd.DataFrame([prev]))
if out_rows:
yield pd.DataFrame(out_rows)
else:
yield pd.DataFrame(columns=["OId", "time_trunc", "fwdpacketLoss", "rtnpacketLoss"])
output_schema = StructType([
StructField("OId", StringType(), True),
StructField("time_trunc", TimestampType(), True),
StructField("fwdpacketLoss", DoubleType(), True),
StructField("rtnpacketLoss", DoubleType(), True)
])
df_loss = (
counters.groupBy("OId")
.transformWithStateInPandas(
statefulProcessor=PacketLossProcessor(),
outputStructType=output_schema,
outputMode="append",
timeMode="ProcessingTime"
)
)
df_loss = (
df_loss.withColumnsRenamed({"fwdpacketLoss": "pctfwdpacketLoss", "rtnpacketLoss": "pctrtnpacketLoss"})
.withWatermark("time_trunc", "15 minutes")
)
Sunday
I managed to solve this. The issue was with how I handled the value state in the def init method. It was handled as a dataframe which caused the state to never materialize nor update therefore emitting nulls.
I changed them to a tuple of values and that fixed the issues.
Updated code below with the reference documentation:
https://docs.databricks.com/aws/en/stateful-applications
STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# ValueState returns/accepts a TUPLE matching the schema order
self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state
def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
oid = key[0] if isinstance(key, (list, tuple)) else key
# --- load previous as tuple -> dict ---
st = self.state.get() # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
if st is None:
prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
else:
fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
"rtn_sent": rtn_sent, "rtn_loss": rtn_loss}
out = []
def pct(prev_s, prev_l, cur_s, cur_l):
if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
return None
if cur_s <= prev_s:
return None
ds, dl = cur_s - prev_s, cur_l - prev_l
if ds <= 0:
return None
return max(0.0, min(100.0, 100.0 * (dl / ds)))
for pdf in rows:
pdf = pdf.sort_values("time_trunc")
for _, r in pdf.iterrows():
ts = r["time_trunc"]
fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])
fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)
row = {
"OId": str(oid),
"time_trunc": ts,
"fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
"rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
}
if fwd_pct is not None:
row["fwdPacketLoss"] = float(fwd_pct)
if rtn_pct is not None:
row["rtnPacketLoss"] = float(rtn_pct)
out.append(row)
# advance state to CURRENT counters for next record
prev["fwd_sent"], prev["fwd_loss"] = fs, fl
prev["rtn_sent"], prev["rtn_loss"] = rs, rl
# save ONE tuple back
self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))
yield pd.DataFrame(out) if out else pd.DataFrame(
columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
)
Sunday
I managed to solve this. The issue was with how I handled the value state in the def init method. It was handled as a dataframe which caused the state to never materialize nor update therefore emitting nulls.
I changed them to a tuple of values and that fixed the issues.
Updated code below with the reference documentation:
https://docs.databricks.com/aws/en/stateful-applications
STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# ValueState returns/accepts a TUPLE matching the schema order
self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state
def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
oid = key[0] if isinstance(key, (list, tuple)) else key
# --- load previous as tuple -> dict ---
st = self.state.get() # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
if st is None:
prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
else:
fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
"rtn_sent": rtn_sent, "rtn_loss": rtn_loss}
out = []
def pct(prev_s, prev_l, cur_s, cur_l):
if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
return None
if cur_s <= prev_s:
return None
ds, dl = cur_s - prev_s, cur_l - prev_l
if ds <= 0:
return None
return max(0.0, min(100.0, 100.0 * (dl / ds)))
for pdf in rows:
pdf = pdf.sort_values("time_trunc")
for _, r in pdf.iterrows():
ts = r["time_trunc"]
fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])
fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)
row = {
"OId": str(oid),
"time_trunc": ts,
"fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
"rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
}
if fwd_pct is not None:
row["fwdPacketLoss"] = float(fwd_pct)
if rtn_pct is not None:
row["rtnPacketLoss"] = float(rtn_pct)
out.append(row)
# advance state to CURRENT counters for next record
prev["fwd_sent"], prev["fwd_loss"] = fs, fl
prev["rtn_sent"], prev["rtn_loss"] = rs, rl
# save ONE tuple back
self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))
yield pd.DataFrame(out) if out else pd.DataFrame(
columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
)
Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!
Sign Up Now