08-15-2023 04:40 AM - edited 08-15-2023 04:42 AM
I am defining a StreamingQueryListener that collects metrics on my Spark Structured Streaming tasks and sends them to a Prometheus Pushgateway.
When the job is terminated, I want to use the onQueryTerminated to cleanup the metrics for each job from the Pushgateway so that Prometheus is not continuously pulling stale metrics for stream jobs that are not running.
This works perfectly when run in a notebook. When I terminate the stream job, `onQueryTerminated` is called which then successfully deletes the metrics from the pushgateway.
However, when the streams are running as part of a Job on a Job compute cluster and the job is cancelled, `onQueryTerminated` does not to have time to complete its execution and results in an error:
23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy136.onQueryTerminated(Unknown Source)
at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/08/10 14:56:41 INFO DriverCorral$: ReplId-211ee-e6538-f01c6-3 successfully discarded
23/08/10 14:56:41 INFO DatabricksStreamingQueryListener: Query termination received for [id=41f2efdd-1276-4ea5-a254-b4d34e50160e, runId=c6d82b79-52b7-444b-b010-53db54306ed4]
23/08/10 14:56:41 INFO Executor: Executor interrupted and killed task 0.0 in stage 434.0 (TID 1087), reason: Stage cancelled: Job 213 cancelled part of cancelled job group c6d82b79-52b7-444b-b010-53db54306ed4
23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy156.onQueryTerminated(Unknown Source)
at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
This StackOverflow post describes a very similar issue where a time.sleep(5) just after StreamingQuery.stop() solved the issue.
I don't think this work around would be possible to use in a Databricks Job as I'm never calling StreamingQuery.stop() explicitly in my code but rather cancelling the running job.
My question thus is, how can I ensure that onQueryTerminated has time to complete its execution?
My job is implemented in PySpark using Databricks Runtime 13.2.
08-17-2023 10:50 AM
@Starki - Per documentation,
StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., StreamingQuery.stop.
and each of these Python observable APIs work asynchronously.
https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
08-17-2023 11:20 PM
This answer is not useful as it does not answer my question nor provide an explanation for the behavior I'm observing.
08-18-2023 10:22 AM
@Starki - Please find the below response on why onTerminationEvent is not triggered
1. Cancelling the streaming query did not result in the termination of the stream properly. sleep of 5 seconds will allow termination of the underlying tasks when the stream is abruptly killed or cancelled, which would eventually result in sending the signal to the onTerminationEvent.
It would be helpful if you add query.awaitAnyTermination() instead of query.stop() along with sleep of 5 seconds when abruptly cancelling.
Please refer below for additional details.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group