<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: TransformWithState is not emitting for live streams in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/transformwithstate-is-not-emitting-for-live-streams/m-p/135388#M50340</link>
    <description>&lt;P&gt;I managed to solve this. The issue was with how I handled the value state in the init method. I was treating the state as a DataFrame, which caused it to never materialize or update, so the processor emitted nulls.&lt;/P&gt;&lt;P&gt;I changed it to a tuple of values, and that fixed the issue.&lt;/P&gt;&lt;P&gt;Updated code below, along with the reference documentation:&lt;/P&gt;&lt;P&gt;&lt;A title="Databricks - Build  Stateful Application" href="https://docs.databricks.com/aws/en/stateful-applications" target="_self"&gt;https://docs.databricks.com/aws/en/stateful-applications&lt;/A&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -&amp;gt; None:
        # ValueState returns/accepts a TUPLE matching the schema order
        self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state

    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -&amp;gt; Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key

        # --- load previous as tuple -&amp;gt; dict ---
        st = self.state.get()  # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
        if st is None:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
            prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
                    "rtn_sent": rtn_sent, "rtn_loss": rtn_loss}

        out = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s &amp;lt;= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds &amp;lt;= 0:
                return None
            return max(0.0, min(100.0, 100.0 * (dl / ds)))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
                rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])

                fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
                rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)

                row = {
                    "OId": str(oid),
                    "time_trunc": ts,
                    "fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
                    "rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
                }
                if fwd_pct is not None:
                    row["fwdPacketLoss"] = float(fwd_pct)
                if rtn_pct is not None:
                    row["rtnPacketLoss"] = float(rtn_pct)

                out.append(row)

                # advance state to CURRENT counters for next record
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # save ONE tuple back
        self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))

        yield pd.DataFrame(out) if out else pd.DataFrame(
            columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
        )&lt;/LI-CODE&gt;</description>
    <pubDate>Mon, 20 Oct 2025 03:53:46 GMT</pubDate>
    <dc:creator>pranaav93</dc:creator>
    <dc:date>2025-10-20T03:53:46Z</dc:date>
    <item>
      <title>TransformWithState is not emitting for live streams</title>
      <link>https://community.databricks.com/t5/data-engineering/transformwithstate-is-not-emitting-for-live-streams/m-p/135360#M50331</link>
      <description>&lt;P&gt;Hi Team,&amp;nbsp;&lt;/P&gt;&lt;P&gt;For one of my custom logic steps, I went with a transformWithState processor. However, it is not working for live stream inputs.&amp;nbsp;&lt;/P&gt;&lt;P&gt;I have a start date filter on my df_base, and when I give a start date that is not current, the processor computes df_loss properly, but as soon as new stream data starts coming in, the processor freezes and df_loss stops emitting.&lt;/P&gt;&lt;P&gt;Below is the code and sample data:&lt;/P&gt;&lt;P&gt;Start date:&amp;nbsp;2025-10-19T06:04:00.000Z&lt;/P&gt;&lt;P&gt;df_loss emits up to the rows shown below and then freezes, but df_base has steady inputs every minute:&lt;/P&gt;&lt;TABLE border="1" width="63.999997666517324%"&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TD width="20%" height="30px"&gt;OId&amp;nbsp;&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;time_trunc&amp;nbsp;&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;pctfwdpacketLoss&amp;nbsp;&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;pctrtnpacketLoss&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD width="20%" height="30px"&gt;20000147&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;2025-10-19T06:05:00.000+00:00&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;0&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;0&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD width="20%" height="57px"&gt;20000147&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;2025-10-19T06:06:00.000+00:00&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;0&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;0&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD width="20%" height="57px"&gt;20000147&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;2025-10-19T06:07:00.000+00:00&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;0&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;0&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD width="20%" height="57px"&gt;20000147&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;2025-10-19T06:08:00.000+00:00&lt;/TD&gt;&lt;TD width="20%" height="57px"&gt;0&lt;/TD&gt;&lt;TD width="20%" 
