03-08-2024 06:39 AM - edited 03-08-2024 06:40 AM
Hi!
Today I am working on collecting some metrics to create a plot for my Spark Structured Streaming job.
It is configured with a trigger(processingTime="30 seconds") and I am trying to collect data with the following listener class (just an example).
I have tried to write this class in different ways, but it did not work well. Does anybody know what the problem could be here? Does it need additional configuration?
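Something along these lines (a minimal sketch; the print statements are just for illustration):

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        # event.progress carries the metrics for the micro-batch that just finished
        print(event.progress)

    def onQueryTerminated(self, event):
        print(f"Query [{event.id}] terminated")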
03-10-2024 01:59 AM
Hi,
I have come to the conclusion that this is a bug. In general, there is a bug in obtaining individual values from the progress data. For example, there is a bug in the way Spark Streaming populates the processed_rows_per_second key within microbatch_data (microbatch_data = event.progress), or any other key. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints its elements correctly. This is with regard to the method onQueryProgress(self, event) in class MyListener(StreamingQueryListener).
For example, with print(microbatch_data), everything is printed correctly as below:
onQueryProgress
microbatch_data received
{
  "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
  "name" : null,
  "timestamp" : "2024-03-10T09:21:27.233Z",
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701,
  "durationMs" : {
    "addBatch" : 37,
    "commitOffsets" : 41,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 5,
    "triggerExecution" : 187,
    "walCommit" : 104
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default]",
    "startOffset" : 20,
    "endOffset" : 21,
    "latestOffset" : 21,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 5.347593582887701
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
    "numOutputRows" : 1
  }
}
However, the observed behaviour is that processed_rows_per_second is either None or not being updated correctly.
The Spark version I used for my test is 3.4.
The sample code uses format="rate" to simulate a streaming source. You can test the code yourself:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.streaming import StreamingQueryListener
import uuid

def process_data(df):
    processed_df = (
        df.withColumn("key", lit(str(uuid.uuid4())))
        .withColumn("doubled_value", col("value") * 2)
        .withColumn("op_type", lit(1))
        .withColumn("op_time", current_timestamp())
    )
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data (note: not used by the rate source below)
schema = "key string, timestamp timestamp, value long"

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        # This is the problematic lookup discussed in this thread
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
            print("processed_rows_per_second retrieved")
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processed_rows_per_second not retrieved!")

    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")

# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)

# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)
query = (
    processed_streaming_df.select(
        col("key").alias("key"),
        col("doubled_value").alias("doubled_value"),
        col("op_type").alias("op_type"),
        col("op_time").alias("op_time"),
    )
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Wait for the streaming query to terminate
query.awaitTermination()
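Side note: the original post uses trigger(processingTime="30 seconds"); to match that here, add .trigger(processingTime="30 seconds") to the writeStream chain before .start().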
03-10-2024 03:15 AM
It helps to get an overview of this capability from the following Databricks blog post:
https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
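Note also that the query handle itself exposes the same metrics. In Spark 3.4, query.lastProgress returns a plain Python dict (with camelCase keys), so dictionary-style .get() does work there. A minimal sketch under that assumption, reusing the query handle from the sample code above:

import time

# Poll the running query for its most recent progress report.
# In Spark 3.4, lastProgress is a plain dict parsed from JSON,
# unlike the StreamingQueryProgress object handed to the listener.
while query.isActive:
    progress = query.lastProgress  # None until the first micro-batch completes
    if progress is not None:
        print(f"Batch {progress.get('batchId')}: "
              f"{progress.get('processedRowsPerSecond')} rows/s processed")
    time.sleep(30)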
HTH
03-12-2024 06:03 AM
Hi,
I have done further investigation on this.
Below I have tried to illustrate the issue through PySpark code:
def onQueryProgress(self, event):
    print("onQueryProgress")
    # Access micro-batch data
    microbatch_data = event.progress
    # print("microbatch_data received")  # Check if data is received
    # print(microbatch_data)
    print(f"Type of microbatch_data is {type(microbatch_data)}")
    # processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  # incorrect
    processedRowsPerSecond = microbatch_data.processedRowsPerSecond
    if processedRowsPerSecond is not None:  # Check if value exists
        print("processedRowsPerSecond retrieved")
        print(f"Processed rows per second is -> {processedRowsPerSecond}")
    else:
        print("processedRowsPerSecond not retrieved!")
The output:
onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599
So we are dealing with an attribute of a class and NOT a dictionary. The line

processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")

fails because it calls a dictionary-style .get() method that StreamingQueryProgress does not provide, while the line

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

accesses the attribute directly.
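Since the original goal was to collect metrics for a plot, here is a minimal sketch of an onQueryProgress that reads the documented attributes directly and appends them to a list (the metrics list and the choice of fields are my own illustration, not part of the API):

# Collected across micro-batches; can be turned into a DataFrame for plotting later.
metrics = []

def onQueryProgress(self, event):
    p = event.progress  # a StreamingQueryProgress object, not a dict
    metrics.append({
        "batchId": p.batchId,
        "timestamp": p.timestamp,
        "numInputRows": p.numInputRows,
        "processedRowsPerSecond": p.processedRowsPerSecond,
    })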
03-12-2024 06:12 AM
Forgot to mention that, for now, as a workaround you can access the attribute of the microbatch_data class directly, using camelCase for the attribute name, as below:
processedRowsPerSecond = microbatch_data.processedRowsPerSecond
HTH