Structured Streaming QueryProgressEvent Metrics incorrect
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-09-2024 07:07 AM
Hi All,
I've been working on implementing a custom StreamingQueryListener in pyspark to enable integration with our monitoring solution, I've had quite a lot of success with this on multiple different streaming pipelines, however on the last set I've encountered an issue where some of the default metrics created by Spark on the QueryProgressEvent are not populated, despite me also being able to observe data being written out by the stream.
Here is an example of the metrics that are not being written correctly:
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
However when I check the data actually being written to the delta table, I can see results in it for the time the stream was active, meaning that it is indeed processing rows. I have also logged out the count of rows in the batch to ensure that it is getting data and that is also indeed the case.
Below is the stream that I am monitoring:
gold_customer_event_s = (
silver_customer_events.writeStream.foreachBatch(
lambda batch_df, batch_id: process_customer_event(
spark=spark,
storage_path=data_sources.gold_customerEvent,
recovery_path=data_sources.gold_customerEvent_retries,
source_df=batch_df,
identity_df=static_data,
static_classifications=static_classifications,
)
)
.option(
"checkpointLocation",
checkpoint(data_sources.gold_customerEvent),
).queryName("gold.customer.events")
)
And here is the part of the function process_customer_event that is writing the data (where I log out the count of rows in the batch just before):
(validated_df.write.format("delta").mode("append").option("mergeSchema", "true").save(storage_path))The only difference to the other streams I have tested is this one is using a forEachBatch, but I cannot find any resources that would indicate that would be a problem.
Any ideas at this point would be appreciated!
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-09-2024 09:12 AM - edited 12-09-2024 09:13 AM
Hi @oliverw Thanks for your question!
Could you please confirm wether you can find in the Driver logs wrapping the time range when the "0" metrics are reported, the following log line:
Could not report metrics as number leaves in trigger logical plan did not match that of the execution plan
If the above is your case, you can always disable the optimization via the configs:
spark.conf.set("spark.databricks.optimizer.reuseExchangeAndSubquery", "false")
spark.conf.set("spark.sql.exchange.reuse", "false")
However there may be performance regression if you disable these, therefore the suggestion is to first try in a lower environment if possible, if not in-place, but monitor and then revert it back if required.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-10-2024 05:40 AM
Hi @VZLA, thanks for the speedy reply!
Unfortunately I can't see that in the driver logs at all!
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-10-2024 05:55 AM
Hi @oliverw ,
I believe this will require some logs and information correlation, could you please raise a support ticket for the same? Sharing further details here may expose some sensitive data, hence a ticket would be more appropriate. Looking forward to assisting you further!