<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Hi @YuriS, There are a few things going on here, and I wi... in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/150291#M53336</link>
    <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/160014"&gt;@YuriS&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;There are a few things going on here, and I will walk through each one.&lt;/P&gt;
&lt;P&gt;INPUTROWSPERSECOND SHOWING 0&lt;/P&gt;
&lt;P&gt;The inputRowsPerSecond metric is not calculated from the current batch alone. It is the rate at which data arrived relative to the previous trigger: Spark divides numInputRows by the wall-clock time elapsed since the previous trigger started. Specifically:&lt;/P&gt;
&lt;PRE&gt;inputRowsPerSecond = numInputRows / (elapsed time since previous trigger started)&lt;/PRE&gt;
&lt;P&gt;On the very first batch after a stream starts (or restarts), there is no previous trigger to measure against, so the rate is undefined and Spark reports 0.0. You will also see misleadingly low values whenever that elapsed time is inflated, for example when the previous trigger spent most of its duration on metadata operations (latestOffset, WAL commit, query planning) rather than actively fetching rows.&lt;/P&gt;
&lt;P&gt;If you need a reliable throughput metric, processedRowsPerSecond is generally more useful because it divides numInputRows by the actual triggerExecution duration of the current batch:&lt;/P&gt;
&lt;PRE&gt;processedRowsPerSecond = numInputRows / durationMs.triggerExecution&lt;/PRE&gt;
&lt;P&gt;This gives you the actual processing rate for the batch that just completed.&lt;/P&gt;
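&lt;P&gt;As a plain-Python illustration (the function and argument names here are mine, not Spark's), the two rates relate to the progress fields like this:&lt;/P&gt;

```python
def input_rows_per_second(num_input_rows, secs_since_previous_trigger_start):
    """Arrival rate; Spark reports 0.0 when there is no previous trigger."""
    if secs_since_previous_trigger_start == 0:
        return 0.0
    return num_input_rows / secs_since_previous_trigger_start

def processed_rows_per_second(num_input_rows, trigger_execution_ms):
    """Processing rate of the batch that just completed."""
    if trigger_execution_ms == 0:
        return 0.0
    return num_input_rows / (trigger_execution_ms / 1000.0)

# 6000 rows handled by a trigger that ran for 3000 ms:
print(processed_rows_per_second(6000, 3000))  # 2000.0
```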
&lt;P&gt;NUMINPUTROWS BEING 3-5X LARGER THAN ACTUAL COMMITTED ROWS&lt;/P&gt;
&lt;P&gt;This is the more interesting part of your observation. There are a few possible explanations, and they can stack:&lt;/P&gt;
&lt;P&gt;1. SPECULATIVE EXECUTION AND TASK RETRIES&lt;BR /&gt;
If spark.speculation is enabled (it defaults to false in open-source Spark, but check your cluster configuration), Spark may launch duplicate copies of slow tasks for the same partition. Each speculative copy reads the same input rows, and numInputRows counts all rows read across all task attempts, not just the ones from the winning task. If three speculative copies each read the same data, numInputRows can report 3x the actual unique rows.&lt;/P&gt;
&lt;P&gt;Check whether speculation is enabled:&lt;/P&gt;
&lt;PRE&gt;spark.conf.get("spark.speculation")&lt;/PRE&gt;
&lt;P&gt;If it is "true" and you want numInputRows to reflect unique rows, you can disable it:&lt;/P&gt;
&lt;PRE&gt;spark.conf.set("spark.speculation", "false")&lt;/PRE&gt;
&lt;P&gt;However, weigh this against the benefit speculation gives you for straggler mitigation.&lt;/P&gt;
&lt;P&gt;2. TASK FAILURES AND RETRIES&lt;BR /&gt;
If any tasks fail and are retried (even without speculation), the retried tasks re-read the same input rows. numInputRows accumulates across all attempts.&lt;/P&gt;
&lt;P&gt;3. FOREACHBATCH RETRY SEMANTICS&lt;BR /&gt;
When using foreachBatch, if the batch function throws an exception partway through, the entire micro-batch may be retried. The retried execution re-reads all the input rows from the source, and numInputRows will include both the failed and successful reads.&lt;/P&gt;
&lt;P&gt;4. DELTA SOURCE INTERNAL READS&lt;BR /&gt;
When reading from a Delta table as a streaming source with Change Data Feed or certain merge/update patterns upstream, the Delta source may emit multiple row versions (insert, update_preimage, update_postimage, delete) for a single logical row change. These all count toward numInputRows even if your foreachBatch logic filters some of them out before writing to the target table.&lt;/P&gt;
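&lt;P&gt;To make point 4 concrete, here is a plain-Python illustration (hypothetical rows, not Spark APIs) of how row versions inflate the input count relative to what is actually written:&lt;/P&gt;

```python
# One insert plus one update arrive from a CDF-enabled Delta source.
# The update is represented as two physical rows: pre-image and post-image.
cdf_rows = [
    {"id": 1, "_change_type": "insert"},
    {"id": 2, "_change_type": "update_preimage"},
    {"id": 2, "_change_type": "update_postimage"},
]

num_input_rows = len(cdf_rows)  # what the numInputRows metric sees: 3

# A typical foreachBatch filter keeps only rows that land in the target:
written = [r for r in cdf_rows if r["_change_type"] in ("insert", "update_postimage")]

print(num_input_rows, len(written))  # 3 2
```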
&lt;P&gt;HOW TO RECONCILE THE NUMBERS&lt;/P&gt;
&lt;P&gt;For accurate row counts of what actually landed in your target, the approach you are already using (comparing batch_id in the target table) is the correct one. The StreamingQueryListener metrics are operational telemetry for throughput estimation, not precise accounting.&lt;/P&gt;
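&lt;P&gt;A minimal reconciliation sketch, assuming you have already collected per-batch numInputRows from the listener and per-batch_id row counts from the target table (the dictionaries below are hypothetical stand-ins for those two sources):&lt;/P&gt;

```python
def reconcile(listener_counts, committed_counts):
    """Compare per-batch numInputRows against rows committed to the target."""
    report = {}
    for batch_id, num_input in listener_counts.items():
        committed = committed_counts.get(batch_id, 0)
        ratio = num_input / committed if committed else None
        report[batch_id] = {
            "numInputRows": num_input,
            "committed": committed,
            "ratio": ratio,
        }
    return report

# Numbers reported earlier in this thread: 10523 input rows vs 3198 committed.
print(round(reconcile({42: 10523}, {42: 3198})[42]["ratio"], 2))  # 3.29
```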
&lt;P&gt;If you need exact input counts, apply the DataFrame.observe() API to the streaming DataFrame at the point in the pipeline you care about. (Calling observe() inside foreachBatch attaches to a plain batch DataFrame, so the metric will not reliably surface in the streaming progress events.)&lt;/P&gt;
&lt;PRE&gt;from pyspark.sql.functions import count, lit

def process_batch(df, batch_id):
  # your processing logic on df
  ...

observed_df = (
  spark.readStream
      .format("delta")
      .table("source_table")
      .observe("row_count", count(lit(1)).alias("cnt"))
)

query = (
  observed_df.writeStream
      .foreachBatch(process_batch)
      .option("checkpointLocation", "/path/to/checkpoint")
      .trigger(processingTime="1 minute")
      .start()
)&lt;/PRE&gt;
&lt;P&gt;You can then capture the observed metric in your StreamingQueryListener.onQueryProgress handler via event.progress.observedMetrics.&lt;/P&gt;
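&lt;P&gt;As a sketch of that handler logic (the helper name is mine; the "row_count"/"cnt" labels follow the observe() call above, so adapt them to your own), you can parse the metric out of the progress JSON:&lt;/P&gt;

```python
import json

def observed_row_count(progress_json, metric_name="row_count"):
    """Pull an observed metric out of a StreamingQueryProgress JSON payload."""
    progress = json.loads(progress_json)
    metrics = progress.get("observedMetrics", {})
    if metric_name in metrics:
        return metrics[metric_name]["cnt"]
    return None

# Inside a listener this would be called as, e.g.:
#   class CountListener(StreamingQueryListener):
#       def onQueryProgress(self, event):
#           cnt = observed_row_count(event.progress.json)

sample = '{"batchId": 7, "observedMetrics": {"row_count": {"cnt": 3198}}}'
print(observed_row_count(sample))  # 3198
```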
&lt;P&gt;BATCH VS. TRIGGER&lt;/P&gt;
&lt;P&gt;A trigger is the scheduling mechanism that tells Spark when to look for new data (e.g., processingTime="1 minute" fires every 60 seconds). A batch (micro-batch) is the unit of work that actually processes the data found during that trigger.&lt;/P&gt;
&lt;P&gt;In practice, they are almost always 1:1: one trigger fires, one micro-batch executes. The distinction surfaces in a few cases:&lt;/P&gt;
&lt;P&gt;- If a trigger fires but no new data is available, no new batch is created (batchId does not increment), and an onQueryIdle event fires instead of onQueryProgress.&lt;BR /&gt;
- After a restart, the first trigger may process a backlog, but it still runs as a single micro-batch.&lt;BR /&gt;
- The batchDuration and durationMs.triggerExecution values are effectively the same for a normal micro-batch cycle in which a single batch fills the trigger window, which is why they look identical in your metrics.&lt;/P&gt;
&lt;P&gt;The key metrics reference for all of these fields is here:&lt;BR /&gt;
&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;SUMMARY&lt;/P&gt;
&lt;P&gt;- inputRowsPerSecond = 0 is expected on the first batch and when metadata time dominates. Use processedRowsPerSecond for throughput monitoring.&lt;BR /&gt;
- numInputRows includes retried and speculative task reads. It is not a count of unique input rows. Check spark.speculation and task retry counts in the Spark UI.&lt;BR /&gt;
- For precise row accounting, use DataFrame.observe() or compare offsets/target table data as you are already doing.&lt;BR /&gt;
- Batch and trigger are conceptually different but 1:1 in normal micro-batch operation.&lt;/P&gt;
&lt;P&gt;REFERENCES&lt;/P&gt;
&lt;P&gt;- Monitoring Structured Streaming queries:&lt;/P&gt;
&lt;PRE&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring&lt;/PRE&gt;
&lt;P&gt;- StreamingQueryListener documentation:&lt;/P&gt;
&lt;PRE&gt;https://docs.databricks.com/en/structured-streaming/stream-monitoring.html&lt;/PRE&gt;
&lt;P&gt;- Observable metrics with DataFrame.observe():&lt;/P&gt;
&lt;PRE&gt;https://docs.databricks.com/en/structured-streaming/stream-monitoring.html#defining-observable-metrics-in-structured-streaming&lt;/PRE&gt;
&lt;P&gt;* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.&lt;/P&gt;
&lt;P&gt;If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.&lt;/P&gt;</description>
    <pubDate>Mon, 09 Mar 2026 01:05:49 GMT</pubDate>
    <dc:creator>SteveOstrowski</dc:creator>
    <dc:date>2026-03-09T01:05:49Z</dc:date>
    <item>
      <title>StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/145231#M52474</link>
      <description>&lt;P&gt;&lt;SPAN&gt;After implementing StreamingQueryListener to enable integration with our monitoring solution we have noticed some strange metrics for our DeltaSource streams (based on&amp;nbsp;&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring&lt;/A&gt;)&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;In some cases for DeltaSource streams metric inputRowsPerSecond is set to 0:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="YuriS_0-1769419735190.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/23298i71F615DAFC1E0C76/image-size/medium?v=v2&amp;amp;px=400" role="button" title="YuriS_0-1769419735190.png" alt="YuriS_0-1769419735190.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;For the particular event (same is visible on Spark UI)&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="YuriS_1-1769419836870.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/23299i4D67105A3FC6210D/image-size/medium?v=v2&amp;amp;px=400" role="button" title="YuriS_1-1769419836870.png" alt="YuriS_1-1769419836870.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Also would be good to understand what is the different between &lt;STRONG&gt;batch&lt;/STRONG&gt; and &lt;STRONG&gt;trigger&lt;/STRONG&gt; - are these the same, or difference would be visible only when batches are restarted?&lt;BR /&gt;&lt;BR /&gt;Thank you&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 26 Jan 2026 09:34:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/145231#M52474</guid>
      <dc:creator>YuriS</dc:creator>
      <dc:date>2026-01-26T09:34:11Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/145241#M52475</link>
      <description>&lt;P&gt;Firstly -&amp;nbsp; let’s talk about batch vs trigger.&lt;/P&gt;&lt;P&gt;A trigger is the scheduling event that tells Spark when to check for new data (eg processingTime, availableNow, once). A batch (micro-batch) is the actual unit of work that processes data, reads input, and commits results. In many cases there is a 1:1 relationship, so they appear the same, but they are conceptually different. The difference becomes visible during restarts, backlog processing, or when a trigger fires but no data is available.&lt;/P&gt;&lt;P&gt;This video gives a clear explanation of trigger behaviour in Structured Streaming:&lt;BR /&gt;&lt;A class="" href="https://www.youtube.com/watch?v=t7cRAIgVduQ&amp;amp;utm_source=chatgpt.com" target="_new" rel="noopener"&gt;https://www.youtube.com/watch?v=t7cRAIgVduQ&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Regarding the metrics shown (batchDuration and triggerExecution being equal):&lt;BR /&gt;this looks strange, but it is expected for micro-batch streaming when a single batch fully occupies the trigger window. the trigger execution time often includes delta metadata work and waiting, so both values can collapse to the same duration.&lt;/P&gt;&lt;P&gt;This also explains why inputRowsPerSecond can be reported as 0.0 for DeltaSource streams. the metric is derived from numInputRows divided by trigger execution time - so yes its slightly strange. when most of the trigger time is spent waiting or doing metadata operations rather than actively reading rows, spark may report an effective input rate of zero even though rows were processed. i would say for monitoring -&amp;nbsp;numInputRows is the more reliable metric.&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;So - are they the same? - No. A trigger defines when Spark checks for work (e.g. an interval). A batch runs if and only if data is available when the trigger fires.&lt;BR /&gt;Is a difference visible on batch restart -&amp;nbsp;No. 
The difference is not only visible when batches are restarted.&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 26 Jan 2026 11:07:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/145241#M52475</guid>
      <dc:creator>hasnat_unifeye</dc:creator>
      <dc:date>2026-01-26T11:07:37Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148163#M52835</link>
      <description>&lt;P&gt;Could someone from Databricks answer on that question.&lt;BR /&gt;On top of the above numInputRows metric is not (imho) reliable information as well.&lt;/P&gt;&lt;P&gt;Checked several stateless streams (simple foreachBatch function with extraction and straightforward validation where "bad" records are moved to "quarantine" table), compared endOffsets minus startOffsets with numInputRows metric, as well as, with actual data processed to the target table:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;endOffsets minus startOffsets is the same as the actual number of records committed to target table - if we will not take into account records that were rolled back by consumer - i.e. some offsets are not processed to target table.&lt;/LI&gt;&lt;LI&gt;numInputRows in 95% of cases is 3 or 5 times&amp;nbsp; bigger than the actual number of processed records (i.e. records sunk to target table - does that mean that batch was re-started 3 or 5 times? it would explain a lot, but there some rare scenarious&lt;/LI&gt;&lt;LI&gt;where&amp;nbsp;numInputRows does not seems to be in exact correlation with actual processed number of records. For instance, event with&amp;nbsp;numInputRows = 10523, actual committed records for that particular batch in the target table = 3198 and number of processed offsets is 3224 (looks like there were uncommitted events on producer)&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Hard to find logic here...&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 12 Feb 2026 11:04:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148163#M52835</guid>
      <dc:creator>YuriS</dc:creator>
      <dc:date>2026-02-12T11:04:14Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148171#M52838</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/160014"&gt;@YuriS&lt;/a&gt;&amp;nbsp;Can you share your writestream trigger properties? what is the trigger processing time you have selected there? can you try by using 10 seconds?&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;trigger&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;processingTime&lt;/SPAN&gt; &lt;SPAN&gt;=&lt;/SPAN&gt;&amp;nbsp;'10'&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 12 Feb 2026 12:41:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148171#M52838</guid>
      <dc:creator>saurabh18cs</dc:creator>
      <dc:date>2026-02-12T12:41:26Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148272#M52858</link>
      <description>&lt;P&gt;Trigger interval is set to 1 minute;&amp;nbsp;&lt;EM&gt;maxOffsetsPerTrigger&lt;/EM&gt; is set to 20k. That's all.&lt;/P&gt;&lt;P&gt;And once again when i am comparing either offsets (from the StreamingQueryListener events) or actual data in target table (batch_id is in target table, so it is very easy the check what was actually processed) with numInputRows it is x3 times for particular streams:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="YuriS_0-1770974536294.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/24007iD38C71160B41E23E/image-size/medium?v=v2&amp;amp;px=400" role="button" title="YuriS_0-1770974536294.png" alt="YuriS_0-1770974536294.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 13 Feb 2026 09:23:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148272#M52858</guid>
      <dc:creator>YuriS</dc:creator>
      <dc:date>2026-02-13T09:23:25Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener metrics strange behaviour (inputRowsPerSecond metric is set to 0)</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148297#M52863</link>
      <description>&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'kafka.request.timeout.ms'&lt;/SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN&gt;150000&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'kafka.session.timeout.ms'&lt;/SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN&gt;200000&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'group.max.session.timeout.ms'&lt;/SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN&gt;7200000&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'failOnDataLoss'&lt;/SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN&gt;'true'&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'startingOffsets'&lt;/SPAN&gt;&lt;SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN&gt;'latest'&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'fetchOffset.numRetries'&lt;/SPAN&gt;&lt;SPAN&gt;, 1&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'maxOffsetsPerTrigger'&lt;/SPAN&gt;&lt;SPAN&gt;, 
1000&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; )&lt;BR /&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;my settings see if any option here can help you&lt;/SPAN&gt;&lt;/DIV&gt;</description>
      <pubDate>Fri, 13 Feb 2026 10:50:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/148297#M52863</guid>
      <dc:creator>saurabh18cs</dc:creator>
      <dc:date>2026-02-13T10:50:15Z</dc:date>
    </item>
    <item>
      <title>Hi @YuriS, There are a few things going on here, and I wi...</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/150291#M53336</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/160014"&gt;@YuriS&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;There are a few things going on here, and I will walk through each one.&lt;/P&gt;
&lt;P&gt;INPUTROWSPERSECOND SHOWING 0&lt;/P&gt;
&lt;P&gt;The inputRowsPerSecond metric is not calculated from the current batch alone. It is the rate at which data arrived relative to the previous trigger: Spark divides numInputRows by the wall-clock time elapsed since the previous trigger started. Specifically:&lt;/P&gt;
&lt;PRE&gt;inputRowsPerSecond = numInputRows / (elapsed time since previous trigger started)&lt;/PRE&gt;
&lt;P&gt;On the very first batch after a stream starts (or restarts), there is no previous trigger to measure against, so the rate is undefined and Spark reports 0.0. You will also see misleadingly low values whenever that elapsed time is inflated, for example when the previous trigger spent most of its duration on metadata operations (latestOffset, WAL commit, query planning) rather than actively fetching rows.&lt;/P&gt;
&lt;P&gt;If you need a reliable throughput metric, processedRowsPerSecond is generally more useful because it divides numInputRows by the actual triggerExecution duration of the current batch:&lt;/P&gt;
&lt;PRE&gt;processedRowsPerSecond = numInputRows / durationMs.triggerExecution&lt;/PRE&gt;
&lt;P&gt;This gives you the actual processing rate for the batch that just completed.&lt;/P&gt;
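&lt;P&gt;As a plain-Python illustration (the function and argument names here are mine, not Spark's), the two rates relate to the progress fields like this:&lt;/P&gt;

```python
def input_rows_per_second(num_input_rows, secs_since_previous_trigger_start):
    """Arrival rate; Spark reports 0.0 when there is no previous trigger."""
    if secs_since_previous_trigger_start == 0:
        return 0.0
    return num_input_rows / secs_since_previous_trigger_start

def processed_rows_per_second(num_input_rows, trigger_execution_ms):
    """Processing rate of the batch that just completed."""
    if trigger_execution_ms == 0:
        return 0.0
    return num_input_rows / (trigger_execution_ms / 1000.0)

# 6000 rows handled by a trigger that ran for 3000 ms:
print(processed_rows_per_second(6000, 3000))  # 2000.0
```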
&lt;P&gt;NUMINPUTROWS BEING 3-5X LARGER THAN ACTUAL COMMITTED ROWS&lt;/P&gt;
&lt;P&gt;This is the more interesting part of your observation. There are a few possible explanations, and they can stack:&lt;/P&gt;
&lt;P&gt;1. SPECULATIVE EXECUTION AND TASK RETRIES&lt;BR /&gt;
If spark.speculation is enabled (it defaults to false in open-source Spark, but check your cluster configuration), Spark may launch duplicate copies of slow tasks for the same partition. Each speculative copy reads the same input rows, and numInputRows counts all rows read across all task attempts, not just the ones from the winning task. If three speculative copies each read the same data, numInputRows can report 3x the actual unique rows.&lt;/P&gt;
&lt;P&gt;Check whether speculation is enabled:&lt;/P&gt;
&lt;PRE&gt;spark.conf.get("spark.speculation")&lt;/PRE&gt;
&lt;P&gt;If it is "true" and you want numInputRows to reflect unique rows, you can disable it:&lt;/P&gt;
&lt;PRE&gt;spark.conf.set("spark.speculation", "false")&lt;/PRE&gt;
&lt;P&gt;However, weigh this against the benefit speculation gives you for straggler mitigation.&lt;/P&gt;
&lt;P&gt;2. TASK FAILURES AND RETRIES&lt;BR /&gt;
If any tasks fail and are retried (even without speculation), the retried tasks re-read the same input rows. numInputRows accumulates across all attempts.&lt;/P&gt;
&lt;P&gt;3. FOREACHBATCH RETRY SEMANTICS&lt;BR /&gt;
When using foreachBatch, if the batch function throws an exception partway through, the entire micro-batch may be retried. The retried execution re-reads all the input rows from the source, and numInputRows will include both the failed and successful reads.&lt;/P&gt;
&lt;P&gt;4. DELTA SOURCE INTERNAL READS&lt;BR /&gt;
When reading from a Delta table as a streaming source with Change Data Feed or certain merge/update patterns upstream, the Delta source may emit multiple row versions (insert, update_preimage, update_postimage, delete) for a single logical row change. These all count toward numInputRows even if your foreachBatch logic filters some of them out before writing to the target table.&lt;/P&gt;
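&lt;P&gt;To make point 4 concrete, here is a plain-Python illustration (hypothetical rows, not Spark APIs) of how row versions inflate the input count relative to what is actually written:&lt;/P&gt;

```python
# One insert plus one update arrive from a CDF-enabled Delta source.
# The update is represented as two physical rows: pre-image and post-image.
cdf_rows = [
    {"id": 1, "_change_type": "insert"},
    {"id": 2, "_change_type": "update_preimage"},
    {"id": 2, "_change_type": "update_postimage"},
]

num_input_rows = len(cdf_rows)  # what the numInputRows metric sees: 3

# A typical foreachBatch filter keeps only rows that land in the target:
written = [r for r in cdf_rows if r["_change_type"] in ("insert", "update_postimage")]

print(num_input_rows, len(written))  # 3 2
```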
&lt;P&gt;HOW TO RECONCILE THE NUMBERS&lt;/P&gt;
&lt;P&gt;For accurate row counts of what actually landed in your target, the approach you are already using (comparing batch_id in the target table) is the correct one. The StreamingQueryListener metrics are operational telemetry for throughput estimation, not precise accounting.&lt;/P&gt;
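&lt;P&gt;A minimal reconciliation sketch, assuming you have already collected per-batch numInputRows from the listener and per-batch_id row counts from the target table (the dictionaries below are hypothetical stand-ins for those two sources):&lt;/P&gt;

```python
def reconcile(listener_counts, committed_counts):
    """Compare per-batch numInputRows against rows committed to the target."""
    report = {}
    for batch_id, num_input in listener_counts.items():
        committed = committed_counts.get(batch_id, 0)
        ratio = num_input / committed if committed else None
        report[batch_id] = {
            "numInputRows": num_input,
            "committed": committed,
            "ratio": ratio,
        }
    return report

# Numbers reported earlier in this thread: 10523 input rows vs 3198 committed.
print(round(reconcile({42: 10523}, {42: 3198})[42]["ratio"], 2))  # 3.29
```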
&lt;P&gt;If you need exact input counts, apply the DataFrame.observe() API to the streaming DataFrame at the point in the pipeline you care about. (Calling observe() inside foreachBatch attaches to a plain batch DataFrame, so the metric will not reliably surface in the streaming progress events.)&lt;/P&gt;
&lt;PRE&gt;from pyspark.sql.functions import count, lit

def process_batch(df, batch_id):
  # your processing logic on df
  ...

observed_df = (
  spark.readStream
      .format("delta")
      .table("source_table")
      .observe("row_count", count(lit(1)).alias("cnt"))
)

query = (
  observed_df.writeStream
      .foreachBatch(process_batch)
      .option("checkpointLocation", "/path/to/checkpoint")
      .trigger(processingTime="1 minute")
      .start()
)&lt;/PRE&gt;
&lt;P&gt;You can then capture the observed metric in your StreamingQueryListener.onQueryProgress handler via event.progress.observedMetrics.&lt;/P&gt;
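&lt;P&gt;As a sketch of that handler logic (the helper name is mine; the "row_count"/"cnt" labels follow the observe() call above, so adapt them to your own), you can parse the metric out of the progress JSON:&lt;/P&gt;

```python
import json

def observed_row_count(progress_json, metric_name="row_count"):
    """Pull an observed metric out of a StreamingQueryProgress JSON payload."""
    progress = json.loads(progress_json)
    metrics = progress.get("observedMetrics", {})
    if metric_name in metrics:
        return metrics[metric_name]["cnt"]
    return None

# Inside a listener this would be called as, e.g.:
#   class CountListener(StreamingQueryListener):
#       def onQueryProgress(self, event):
#           cnt = observed_row_count(event.progress.json)

sample = '{"batchId": 7, "observedMetrics": {"row_count": {"cnt": 3198}}}'
print(observed_row_count(sample))  # 3198
```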
&lt;P&gt;BATCH VS. TRIGGER&lt;/P&gt;
&lt;P&gt;A trigger is the scheduling mechanism that tells Spark when to look for new data (e.g., processingTime="1 minute" fires every 60 seconds). A batch (micro-batch) is the unit of work that actually processes the data found during that trigger.&lt;/P&gt;
&lt;P&gt;In practice, they are almost always 1:1: one trigger fires, one micro-batch executes. The distinction surfaces in a few cases:&lt;/P&gt;
&lt;P&gt;- If a trigger fires but no new data is available, no new batch is created (batchId does not increment), and an onQueryIdle event fires instead of onQueryProgress.&lt;BR /&gt;
- After a restart, the first trigger may process a backlog, but it still runs as a single micro-batch.&lt;BR /&gt;
- The batchDuration and durationMs.triggerExecution values are effectively the same for a normal micro-batch cycle in which a single batch fills the trigger window, which is why they look identical in your metrics.&lt;/P&gt;
&lt;P&gt;The key metrics reference for all of these fields is here:&lt;BR /&gt;
&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;SUMMARY&lt;/P&gt;
&lt;P&gt;- inputRowsPerSecond = 0 is expected on the first batch and when metadata time dominates. Use processedRowsPerSecond for throughput monitoring.&lt;BR /&gt;
- numInputRows includes retried and speculative task reads. It is not a count of unique input rows. Check spark.speculation and task retry counts in the Spark UI.&lt;BR /&gt;
- For precise row accounting, use DataFrame.observe() or compare offsets/target table data as you are already doing.&lt;BR /&gt;
- Batch and trigger are conceptually different but 1:1 in normal micro-batch operation.&lt;/P&gt;
&lt;P&gt;REFERENCES&lt;/P&gt;
&lt;P&gt;- Monitoring Structured Streaming queries:&lt;/P&gt;
&lt;PRE&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/stream-monitoring&lt;/PRE&gt;
&lt;P&gt;- StreamingQueryListener documentation:&lt;/P&gt;
&lt;PRE&gt;https://docs.databricks.com/en/structured-streaming/stream-monitoring.html&lt;/PRE&gt;
&lt;P&gt;- Observable metrics with DataFrame.observe():&lt;/P&gt;
&lt;PRE&gt;https://docs.databricks.com/en/structured-streaming/stream-monitoring.html#defining-observable-metrics-in-structured-streaming&lt;/PRE&gt;
&lt;P&gt;* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.&lt;/P&gt;
&lt;P&gt;If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.&lt;/P&gt;</description>
      <pubDate>Mon, 09 Mar 2026 01:05:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-metrics-strange-behaviour/m-p/150291#M53336</guid>
      <dc:creator>SteveOstrowski</dc:creator>
      <dc:date>2026-03-09T01:05:49Z</dc:date>
    </item>
  </channel>
</rss>

