Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

StreamingQueryListener onQueryTerminated in Databricks Job

Starki
New Contributor III

I am defining a StreamingQueryListener that collects metrics on my Spark Structured Streaming tasks and sends them to a Prometheus Pushgateway.
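
For context, a listener like the one described might look roughly like the minimal sketch below. It is an illustration under assumptions, not the poster's code: the class name `PushgatewayListener`, the metric names, the `query_id` grouping label and the Pushgateway URL are all hypothetical. It writes the Prometheus text format with `urllib` rather than using the `prometheus_client` package, so it needs nothing beyond PySpark (and falls back to a stand-in base class when PySpark is absent).

```python
import urllib.request

try:
    from pyspark.sql.streaming import StreamingQueryListener
except ImportError:  # stand-in so the sketch can be exercised without PySpark
    class StreamingQueryListener:
        pass


def http_send(method, url, body=b"", timeout=2.0):
    """Send one request to the Pushgateway and return the HTTP status code."""
    req = urllib.request.Request(url, data=body, method=method)
    req.add_header("Content-Type", "text/plain; version=0.0.4")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


class PushgatewayListener(StreamingQueryListener):
    """Pushes a few progress metrics per micro-batch (hypothetical metric names)."""

    def __init__(self, gateway_url, job_name, send=http_send):
        super().__init__()
        self.gateway_url = gateway_url.rstrip("/")
        self.job_name = job_name
        self.send = send  # injectable so the listener can be tested without a gateway

    def group_url(self, query_id):
        # Pushgateway grouping key: /metrics/job/<job>/<label_name>/<label_value>
        return f"{self.gateway_url}/metrics/job/{self.job_name}/query_id/{query_id}"

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        p = event.progress
        body = (
            f"streaming_batch_id {p.batchId}\n"
            f"streaming_input_rows {p.numInputRows}\n"
            f"streaming_processed_rows_per_second {p.processedRowsPerSecond}\n"
        ).encode()
        # PUT replaces every metric in this query's group with the ones in the body
        self.send("PUT", self.group_url(p.id), body)

    def onQueryTerminated(self, event):
        pass  # cleanup on termination is what the question is about
```

It would be registered on the driver with `spark.streams.addListener(PushgatewayListener("http://pushgateway:9091", "my_job"))`.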

When the job is terminated, I want to use `onQueryTerminated` to clean up each job's metrics from the Pushgateway, so that Prometheus does not keep scraping stale metrics for streaming jobs that are no longer running.
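
A sketch of the cleanup half, assuming the same hypothetical grouping key (`job/<job>/query_id/<query id>`). The Pushgateway removes a whole group in response to an HTTP `DELETE` on the group's URL. The short timeout and the broad `except OSError` are deliberate choices in this sketch, so that a slow or unreachable gateway cannot stall the callback or raise out of it; `PushgatewayCleanupListener` and `http_delete` are illustrative names.

```python
import urllib.request

try:
    from pyspark.sql.streaming import StreamingQueryListener
except ImportError:  # stand-in so the sketch can be exercised without PySpark
    class StreamingQueryListener:
        pass


def http_delete(url, timeout=2.0):
    """Delete one Pushgateway group and return the HTTP status code."""
    req = urllib.request.Request(url, method="DELETE")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


class PushgatewayCleanupListener(StreamingQueryListener):
    """Removes a query's metric group from the Pushgateway when the query terminates."""

    def __init__(self, gateway_url, job_name, delete=http_delete):
        super().__init__()
        self.gateway_url = gateway_url.rstrip("/")
        self.job_name = job_name
        self.delete = delete  # injectable for testing
        self.cleaned_up = []  # query ids whose group was deleted

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        pass  # metric pushing omitted from this sketch

    def onQueryTerminated(self, event):
        query_id = str(event.id)
        url = f"{self.gateway_url}/metrics/job/{self.job_name}/query_id/{query_id}"
        try:
            self.delete(url)
            self.cleaned_up.append(query_id)
        except OSError as exc:  # URLError, HTTPError and socket timeouts all derive from OSError
            print(f"Pushgateway cleanup failed for query {query_id}: {exc}")
```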

This works perfectly when run in a notebook. When I terminate the stream job, `onQueryTerminated` is called which then successfully deletes the metrics from the pushgateway.

However, when the streams run as part of a Job on a job compute cluster and the job is cancelled, `onQueryTerminated` does not have time to complete, and the following error is logged:

23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy136.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/08/10 14:56:41 INFO DriverCorral$: ReplId-211ee-e6538-f01c6-3 successfully discarded
23/08/10 14:56:41 INFO DatabricksStreamingQueryListener: Query termination received for [id=41f2efdd-1276-4ea5-a254-b4d34e50160e, runId=c6d82b79-52b7-444b-b010-53db54306ed4]
23/08/10 14:56:41 INFO Executor: Executor interrupted and killed task 0.0 in stage 434.0 (TID 1087), reason: Stage cancelled: Job 213 cancelled part of cancelled job group c6d82b79-52b7-444b-b010-53db54306ed4
23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy156.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)


This Stack Overflow post describes a very similar issue, where adding `time.sleep(5)` right after `StreamingQuery.stop()` solved it.
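
Presumably the workaround helps because it keeps the driver's Python process alive long enough for the asynchronous `onQueryTerminated` callback to finish. A sketch of the same idea that waits on a `threading.Event` (set by the listener as the last step of `onQueryTerminated`) instead of a fixed `time.sleep(5)`: it returns as soon as cleanup is done but never waits longer than the timeout. `stop_and_wait_for_cleanup` and `cleanup_done` are hypothetical names, and, like the original workaround, this only applies when the code itself calls `StreamingQuery.stop()`.

```python
import threading

# Hypothetical shared flag: the listener calls cleanup_done.set() as the last
# step of onQueryTerminated, after the Pushgateway group has been deleted.
cleanup_done = threading.Event()


def stop_and_wait_for_cleanup(query, done=cleanup_done, timeout=5.0):
    """Stop the query, then block until cleanup is signalled or `timeout` seconds pass.

    Returns True if the listener signalled completion in time, False otherwise.
    """
    query.stop()
    return done.wait(timeout)
```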

I don't think this workaround can be used in a Databricks Job, because I never call `StreamingQuery.stop()` explicitly in my code; instead, the running job is cancelled.

My question thus is, how can I ensure that onQueryTerminated has time to complete its execution?

My job is implemented in PySpark using Databricks Runtime 13.2.

3 replies

shan_chandra
Databricks Employee

@Starki - Per the documentation:

StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., StreamingQuery.stop.

and each of these Python observable APIs works asynchronously.

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

Starki
New Contributor III

This answer is not useful: it neither answers my question nor explains the behavior I'm observing.

shan_chandra
Databricks Employee

@Starki - Please find below an explanation of why `onQueryTerminated` is not triggered:

1. Cancelling the streaming query did not terminate the stream properly. A 5-second sleep gives the underlying tasks time to terminate when the stream is abruptly killed or cancelled, which eventually results in the termination event being delivered to `onQueryTerminated`.

It would help to call `spark.streams.awaitAnyTermination()` (or `query.awaitTermination()`) instead of `query.stop()`, together with a 5-second sleep, when the stream is cancelled abruptly.
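
A minimal sketch of what this suggestion might look like, assuming `spark` is the active `SparkSession`: block on `spark.streams.awaitAnyTermination()` and, in a `finally` block, sleep for a grace period so asynchronous listener callbacks such as `onQueryTerminated` have a chance to run. The function name and the 5-second default are illustrative, and nothing in this thread confirms that the `finally` block actually runs when a Databricks job run is cancelled.

```python
import time


def await_streams_with_grace(spark, grace_seconds=5.0):
    """Block until any streaming query terminates, then pause before returning.

    The pause happens in `finally`, so it also applies when awaitAnyTermination()
    raises (for example a StreamingQueryException from a failed query).
    """
    try:
        spark.streams.awaitAnyTermination()
    finally:
        # Give asynchronous StreamingQueryListener callbacks time to finish.
        time.sleep(grace_seconds)
```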

See the reference below for additional details.

Reference: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQ...