<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Monitoring Structured Streaming in an external sink in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/128506#M48266</link>
    <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;I recently worked on a similar requirement and would like to share a structured approach to monitoring Structured Streaming when writing to external sinks.&lt;/P&gt;&lt;P&gt;1. Use a Unique Query Name&lt;/P&gt;&lt;P&gt;Always assign a clear and meaningful name to each streaming query with .queryName("&amp;lt;your_query_name&amp;gt;"). This helps you easily identify the stream and its metrics in the Spark UI under the Streaming tab.&lt;/P&gt;&lt;P&gt;2. Leverage StreamingQueryListener for Metrics Reporting&lt;/P&gt;&lt;P&gt;Spark provides the StreamingQueryListener interface (available in both Python and Scala since Databricks Runtime 11.3 LTS) that allows you to capture lifecycle events like onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated.&lt;/P&gt;&lt;P&gt;Keep logic in these callbacks lightweight to avoid delaying processing—prefer writing metrics to a lightweight store such as Kafka or Prometheus.&lt;/P&gt;&lt;P&gt;3. Define and Observe Custom Metrics&lt;/P&gt;&lt;P&gt;Use the Observable API with .observe(...) to define custom metrics directly within your query—such as row counts, error counts, or data quality checks. These get emitted as events and can be picked up by your listener.&lt;/P&gt;&lt;P&gt;In your listener’s onQueryProgress, access these via event.progress.observedMetrics and handle alerts, dashboards, or logs as needed.&lt;/P&gt;&lt;P&gt;4. Capture Detailed Source and Sink Metrics&lt;/P&gt;&lt;P&gt;The event.progress object contains rich metrics related to source, state, and sink—such as input/output row counts, processing rates, offsets, backlog (e.g., offsets behind latest), and event time statistics.&lt;/P&gt;&lt;P&gt;This is particularly valuable when writing to external sinks like Kafka: you can monitor how many rows are actually delivered, detect lag, and track throughput.&lt;/P&gt;&lt;P&gt;5. 
Send Metrics to External Observability Tools&lt;/P&gt;&lt;P&gt;Use your listener to push structured metrics&mdash;for example, to Prometheus Pushgateway or similar systems. One common pattern is to serialize the progress as JSON and extract key metrics for real-time observability.&lt;/P&gt;&lt;P&gt;This enables dashboarding (e.g., via Grafana) and alerting on key events like increasing output latency or growing backlog.&lt;/P&gt;&lt;P&gt;6. Monitor External Sink Health&lt;/P&gt;&lt;P&gt;Ensure your listener also includes logic to check sink connectivity, and monitor whether records are being written successfully.&lt;/P&gt;&lt;P&gt;Track both successes and failures (e.g., network errors, backpressure). Combine these with the built-in sink.numOutputRows metric for better visibility.&lt;/P&gt;</description>
    <pubDate>Fri, 15 Aug 2025 12:42:38 GMT</pubDate>
    <dc:creator>WiliamRosa</dc:creator>
    <dc:date>2025-08-15T12:42:38Z</dc:date>
    <item>
      <title>Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63069#M32164</link>
      <description>&lt;P&gt;Hi!&lt;/P&gt;&lt;P&gt;Today I am working on collecting some metrics to build a plot for my Spark Structured Streaming job.&lt;/P&gt;&lt;P&gt;It is configured with trigger(processingTime="30 seconds") and I am trying to collect data with the following listener class (just an example).&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
    def onQueryProgress(self, event):
        print("onQueryProgress")
    def onQueryIdle(self, event):
        print("onQueryIdle")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")

# Add my listener.
spark.streams.addListener(MyListener())&lt;/LI-CODE&gt;&lt;P&gt;But it is not working properly. As the image shows, it is only catching the event when the streaming starts, not when each batch is processed.&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2024-03-08 113453.png" style="width: 829px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/6574i77DFE548CE8D9136/image-dimensions/829x268/is-moderation-mode/true?v=v2" width="829" height="268" role="button" title="Screenshot 2024-03-08 113453.png" alt="Screenshot 2024-03-08 113453.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;I have tried writing this class in different ways, but it didn't work well. Does anybody know what the problem could be here? Does it need additional configuration?&lt;/P&gt;</description>
      <pubDate>Fri, 08 Mar 2024 14:40:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63069#M32164</guid>
      <dc:creator>Maxi1693</dc:creator>
      <dc:date>2024-03-08T14:40:43Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63132#M32176</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have come to the conclusion that this is a bug. In general there is a bug in obtaining individual values from the dictionary: for example, in the way Spark Streaming populates the processed_rows_per_second key (or any other key) within the microbatch_data = event.progress dictionary. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints its elements correctly. This concerns the method onQueryProgress(self, event) in the class MyListener(StreamingQueryListener).&lt;/P&gt;&lt;P&gt;For example, with print(microbatch_data) you get everything printed as below:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;onQueryProgress
microbatch_data received
{
"id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
"runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
"name" : null,
"timestamp" : "2024-03-10T09:21:27.233Z",
"batchId" : 21,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 5.347593582887701,
"durationMs" : {
"addBatch" : 37,
"commitOffsets" : 41,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 5,
"triggerExecution" : 187,
"walCommit" : 104
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 20,
"endOffset" : 21,
"latestOffset" : 21,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 5.347593582887701
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
"numOutputRows" : 1
}
}&lt;/LI-CODE&gt;&lt;P&gt;However, the observed behaviour is that processed_rows_per_second is either None or not being updated correctly.&lt;/P&gt;&lt;P&gt;The Spark version I used for my test is 3.4.&lt;/P&gt;&lt;P&gt;The sample code uses format=rate to simulate a streaming process. You can test the code yourself:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.streaming import StreamingQueryListener
import uuid

def process_data(df):
    processed_df = (
        df.withColumn("key", lit(str(uuid.uuid4())))
          .withColumn("doubled_value", col("value") * 2)
          .withColumn("op_type", lit(1))
          .withColumn("op_time", current_timestamp())
    )
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data
schema = "key string, timestamp timestamp, value long"
# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress   
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
           print("processed_rows_per_second retrieved")
           print(f"Processed rows per second: {processed_rows_per_second}")
        else:
           print("processed_rows_per_second not retrieved!")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")
# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)


# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)          
query = (
    processed_streaming_df
    .select(
        col("key").alias("key"),
        col("doubled_value").alias("doubled_value"),
        col("op_type").alias("op_type"),
        col("op_time").alias("op_time"),
    )
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Wait for the streaming query to terminate
query.awaitTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Sun, 10 Mar 2024 09:59:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63132#M32176</guid>
      <dc:creator>MichTalebzadeh</dc:creator>
      <dc:date>2024-03-10T09:59:51Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63133#M32177</link>
      <description>&lt;P&gt;It helps to get some overview of this capability from the following Databricks link&lt;/P&gt;&lt;H1&gt;&lt;A href="https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html" target="_self"&gt;&lt;FONT size="4"&gt;How to Monitor Streaming Queries in PySpark&lt;/FONT&gt;&lt;/A&gt;&lt;/H1&gt;&lt;P&gt;&lt;FONT size="4"&gt;&lt;A href="https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html" target="_blank"&gt;https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html&lt;/A&gt;&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;HTH&lt;/P&gt;</description>
      <pubDate>Sun, 10 Mar 2024 10:15:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63133#M32177</guid>
      <dc:creator>MichTalebzadeh</dc:creator>
      <dc:date>2024-03-10T10:15:22Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63389#M32223</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have done further investigation on this.&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Below I have tried to illustrate the issue through PySpark code&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;   def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress   
        #print("microbatch_data received")  # Check if data is received
        #print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        #processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  incorrect
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
           print("processedRowsPerSecond retrieved")
           print(f"Processed rows per second is -&amp;gt; {processedRowsPerSecond}")
        else:
           print("processedRowsPerSecond not retrieved!")&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;The output&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;onQueryProgress
Type of microbatch_data is &amp;lt;class 'pyspark.sql.streaming.listener.StreamingQueryProgress'&amp;gt;
processedRowsPerSecond retrieved
Processed rows per second is -&amp;gt; 2.570694087403599&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;SPAN&gt;So we are dealing with the attribute of the class and NOT the dictionary.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;The line&lt;BR /&gt;&lt;FONT face="courier new,courier" color="#0000FF"&gt;(processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond"))&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;fails because it uses the .get() method,&lt;BR /&gt;while the second line&lt;/P&gt;&lt;P&gt;&lt;FONT face="courier new,courier" color="#0000FF"&gt;(processedRowsPerSecond = microbatch_data.processedRowsPerSecond)&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;accesses the attribute directly.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
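To make the attribute-versus-dictionary distinction above concrete, here is a minimal, self-contained sketch. It uses a payload abridged from the microbatch dump shown earlier in this thread; inside onQueryProgress the same text is available as event.progress.json, so parsing that JSON is one way to get genuine dict-style access when you want it.

```python
import json

# Abridged from the microbatch dump earlier in this thread; in a real
# listener this string comes from event.progress.json.
progress_json = """{
  "batchId": 21,
  "numInputRows": 1,
  "inputRowsPerSecond": 100.0,
  "processedRowsPerSecond": 5.347593582887701,
  "sink": {"numOutputRows": 1}
}"""

# event.progress itself is a StreamingQueryProgress object, so .get() fails
# on it; attribute access (event.progress.processedRowsPerSecond) works.
# If you prefer dictionary access, parse the JSON form first:
microbatch_data = json.loads(progress_json)
rate = microbatch_data.get("processedRowsPerSecond")
print(f"Processed rows per second is {rate}")
```

Parsing the JSON also gives you nested fields (sources, sink, durationMs) with ordinary dict syntax, which is convenient when forwarding metrics elsewhere.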
      <pubDate>Tue, 12 Mar 2024 13:03:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63389#M32223</guid>
      <dc:creator>MichTalebzadeh</dc:creator>
      <dc:date>2024-03-12T13:03:59Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63390#M32224</link>
      <description>&lt;P&gt;Forgot to mention that, for now, as a work-around you can access the attribute of the class microbatch_data directly, as below, using camelCase for the attribute name.&lt;/P&gt;&lt;P&gt;&lt;FONT face="courier new,courier" color="#0000FF"&gt;processedRowsPerSecond = microbatch_data.processedRowsPerSecond&lt;/FONT&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;HTH&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 12 Mar 2024 13:12:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/63390#M32224</guid>
      <dc:creator>MichTalebzadeh</dc:creator>
      <dc:date>2024-03-12T13:12:47Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/128496#M48259</link>
      <description>&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/101814"&gt;@MichTalebzadeh&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;I'm currently facing the same problem that was mentioned in that thread, and I haven’t been able to resolve it yet.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;It would be incredibly helpful if you could share any further findings, solutions, or workarounds you might have discovered since then. Your insights could really help me move forward&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 15 Aug 2025 07:20:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/128496#M48259</guid>
      <dc:creator>saicharandeepb</dc:creator>
      <dc:date>2025-08-15T07:20:32Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring Structured Streaming in an external sink</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/128506#M48266</link>
      <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;I recently worked on a similar requirement and would like to share a structured approach to monitoring Structured Streaming when writing to external sinks.&lt;/P&gt;&lt;P&gt;1. Use a Unique Query Name&lt;/P&gt;&lt;P&gt;Always assign a clear and meaningful name to each streaming query with .queryName("&amp;lt;your_query_name&amp;gt;"). This helps you easily identify the stream and its metrics in the Spark UI under the Streaming tab.&lt;/P&gt;&lt;P&gt;2. Leverage StreamingQueryListener for Metrics Reporting&lt;/P&gt;&lt;P&gt;Spark provides the StreamingQueryListener interface (available in both Python and Scala since Databricks Runtime 11.3 LTS) that allows you to capture lifecycle events like onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated.&lt;/P&gt;&lt;P&gt;Keep logic in these callbacks lightweight to avoid delaying processing—prefer writing metrics to a lightweight store such as Kafka or Prometheus.&lt;/P&gt;&lt;P&gt;3. Define and Observe Custom Metrics&lt;/P&gt;&lt;P&gt;Use the Observable API with .observe(...) to define custom metrics directly within your query—such as row counts, error counts, or data quality checks. These get emitted as events and can be picked up by your listener.&lt;/P&gt;&lt;P&gt;In your listener’s onQueryProgress, access these via event.progress.observedMetrics and handle alerts, dashboards, or logs as needed.&lt;/P&gt;&lt;P&gt;4. Capture Detailed Source and Sink Metrics&lt;/P&gt;&lt;P&gt;The event.progress object contains rich metrics related to source, state, and sink—such as input/output row counts, processing rates, offsets, backlog (e.g., offsets behind latest), and event time statistics.&lt;/P&gt;&lt;P&gt;This is particularly valuable when writing to external sinks like Kafka: you can monitor how many rows are actually delivered, detect lag, and track throughput.&lt;/P&gt;&lt;P&gt;5. 
Send Metrics to External Observability Tools&lt;/P&gt;&lt;P&gt;Use your listener to push structured metrics&mdash;for example, to Prometheus Pushgateway or similar systems. One common pattern is to serialize the progress as JSON and extract key metrics for real-time observability.&lt;/P&gt;&lt;P&gt;This enables dashboarding (e.g., via Grafana) and alerting on key events like increasing output latency or growing backlog.&lt;/P&gt;&lt;P&gt;6. Monitor External Sink Health&lt;/P&gt;&lt;P&gt;Ensure your listener also includes logic to check sink connectivity, and monitor whether records are being written successfully.&lt;/P&gt;&lt;P&gt;Track both successes and failures (e.g., network errors, backpressure). Combine these with the built-in sink.numOutputRows metric for better visibility.&lt;/P&gt;</description>
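The serialize-and-extract pattern from steps 4 and 5 above can be sketched as a small pure-Python helper. Only the extraction step is shown runnable here; the commented MetricsListener wiring is an illustrative assumption (the class name and the exporter call are hypothetical, not Spark or Databricks APIs), and the sample payload mirrors the StreamingQueryProgress fields discussed in this thread.

```python
import json

def extract_metrics(progress_json: str) -> dict:
    """Pick the key health metrics out of a StreamingQueryProgress payload
    (available as event.progress.json inside onQueryProgress)."""
    p = json.loads(progress_json)
    sink = p.get("sink") or {}
    return {
        "batchId": p.get("batchId"),
        "numInputRows": p.get("numInputRows"),
        "processedRowsPerSecond": p.get("processedRowsPerSecond"),
        "numOutputRows": sink.get("numOutputRows"),
        "observedMetrics": p.get("observedMetrics", {}),
    }

# Illustrative wiring only; keep the callback lightweight, as noted above:
#
# class MetricsListener(StreamingQueryListener):
#     def onQueryProgress(self, event):
#         metrics = extract_metrics(event.progress.json)
#         ...  # forward to Prometheus Pushgateway, Kafka, logs, etc.

sample = '{"batchId": 21, "numInputRows": 1, "processedRowsPerSecond": 5.3, "sink": {"numOutputRows": 1}}'
print(extract_metrics(sample))
```

Comparing numInputRows with the sink's numOutputRows per batch is a simple way to spot delivery gaps to an external sink before alerting on lag.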
      <pubDate>Fri, 15 Aug 2025 12:42:38 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/m-p/128506#M48266</guid>
      <dc:creator>WiliamRosa</dc:creator>
      <dc:date>2025-08-15T12:42:38Z</dc:date>
    </item>
  </channel>
</rss>

