cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

TransformWithState is not emitting for live streams

pranaav93
New Contributor III

Hi Team, 

For one of my custom logics i went with transformwithState processor. However it is not working for live stream inputs., 

I have a start date filter on my df_base and when I give start date that is not current, the processor computes df_loss properly but as and when the new streams starts coming the processor freezes and df_loss is not emitting anymore.

below is the code and sample data:

start date : 2025-10-19T06:04:00.000Z

df_loss emits upto  and freezes: but df_base has steady inputs every minute

OId time_trunc pctfwdpacketLoss pctrtnpacketLoss
200001472025-10-19T06:05:00.000+00:0000
200001472025-10-19T06:06:00.000+00:0000
200001472025-10-19T06:07:00.000+00:0000
200001472025-10-19T06:08:00.000+00:0000
200001472025-10-19T06:09:00.000+00:0000

here is my transformation and my processor class:

df_base = (
    add_oid(df_mef,"managedGroupID","terminalID")
    .withWatermark("time_trunc", "15 minutes")
    .filter(F.col("OId") == 20000147)
)
counters = df_base.select(
    "OId", "time_trunc",
    "fwdPacketLoss", "fwdPacketsSent",
    "rtnPacketLoss", "rtnPacketsSent"
)
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        state_schema = StructType([
            StructField("fwd_sent", DoubleType(), True),
            StructField("fwd_loss", DoubleType(), True),
            StructField("rtn_sent", DoubleType(), True),
            StructField("rtn_loss", DoubleType(), True),
        ])
        self.state = handle.getValueState("packet_loss_state", state_schema)

    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key
        if self.state.exists():
            sdf = self.state.get()
            if isinstance(sdf, pd.DataFrame) and not sdf.empty:
                prev = sdf.iloc[0].to_dict()
            else:
                prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}

        out_rows = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            # Must have all four values and positive progress
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s <= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds <= 0:
                return None
            val = 100.0 * (dl / ds)
            return max(0.0, min(100.0, val))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = r["fwdPacketsSent"], r["fwdPacketLoss"]
                rs, rl = r["rtnPacketsSent"], r["rtnPacketLoss"]

                fwd_pct = pct(prev.get("fwd_sent"), prev.get("fwd_loss"), fs, fl)
                rtn_pct = pct(prev.get("rtn_sent"), prev.get("rtn_loss"), rs, rl)

                if (fwd_pct is not None) or (rtn_pct is not None):
                    row = {"OId": str(oid), "time_trunc": ts}
                    if fwd_pct is not None:
                        row["fwdpacketLoss"] = fwd_pct
                    if rtn_pct is not None:
                        row["rtnpacketLoss"] = rtn_pct
                    out_rows.append(row)  # <-- append once, outside the rtn block

                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        self.state.update(pd.DataFrame([prev]))

        if out_rows:
            yield pd.DataFrame(out_rows)
        else:
            yield pd.DataFrame(columns=["OId", "time_trunc", "fwdpacketLoss", "rtnpacketLoss"])
    

output_schema = StructType([
    StructField("OId", StringType(), True),
    StructField("time_trunc", TimestampType(), True),
    StructField("fwdpacketLoss", DoubleType(), True),
    StructField("rtnpacketLoss", DoubleType(), True)
])
    
df_loss = (
    counters.groupBy("OId")
    .transformWithStateInPandas(
        statefulProcessor=PacketLossProcessor(),
        outputStructType=output_schema,
        outputMode="append",
        timeMode="ProcessingTime"
    )
)
df_loss = (
    df_loss.withColumnsRenamed({"fwdpacketLoss": "pctfwdpacketLoss", "rtnpacketLoss": "pctrtnpacketLoss"})
    .withWatermark("time_trunc", "15 minutes")
)

 

laughingbroccoli93
1 ACCEPTED SOLUTION

Accepted Solutions

pranaav93
New Contributor III

I managed to solve this. The issue was with how I handled the value state in the def init method. It was handled as a dataframe which caused the state to never materialize nor update therefore emitting nulls.

I changed them to a tuple of values and that fixed the issues.

Updated code below with the reference documentation:

https://docs.databricks.com/aws/en/stateful-applications 

STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # ValueState returns/accepts a TUPLE matching the schema order
        self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state

    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key

        # --- load previous as tuple -> dict ---
        st = self.state.get()  # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
        if st is None:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
            prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
                    "rtn_sent": rtn_sent, "rtn_loss": rtn_loss}

        out = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s <= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds <= 0:
                return None
            return max(0.0, min(100.0, 100.0 * (dl / ds)))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
                rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])

                fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
                rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)

                row = {
                    "OId": str(oid),
                    "time_trunc": ts,
                    "fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
                    "rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
                }
                if fwd_pct is not None:
                    row["fwdPacketLoss"] = float(fwd_pct)
                if rtn_pct is not None:
                    row["rtnPacketLoss"] = float(rtn_pct)

                out.append(row)

                # advance state to CURRENT counters for next record
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # save ONE tuple back
        self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))

        yield pd.DataFrame(out) if out else pd.DataFrame(
            columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
        )
laughingbroccoli93

View solution in original post

1 REPLY 1

pranaav93
New Contributor III

I managed to solve this. The issue was with how I handled the value state in the def init method. It was handled as a dataframe which caused the state to never materialize nor update therefore emitting nulls.

I changed them to a tuple of values and that fixed the issues.

Updated code below with the reference documentation:

https://docs.databricks.com/aws/en/stateful-applications 

STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # ValueState returns/accepts a TUPLE matching the schema order
        self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state

    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key

        # --- load previous as tuple -> dict ---
        st = self.state.get()  # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
        if st is None:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
            prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
                    "rtn_sent": rtn_sent, "rtn_loss": rtn_loss}

        out = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s <= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds <= 0:
                return None
            return max(0.0, min(100.0, 100.0 * (dl / ds)))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
                rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])

                fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
                rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)

                row = {
                    "OId": str(oid),
                    "time_trunc": ts,
                    "fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
                    "rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
                }
                if fwd_pct is not None:
                    row["fwdPacketLoss"] = float(fwd_pct)
                if rtn_pct is not None:
                    row["rtnPacketLoss"] = float(rtn_pct)

                out.append(row)

                # advance state to CURRENT counters for next record
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # save ONE tuple back
        self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))

        yield pd.DataFrame(out) if out else pd.DataFrame(
            columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
        )
laughingbroccoli93

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now