Hi All,
I've been working on implementing a custom StreamingQueryListener in PySpark to integrate our streaming pipelines with our monitoring solution. This has worked well across several different pipelines, but on the latest set I've hit an issue where some of the default metrics Spark populates on the QueryProgressEvent come through as zero, even though I can see data being written out by the stream.
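For context, the listener follows the standard PySpark pattern; a stripped-down sketch is below (the real implementation forwards the metrics to our monitoring backend, and the class name here is just a simplified placeholder):

from pyspark.sql.streaming import StreamingQueryListener

class MonitoringListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        progress = event.progress
        # These are the fields that come back as zero for the affected stream.
        print(progress.name, progress.numInputRows,
              progress.inputRowsPerSecond, progress.processedRowsPerSecond)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(MonitoringListener())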
Here is an example of the metrics that are not being populated correctly:
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
However, when I check the data actually landing in the Delta table, I can see rows for the period the stream was active, so it is indeed processing data. I have also logged the row count of each batch to confirm it is receiving data, and that is indeed the case.
Below is the stream that I am monitoring:
gold_customer_event_s = (
    silver_customer_events.writeStream.foreachBatch(
        lambda batch_df, batch_id: process_customer_event(
            spark=spark,
            storage_path=data_sources.gold_customerEvent,
            recovery_path=data_sources.gold_customerEvent_retries,
            source_df=batch_df,
            identity_df=static_data,
            static_classifications=static_classifications,
        )
    )
    .option(
        "checkpointLocation",
        checkpoint(data_sources.gold_customerEvent),
    )
    .queryName("gold.customer.events")
)
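(The writer is started separately; I've omitted the .start() and trigger configuration here.) For reference, this is roughly how I cross-check the same metrics outside the listener, assuming the query handle comes from calling .start() on the writer above:

query = gold_customer_event_s.start()
# ... after a few batches have run ...
for progress in query.recentProgress:
    # recentProgress exposes the same progress payloads the listener receives
    print(progress["batchId"], progress["numInputRows"],
          progress["inputRowsPerSecond"], progress["processedRowsPerSecond"])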
And here is the part of process_customer_event that writes the data (I log the batch row count just before this):
(
    validated_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(storage_path)
)
The only difference from the other streams I have tested is that this one uses foreachBatch, but I cannot find any resources indicating that would be a problem.
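For comparison, the streams where the metrics do populate write straight to the Delta sink, along these lines (simplified, with placeholder source/path/query names):

(
    some_silver_source.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint(data_sources.some_other_table))  # placeholder
    .queryName("gold.some.other.stream")  # placeholder
    .start(some_other_storage_path)  # placeholder
)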
Any ideas at this point would be appreciated!