cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

StreamingQueryListener onQueryTerminated in Databricks Job

Starki
New Contributor II

I am defining a StreamingQueryListener that collects metrics on my Spark Structured Streaming tasks and sends them to a Prometheus Pushgateway.

When the job is terminated, I want to use the onQueryTerminated to cleanup the metrics for each job from the Pushgateway so that Prometheus is not continuously pulling stale metrics for stream jobs that are not running. 

This works perfectly when run in a notebook. When I terminate the stream job, `onQueryTerminated` is called which then successfully deletes the metrics from the pushgateway.

However, when the streams are running as part of a Job on a Job compute cluster and the job is cancelled, `onQueryTerminated` does not to have time to complete its execution and results in an error:

 

 

23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy136.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/08/10 14:56:41 INFO DriverCorral$: ReplId-211ee-e6538-f01c6-3 successfully discarded
23/08/10 14:56:41 INFO DatabricksStreamingQueryListener: Query termination received for [id=41f2efdd-1276-4ea5-a254-b4d34e50160e, runId=c6d82b79-52b7-444b-b010-53db54306ed4]
23/08/10 14:56:41 INFO Executor: Executor interrupted and killed task 0.0 in stage 434.0 (TID 1087), reason: Stage cancelled: Job 213 cancelled part of cancelled job group c6d82b79-52b7-444b-b010-53db54306ed4
23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy156.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)

 

 

This StackOverflow post describes a very similar issue where a time.sleep(5) just after StreamingQuery.stop() solved the issue. 

I don't think this work around would be possible to use in a Databricks Job as I'm never calling StreamingQuery.stop()  explicitly in my code but rather cancelling the running job.

My question thus is, how can I ensure that onQueryTerminated has time to complete its execution?

My job is implemented in PySpark using Databricks Runtime 13.2.

3 REPLIES 3

shan_chandra
Honored Contributor III
Honored Contributor III

@Starki  - Per documentation, 

StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., StreamingQuery.stop.

and each of these Python observable APIs work asynchronously. 

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

Starki
New Contributor II

This answer is not useful as it does not answer my question nor provide an explanation for the behavior I'm observing. 

shan_chandra
Honored Contributor III
Honored Contributor III

@Starki  - Please find the below response on why onTerminationEvent is not triggered

1. Cancelling the streaming query did not result in the termination of the stream properly. sleep of 5 seconds will allow termination of the underlying tasks when the stream is abruptly killed or cancelled, which would eventually result in sending the signal to the onTerminationEvent. 

It would be helpful if you add query.awaitAnyTermination() instead of query.stop() along with sleep of 5 seconds when abruptly cancelling. 

Please refer below for additional details. 

Reference: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQ...

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.