MichTalebzadeh
Valued Contributor

Hi,

I have done further investigation on this.

Below I have tried to illustrate the issue with PySpark code:

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch progress data
        microbatch_data = event.progress
        # print("microbatch_data received")  # Check if data is received
        # print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        # Incorrect: microbatch_data is not a dictionary
        # processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
            print("processedRowsPerSecond retrieved")
            print(f"Processed rows per second is -> {processedRowsPerSecond}")
        else:
            print("processedRowsPerSecond not retrieved!")

The output:

onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

So microbatch_data is a StreamingQueryProgress object, and its metrics are exposed as attributes of that class, NOT as dictionary keys.

The line

processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")

fails because .get() is a dictionary method, and microbatch_data here is a StreamingQueryProgress object rather than a dictionary, whereas the line

processedRowsPerSecond = microbatch_data.processedRowsPerSecond

works because it reads the attribute directly.
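
If the same listener code has to cope with either a dictionary or an object exposing attributes, one possible approach is a small accessor that checks which kind of payload it received. This is only a sketch: get_metric and FakeProgress are hypothetical names of my own, and FakeProgress is merely a stand-in for StreamingQueryProgress so that the snippet runs without a Spark session.

```python
from collections.abc import Mapping


def get_metric(progress, name, default=None):
    """Read a metric from a payload that may be a dict or a plain object."""
    if isinstance(progress, Mapping):
        # Dictionary-style payload (for example, parsed JSON): use .get()
        return progress.get(name, default)
    # Object-style payload: metrics are ordinary attributes
    return getattr(progress, name, default)


class FakeProgress:
    """Hypothetical stand-in for StreamingQueryProgress, for illustration only."""
    processedRowsPerSecond = 2.5
    numInputRows = 10


as_object = get_metric(FakeProgress(), "processedRowsPerSecond")
as_dict = get_metric({"processedRowsPerSecond": 2.5}, "processedRowsPerSecond")
missing = get_metric(FakeProgress(), "noSuchMetric", default=-1.0)
print(as_object, as_dict, missing)
```

Inside onQueryProgress this could be called as get_metric(event.progress, "processedRowsPerSecond"), although with the object type shown in the output above plain attribute access is sufficient.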

 

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but, of course, cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one thousand expert opinions" (Wernher von Braun).