Data Engineering

Structured Streaming QueryProgressEvent Metrics incorrect

oliverw
New Contributor

Hi All,

I've been working on implementing a custom StreamingQueryListener in PySpark to integrate with our monitoring solution. This has worked well across several streaming pipelines, but on the latest set I've hit an issue where some of the default metrics Spark populates on the QueryProgressEvent are reported as zero, even though I can see the stream writing data out.
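For context, here is a simplified sketch of the listener (not my exact code; it assumes Spark 3.4+, where the Python StreamingQueryListener API is available, and uses print as a stand-in for the call into our monitoring solution):

from pyspark.sql.streaming import StreamingQueryListener

class MonitoringListener(StreamingQueryListener):
    """Forwards streaming progress metrics to a monitoring sink."""

    def onQueryStarted(self, event):
        pass  # nothing to forward on start

    def onQueryProgress(self, event):
        progress = event.progress
        metrics = {
            "query": progress.name,
            "batchId": progress.batchId,
            "numInputRows": progress.numInputRows,
            "inputRowsPerSecond": progress.inputRowsPerSecond,
            "processedRowsPerSecond": progress.processedRowsPerSecond,
        }
        print(metrics)  # stand-in: the real listener sends these to our monitoring solution

    def onQueryTerminated(self, event):
        pass  # nothing to forward on termination

# spark is the active SparkSession (already available on Databricks)
spark.streams.addListener(MonitoringListener())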

Here is an example of the metrics that are not being written correctly: 

  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,

However, when I check the data actually written to the Delta table, I can see rows for the time the stream was active, so it is definitely processing data. I have also logged the row count of each batch to confirm that data is arriving, and that is indeed the case.

Below is the stream that I am monitoring:

gold_customer_event_s = (
    silver_customer_events.writeStream
    .foreachBatch(
        lambda batch_df, batch_id: process_customer_event(
            spark=spark,
            storage_path=data_sources.gold_customerEvent,
            recovery_path=data_sources.gold_customerEvent_retries,
            source_df=batch_df,
            identity_df=static_data,
            static_classifications=static_classifications,
        )
    )
    .option("checkpointLocation", checkpoint(data_sources.gold_customerEvent))
    .queryName("gold.customer.events")
)

 

And here is the part of the function process_customer_event that is writing the data (where I log out the count of rows in the batch just before):

(
    validated_df.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(storage_path)
)
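The batch count is logged immediately before this write; simplified, that part looks roughly like the following (the exact logging call differs, this just shows where the count is taken):

row_count = validated_df.count()  # row count of the current batch
print(f"Batch contains {row_count} rows")  # simplified stand-in for the real logging call

The count logged here is non-zero while numInputRows is reported as 0, which is why I think the issue is in the metrics reporting rather than the data flow.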

The only difference from the other streams I have tested is that this one uses foreachBatch, but I cannot find any resources indicating that this should be a problem.

 

Any ideas at this point would be appreciated!

3 REPLIES

VZLA
Databricks Employee

Hi @oliverw, thanks for your question!

Could you please check whether the driver logs covering the time range when the "0" metrics are reported contain the following log line:

 

Could not report metrics as number leaves in trigger logical plan did not match that of the execution plan

 

If that is the case, you can disable the relevant optimizations via the following configs:

spark.conf.set("spark.databricks.optimizer.reuseExchangeAndSubquery", "false")
spark.conf.set("spark.sql.exchange.reuse", "false")

However, disabling these may cause a performance regression, so the suggestion is to try this in a lower environment first if possible. If you have to change it in place, monitor the impact and revert if required.

oliverw
New Contributor

Hi @VZLA, thanks for the speedy reply!

Unfortunately I can't see that in the driver logs at all!

VZLA
Databricks Employee

Hi @oliverw ,

I believe this will require correlating logs and other information, so could you please raise a support ticket for this? Sharing further details here may expose sensitive data, so a ticket would be more appropriate. Looking forward to assisting you further!