height="57px"&gt;0&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD width="20%" height="30px"&gt;20000147&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;2025-10-19T06:09:00.000+00:00&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;0&lt;/TD&gt;&lt;TD width="20%" height="30px"&gt;0&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;&lt;P&gt;here is my transformation and my processor class:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df_base = (
    add_oid(df_mef,"managedGroupID","terminalID")
    .withWatermark("time_trunc", "15 minutes")
    .filter(F.col("OId") == 20000147)
)
counters = df_base.select(
    "OId", "time_trunc",
    "fwdPacketLoss", "fwdPacketsSent",
    "rtnPacketLoss", "rtnPacketsSent"
)
# Stateful processor as posted in the original question. For each grouping key it
# remembers the last seen forward/return counters in a value state and emits
# loss percentages computed from the deltas between consecutive records.
# NOTE(review): the reply in this thread attributes the missing output to the
# value state being handled as a DataFrame in this class; the reply's version
# reads and writes the state as a plain tuple instead.
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -&amp;gt; None:
        # Keep the handle and register one value state with four nullable double fields.
        self.handle = handle
        state_schema = StructType([
            StructField("fwd_sent", DoubleType(), True),
            StructField("fwd_loss", DoubleType(), True),
            StructField("rtn_sent", DoubleType(), True),
            StructField("rtn_loss", DoubleType(), True),
        ])
        self.state = handle.getValueState("packet_loss_state", state_schema)

    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -&amp;gt; Iterator[pd.DataFrame]:
        # The key may arrive wrapped in a list/tuple; use its first element in that case.
        oid = key[0] if isinstance(key, (list, tuple)) else key
        # Load the previous counters. Anything other than a non-empty DataFrame
        # coming back from get() is treated as "no previous counters".
        if self.state.exists():
            sdf = self.state.get()
            if isinstance(sdf, pd.DataFrame) and not sdf.empty:
                prev = sdf.iloc[0].to_dict()
            else:
                prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}

        out_rows = []

        # Loss percentage between two counter samples, clamped to the range 0..100.
        # Returns None when a value is missing or the sent counter did not increase.
        def pct(prev_s, prev_l, cur_s, cur_l):
            # Must have all four values and positive progress
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s &amp;lt;= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds &amp;lt;= 0:
                return None
            val = 100.0 * (dl / ds)
            return max(0.0, min(100.0, val))

        for pdf in rows:
            # Records are ordered by time_trunc within each incoming DataFrame only.
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = r["fwdPacketsSent"], r["fwdPacketLoss"]
                rs, rl = r["rtnPacketsSent"], r["rtnPacketLoss"]

                fwd_pct = pct(prev.get("fwd_sent"), prev.get("fwd_loss"), fs, fl)
                rtn_pct = pct(prev.get("rtn_sent"), prev.get("rtn_loss"), rs, rl)

                # A row is emitted only when at least one direction produced a value;
                # the key for the other direction is then left out of the row dict.
                if (fwd_pct is not None) or (rtn_pct is not None):
                    row = {"OId": str(oid), "time_trunc": ts}
                    if fwd_pct is not None:
                        row["fwdpacketLoss"] = fwd_pct
                    if rtn_pct is not None:
                        row["rtnpacketLoss"] = rtn_pct
                    out_rows.append(row)  # &amp;lt;-- append once, outside the rtn block

                # The current counters become the baseline for the next record.
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # The state is written back once per call, wrapped in a one-row DataFrame.
        # NOTE(review): this is the DataFrame-based state handling that the reply
        # in this thread replaces with a tuple.
        self.state.update(pd.DataFrame([prev]))

        # Always yield exactly one DataFrame; an empty one keeps the expected column names.
        if out_rows:
            yield pd.DataFrame(out_rows)
        else:
            yield pd.DataFrame(columns=["OId", "time_trunc", "fwdpacketLoss", "rtnpacketLoss"])
    

# Schema of the rows produced by the processor: the key as a string, the record
# timestamp, and one nullable double per direction.
output_schema = StructType([
    StructField("OId", StringType(), True),
    StructField("time_trunc", TimestampType(), True),
    StructField("fwdpacketLoss", DoubleType(), True),
    StructField("rtnpacketLoss", DoubleType(), True)
])
    
# Group the counters by OId and run PacketLossProcessor over each group,
# using append output mode and the ProcessingTime time mode.
df_loss = (
    counters.groupBy("OId")
    .transformWithStateInPandas(
        statefulProcessor=PacketLossProcessor(),
        outputStructType=output_schema,
        outputMode="append",
        timeMode="ProcessingTime"
    )
)
# Rename the two percentage columns and apply a 15 minute watermark on time_trunc.
df_loss = (
    df_loss.withColumnsRenamed({"fwdpacketLoss": "pctfwdpacketLoss", "rtnpacketLoss": "pctrtnpacketLoss"})
    .withWatermark("time_trunc", "15 minutes")
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Sun, 19 Oct 2025 06:20:23 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/transformwithstate-is-not-emitting-for-live-streams/m-p/135360#M50331</guid>
      <dc:creator>pranaav93</dc:creator>
      <dc:date>2025-10-19T06:20:23Z</dc:date>
    </item>
    <item>
      <title>Re: TransformWithState is not emitting for live streams</title>
      <link>https://community.databricks.com/t5/data-engineering/transformwithstate-is-not-emitting-for-live-streams/m-p/135388#M50340</link>
      <description>&lt;P&gt;I managed to solve this. The issue was with how I handled the value state in the init method. I was treating the state as a DataFrame, which caused it to never materialize or update, so the processor emitted nulls.&lt;/P&gt;&lt;P&gt;I changed it to a tuple of values, and that fixed the issue.&lt;/P&gt;&lt;P&gt;Updated code below, along with the reference documentation:&lt;/P&gt;&lt;P&gt;&lt;A title="Databricks - Build  Stateful Application" href="https://docs.databricks.com/aws/en/stateful-applications" target="_self"&gt;https://docs.databricks.com/aws/en/stateful-applications&lt;/A&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;STATE_DDL = "fwd_sent DOUBLE, fwd_loss DOUBLE, rtn_sent DOUBLE, rtn_loss DOUBLE"
class PacketLossProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -&amp;gt; None:
        # ValueState returns/accepts a TUPLE matching the schema order
        self.state = handle.getValueState("packet_loss_state_v2", STATE_DDL) #value state only gets a tuple for state

    def handleInputRows(self, key: Any, rows: Iterator[pd.DataFrame], timerValues) -&amp;gt; Iterator[pd.DataFrame]:
        oid = key[0] if isinstance(key, (list, tuple)) else key

        # --- load previous as tuple -&amp;gt; dict ---
        st = self.state.get()  # None or (fwd_sent, fwd_loss, rtn_sent, rtn_loss)
        if st is None:
            prev = {"fwd_sent": None, "fwd_loss": None, "rtn_sent": None, "rtn_loss": None}
        else:
            fwd_sent, fwd_loss, rtn_sent, rtn_loss = st
            prev = {"fwd_sent": fwd_sent, "fwd_loss": fwd_loss,
                    "rtn_sent": rtn_sent, "rtn_loss": rtn_loss}

        out = []

        def pct(prev_s, prev_l, cur_s, cur_l):
            if pd.isna(prev_s) or pd.isna(prev_l) or pd.isna(cur_s) or pd.isna(cur_l):
                return None
            if cur_s &amp;lt;= prev_s:
                return None
            ds, dl = cur_s - prev_s, cur_l - prev_l
            if ds &amp;lt;= 0:
                return None
            return max(0.0, min(100.0, 100.0 * (dl / ds)))

        for pdf in rows:
            pdf = pdf.sort_values("time_trunc")
            for _, r in pdf.iterrows():
                ts = r["time_trunc"]
                fs, fl = float(r["fwdPacketsSent"]), float(r["fwdPacketLoss"])
                rs, rl = float(r["rtnPacketsSent"]), float(r["rtnPacketLoss"])

                fwd_pct = pct(prev["fwd_sent"], prev["fwd_loss"], fs, fl)
                rtn_pct = pct(prev["rtn_sent"], prev["rtn_loss"], rs, rl)

                row = {
                    "OId": str(oid),
                    "time_trunc": ts,
                    "fwdPacketLoss": (float(fwd_pct) if fwd_pct is not None else np.nan),
                    "rtnPacketLoss": (float(rtn_pct) if rtn_pct is not None else np.nan),
                }
                if fwd_pct is not None:
                    row["fwdPacketLoss"] = float(fwd_pct)
                if rtn_pct is not None:
                    row["rtnPacketLoss"] = float(rtn_pct)

                out.append(row)

                # advance state to CURRENT counters for next record
                prev["fwd_sent"], prev["fwd_loss"] = fs, fl
                prev["rtn_sent"], prev["rtn_loss"] = rs, rl

        # save ONE tuple back
        self.state.update((prev["fwd_sent"], prev["fwd_loss"], prev["rtn_sent"], prev["rtn_loss"]))

        yield pd.DataFrame(out) if out else pd.DataFrame(
            columns=["OId","time_trunc","fwdPacketLoss","rtnPacketLoss"]
        )&lt;/LI-CODE&gt;</description>
      <pubDate>Mon, 20 Oct 2025 03:53:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/transformwithstate-is-not-emitting-for-live-streams/m-p/135388#M50340</guid>
      <dc:creator>pranaav93</dc:creator>
      <dc:date>2025-10-20T03:53:46Z</dc:date>
    </item>
  </channel>
</rss>

