cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Monitoring structure streaming in externar sink

Maxi1693
New Contributor II

Hi! 

Today working trying to collect some metrics to create a splot in my spark structure streaming. 

It is configured with a trigger(processingTime="30 seconds") and I am trying to collect data with the following Listener Class (just an example).

 

 
# Define my listener.
class MyListener(StreamingQueryListener): 
   def onQueryStarted(self, event):
      print("onQueryStarted")
   def onQueryProgress(self, event):
      print("onQueryProgress")
   def onQueryIdle(self, event):
      print("onQueryIdle")
   def onQueryTerminated(self, event):
       print("onQueryTerminated")
# Add my listener.
spark.streams.addListener(MyListener())
 
But it is not working properly, as I show you in the image, it is only catching the event when the streaming starts, but not when each batch is processed.
 
Screenshot 2024-03-08 113453.png

I have tried to write this class in different ways but it didn't work well. Does anybody know what could be the problem here? Does it need an additional configuration? 

 

 

4 REPLIES 4

MichTalebzadeh
Contributor

Hi,

I have come to a conclusion that this is a bug. In general there is a bug in obtaining individual values from the dictionary. For example, a bug in the way Spark Streaming is populating the processed_rows_per_second key within the microbatch_data -> microbatch_data = event.progres dictionary or any other key. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints the elements correctly. This is with regard to method onQueryProgress(self, event) in class MyListener(StreamingQueryListener):

For example with print(microbatch_data), you get all printed as below

onQueryProgress
microbatch_data received
{
"id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
"runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
"name" : null,
"timestamp" : "2024-03-10T09:21:27.233Z",
"batchId" : 21,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 5.347593582887701,
"durationMs" : {
"addBatch" : 37,
"commitOffsets" : 41,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 5,
"triggerExecution" : 187,
"walCommit" : 104
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 20,
"endOffset" : 21,
"latestOffset" : 21,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 5.347593582887701
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
"numOutputRows" : 1
}
}
However, t
he observed behaviour (i.e. processed_rows_per_second is either None or not being updated correctly).

The spark version I used for my test is 3.4 

Sample code uses format=rate for simulating a streaming process. You can test the code yourself

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
from pyspark.sql.functions import col, round, current_timestamp, lit
import uuid

def process_data(df):
  
    processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
                      withColumn("doubled_value", col("value") * 2). \
                      withColumn("op_type", lit(1)). \
                      withColumn("op_time", current_timestamp())
      
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data
schema = "key string timestamp timestamp, value long"
     
# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress   
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
           print("processed_rows_per_second retrieved")
           print(f"Processed rows per second: {processed_rows_per_second}")
        else:
           print("processed_rows_per_second not retrieved!")
    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")
    # Add my listener.
   
listener_instance = MyListener() 
spark.streams.addListener(listener_instance)


# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)          
query = (
    processed_streaming_df.select( \
                          col("key").alias("key") \
                        , col("doubled_value").alias("doubled_value") \
                        , col("op_type").alias("op_type") \
                        , col("op_time").alias("op_time")). \
                        writeStream.\
                        outputMode("append").\
                        format("console"). \
                        start()
)

# Wait for the streaming query to terminate
query.awaitTermination()

 

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".

It helps to get some overview of this capability from the following Databricks link

How to Monitor Streaming Queries in PySpark

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

HTH

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".

MichTalebzadeh
Contributor

Hi,

I have done further investigation on this.

Below I have tried to illustrate the issue through PySpark code

   def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress   
        #print("microbatch_data received")  # Check if data is received
        #print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        #processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  incorrect
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
           print("processedRowsPerSecond retrieved")
           print(f"Processed rows per second is -> {processedRowsPerSecond}")
        else:
           print("processedRowsPerSecond not retrieved!")

The output

onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

 So we are dealing with the attribute of the class and NOT the dictionary.

The line
(processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond"))

fails because it uses the .get() method,
while the second line

(processedRowsPerSecond = microbatch_data.processedRowsPerSecond)

accesses the attribute directly.

 

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".

Forgot to mention that

 For now as a work-around you can access the attribute of the class microbatch_data directly like  below with camelCase for the attribute.

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

HTH

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.