Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Monitoring Structured Streaming in an external sink

Maxi1693
New Contributor II

Hi!

I am currently working on collecting some metrics to create a plot for my Spark Structured Streaming job.

It is configured with a trigger(processingTime="30 seconds"), and I am trying to collect data with the following listener class (just an example).

 

 
from pyspark.sql.streaming import StreamingQueryListener

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
    def onQueryProgress(self, event):
        print("onQueryProgress")
    def onQueryIdle(self, event):
        print("onQueryIdle")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")

# Add my listener.
spark.streams.addListener(MyListener())
 
But it is not working properly. As shown in the image, it only catches the event when the streaming query starts, not when each batch is processed.
 
Screenshot 2024-03-08 113453.png

I have tried writing this class in different ways, but it didn't work. Does anybody know what the problem could be here? Does it need any additional configuration?

 

 

6 REPLIES

MichTalebzadeh
Valued Contributor

Hi,

I have come to the conclusion that this is a bug. In general, there is a bug in obtaining individual values from the dictionary: for example, in the way Spark Streaming populates the processed_rows_per_second key within microbatch_data (microbatch_data = event.progress), or any other key. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints its elements correctly. This is with regard to the onQueryProgress(self, event) method in class MyListener(StreamingQueryListener).

For example, with print(microbatch_data) everything prints as below:

onQueryProgress
microbatch_data received
{
  "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
  "name" : null,
  "timestamp" : "2024-03-10T09:21:27.233Z",
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701,
  "durationMs" : {
    "addBatch" : 37,
    "commitOffsets" : 41,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 5,
    "triggerExecution" : 187,
    "walCommit" : 104
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 20,
    "endOffset" : 21,
    "latestOffset" : 21,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 5.347593582887701
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
    "numOutputRows" : 1
  }
}
However, the observed behaviour is that processed_rows_per_second is either None or not being updated correctly.

The Spark version I used for my test is 3.4.

The sample code uses format="rate" to simulate a streaming source. You can test the code yourself:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.streaming import StreamingQueryListener
import uuid

def process_data(df):
    processed_df = (df.withColumn("key", lit(str(uuid.uuid4())))
                      .withColumn("doubled_value", col("value") * 2)
                      .withColumn("op_type", lit(1))
                      .withColumn("op_time", current_timestamp()))
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data (note: not actually used below)
schema = "key string, timestamp timestamp, value long"
     
# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress   
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
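        # NOTE: StreamingQueryProgress is not a dict, so this .get() call is the
        # culprit here; see the follow-up replies below for the attribute-based fix.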
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
           print("processed_rows_per_second retrieved")
           print(f"Processed rows per second: {processed_rows_per_second}")
        else:
           print("processed_rows_per_second not retrieved!")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")
# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)


# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)          
query = (
    processed_streaming_df
    .select(
        col("key"),
        col("doubled_value"),
        col("op_type"),
        col("op_time"),
    )
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Wait for the streaming query to terminate
query.awaitTermination()

 

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice: "one test result is worth one-thousand expert opinions" (Wernher von Braun).

It helps to get an overview of this capability from the following Databricks blog post:

How to Monitor Streaming Queries in PySpark

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

HTH


MichTalebzadeh
Valued Contributor

Hi,

I have done further investigation on this.

Below I have tried to illustrate the issue through PySpark code:

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        # print("microbatch_data received")  # Check if data is received
        # print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        # processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  # incorrect
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
            print("processedRowsPerSecond retrieved")
            print(f"Processed rows per second is -> {processedRowsPerSecond}")
        else:
            print("processedRowsPerSecond not retrieved!")

The output

onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

So we are dealing with an attribute of the class, NOT a dictionary key.

The line

processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")

fails because it uses the dictionary-style .get() method, while the line

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

accesses the attribute directly and works.
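
If dict-style access is still preferred, one option is to go through the JSON representation that StreamingQueryProgress exposes. A minimal sketch of the same onQueryProgress method (assuming the .json property, which the progress object provides):

    def onQueryProgress(self, event):
        import json
        progress = event.progress
        # Attribute access works directly on the StreamingQueryProgress object:
        rate = progress.processedRowsPerSecond
        # Dict-style .get() access works on the parsed JSON representation:
        progress_dict = json.loads(progress.json)
        rate_from_dict = progress_dict.get("processedRowsPerSecond")
        print(rate, rate_from_dict)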

 


Forgot to mention that, as a workaround for now, you can access the attribute of the microbatch_data object directly, using camelCase for the attribute name:

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

HTH


saicharandeepb
New Contributor II

Hi @MichTalebzadeh,
I'm currently facing the same problem that was mentioned in this thread, and I haven't been able to resolve it yet.

It would be incredibly helpful if you could share any further findings, solutions, or workarounds you might have discovered since then. Your insights could really help me move forward.

WiliamRosa
New Contributor II

Hi everyone,

I recently worked on a similar requirement and would like to share a structured approach to monitoring Structured Streaming when writing to external sinks.

1. Use a Unique Query Name

Always assign a clear and meaningful name to each streaming query with .queryName("<your_query_name>"). This helps you easily identify the stream and its metrics in the Spark UI under the Streaming tab.
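
A minimal sketch of this (df, the query name, and the console sink are just placeholders):

# Hypothetical example: naming the query makes it easy to find in the Spark UI
# and in listener events (event.progress.name).
query = (
    df.writeStream
      .queryName("orders_to_external_sink")  # descriptive, unique name
      .outputMode("append")
      .format("console")                     # swap in your real external sink
      .start()
)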

2. Leverage StreamingQueryListener for Metrics Reporting

Spark provides the StreamingQueryListener interface (available in both Python and Scala since Databricks Runtime 11.3 LTS) that allows you to capture lifecycle events like onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated.

Keep logic in these callbacks lightweight to avoid delaying processing; prefer writing metrics to a lightweight store such as Kafka or Prometheus.
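
A rough sketch of that idea, with a queue and a background thread standing in for your real metrics pipeline (spark is an existing SparkSession; the print is a placeholder for the actual write to Kafka/Prometheus):

import json
import queue
import threading
from pyspark.sql.streaming import StreamingQueryListener

metrics_queue = queue.Queue()

class LightweightListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass
    def onQueryProgress(self, event):
        # Do almost no work on the listener thread: just enqueue the JSON payload.
        metrics_queue.put(event.progress.json)
    def onQueryIdle(self, event):
        pass
    def onQueryTerminated(self, event):
        pass

def forward_metrics():
    # Slow I/O (Kafka producer, Prometheus push, HTTP call, ...) happens here,
    # off the listener thread.
    while True:
        payload = json.loads(metrics_queue.get())
        print(payload["batchId"], payload.get("processedRowsPerSecond"))

threading.Thread(target=forward_metrics, daemon=True).start()
spark.streams.addListener(LightweightListener())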

3. Define and Observe Custom Metrics

Use the Observable API with .observe(...) to define custom metrics directly within your query, such as row counts, error counts, or data quality checks. These get emitted as events and can be picked up by your listener.

In your listener's onQueryProgress, access these via event.progress.observedMetrics and handle alerts, dashboards, or logs as needed.
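
A rough sketch of this pattern, assuming streaming_df is your source DataFrame with a value column (the observation name and metric expressions are made up for illustration):

from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryListener

# Attach named custom metrics to the streaming DataFrame.
observed_df = streaming_df.observe(
    "quality_metrics",                                          # hypothetical name
    F.count(F.lit(1)).alias("row_count"),
    F.sum(F.col("value").isNull().cast("int")).alias("null_values"),
)

class ObservedMetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass
    def onQueryProgress(self, event):
        # Observed metrics arrive keyed by the name passed to .observe().
        row = event.progress.observedMetrics.get("quality_metrics")
        if row is not None:
            print(f"row_count={row['row_count']}, null_values={row['null_values']}")
    def onQueryTerminated(self, event):
        pass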

4. Capture Detailed Source and Sink Metrics

The event.progress object contains rich metrics related to source, state, and sink, such as input/output row counts, processing rates, offsets, backlog (e.g., offsets behind latest), and event time statistics.

This is particularly valuable when writing to external sinks like Kafka: you can monitor how many rows are actually delivered, detect lag, and track throughput.
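
For instance, a sketch of a listener that pulls a few of these numbers out of event.progress (the field names follow the progress output shown earlier in this thread):

from pyspark.sql.streaming import StreamingQueryListener

class SourceSinkMetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass
    def onQueryProgress(self, event):
        p = event.progress
        print(f"batch={p.batchId} inputRows={p.numInputRows} "
              f"rate={p.processedRowsPerSecond} rows/s")
        # Per-source detail: offsets show how far the stream has read.
        for s in p.sources:
            print(f"  source={s.description} startOffset={s.startOffset} "
                  f"endOffset={s.endOffset} latestOffset={s.latestOffset}")
        # Sink detail: how many rows actually reached the external system.
        print(f"  sink={p.sink.description} numOutputRows={p.sink.numOutputRows}")
    def onQueryTerminated(self, event):
        pass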

5. Send Metrics to External Observability Tools

Use your listener to push structured metrics, for example to Prometheus Pushgateway or similar systems. One common pattern is to serialize the progress as JSON and extract key metrics for real-time observability.

This enables dashboarding (e.g., via Grafana) and alerting on key events like increasing output latency or growing backlog.
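
A possible sketch of that pattern, assuming the prometheus_client package is installed and a Pushgateway is reachable at the hypothetical address below (in practice you would likely hand the push off to a background thread, as in the sketch under point 2):

import json
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from pyspark.sql.streaming import StreamingQueryListener

class PrometheusListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass
    def onQueryProgress(self, event):
        # Serialize the progress and extract the key metrics we care about.
        progress = json.loads(event.progress.json)
        registry = CollectorRegistry()
        Gauge("stream_input_rows", "Rows read in this micro-batch",
              registry=registry).set(progress.get("numInputRows", 0))
        Gauge("stream_processed_rows_per_second", "Processing rate",
              registry=registry).set(progress.get("processedRowsPerSecond", 0.0))
        Gauge("stream_num_output_rows", "Rows written to the sink",
              registry=registry).set(progress.get("sink", {}).get("numOutputRows", 0))
        # Hypothetical Pushgateway address; job name taken from the query name.
        push_to_gateway("localhost:9091",
                        job=progress.get("name") or "streaming_query",
                        registry=registry)
    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(PrometheusListener())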

6. Monitor External Sink Health

Ensure your listener also includes logic to check sink connectivity and to monitor whether records are being written successfully.

Track both successes and failures (e.g., network errors, backpressure). Combine these with built-in sink.numOutputRows metrics to get better visibility.

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa