<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: StreamingQueryListener onQueryTerminated in Databricks Job in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40484#M27211</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/48859"&gt;@Starki&lt;/a&gt;&amp;nbsp;- Please find below the reason why onQueryTerminated is not triggered:&lt;/P&gt;&lt;P&gt;1. Cancelling the job did not terminate the streaming query cleanly. A sleep of 5 seconds allows the underlying tasks to terminate when the stream is abruptly killed or cancelled, which eventually results in the termination signal being delivered to onQueryTerminated.&lt;/P&gt;&lt;P&gt;It would help to call &lt;STRONG&gt;query.awaitAnyTermination()&lt;/STRONG&gt; instead of &lt;STRONG&gt;query.stop()&lt;/STRONG&gt;, along with a 5-second sleep, when the job is abruptly cancelled.&lt;/P&gt;&lt;P&gt;Please refer below for additional details.&lt;/P&gt;&lt;P&gt;Reference:&amp;nbsp;&lt;A href="https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQueryManager.html" target="_blank"&gt;https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQueryManager.html&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Fri, 18 Aug 2023 17:22:35 GMT</pubDate>
    <dc:creator>shan_chandra</dc:creator>
    <dc:date>2023-08-18T17:22:35Z</dc:date>
    <item>
      <title>StreamingQueryListener onQueryTerminated in Databricks Job</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/39919#M27094</link>
      <description>&lt;P&gt;&lt;SPAN&gt;I am defining a &lt;/SPAN&gt;StreamingQueryListener&lt;SPAN&gt;&amp;nbsp;that collects metrics on my Spark Structured Streaming tasks and sends them to a Prometheus Pushgateway.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;When the job is terminated, I want to use &lt;/SPAN&gt;onQueryTerminated&lt;SPAN&gt;&amp;nbsp;to clean up the metrics for each job from the Pushgateway so that Prometheus is not continuously pulling stale metrics for stream jobs that are not running.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;This works perfectly when run in a notebook. When I terminate the stream job, `onQueryTerminated` is called, which then successfully deletes the metrics from the Pushgateway.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;However, when the streams are running as part of a Job on a Job compute cluster and the job is cancelled, `onQueryTerminated` does not have time to complete its execution and results in an error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy136.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)
23/08/10 14:56:41 INFO DriverCorral$: ReplId-211ee-e6538-f01c6-3 successfully discarded
23/08/10 14:56:41 INFO DatabricksStreamingQueryListener: Query termination received for [id=41f2efdd-1276-4ea5-a254-b4d34e50160e, runId=c6d82b79-52b7-444b-b010-53db54306ed4]
23/08/10 14:56:41 INFO Executor: Executor interrupted and killed task 0.0 in stage 434.0 (TID 1087), reason: Stage cancelled: Job 213 cancelled part of cancelled job group c6d82b79-52b7-444b-b010-53db54306ed4
23/08/10 14:56:41 ERROR StreamingQueryListenerBus: Listener PythonStreamingQueryListenerWrapper threw an exception
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy156.onQueryTerminated(Unknown Source)
	at org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper.onQueryTerminated(StreamingQueryListener.scala:114)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:150)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.doPostEvent(StreamingQueryListenerBus.scala:43)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.postToAll(StreamingQueryListenerBus.scala:91)
	at org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus.onOtherEvent(StreamingQueryListenerBus.scala:111)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:42)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:114)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:114)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:109)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:105)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1625)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:105)&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;&lt;A href="https://stackoverflow.com/questions/76439146/pyspark-streamingquerylistener-queryterminatedevent-not-fired-when-using-delta-t" target="_blank" rel="noopener"&gt;This StackOverflow post&lt;/A&gt; describes a very similar issue, where a time.sleep(5) just after StreamingQuery.stop() solved the problem.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I don't think this workaround is usable in a Databricks Job, as I never call StreamingQuery.stop() explicitly in my code but rather cancel the running job.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;My question, then, is: how can I ensure that onQueryTerminated has time to complete its execution?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;My job is implemented in PySpark on Databricks Runtime 13.2.&lt;/P&gt;</description>
      <pubDate>Tue, 15 Aug 2023 11:42:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/39919#M27094</guid>
      <dc:creator>Starki</dc:creator>
      <dc:date>2023-08-15T11:42:08Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener onQueryTerminated in Databricks Job</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40273#M27175</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/48859"&gt;@Starki&lt;/a&gt;&amp;nbsp;- Per the documentation, StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., by StreamingQuery.stop(), and each of these Python observable APIs works asynchronously.&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html" target="_blank" rel="noopener"&gt;https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 17 Aug 2023 17:50:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40273#M27175</guid>
      <dc:creator>shan_chandra</dc:creator>
      <dc:date>2023-08-17T17:50:04Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener onQueryTerminated in Databricks Job</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40393#M27192</link>
      <description>&lt;P&gt;This answer is not useful, as it neither answers my question nor explains the behavior I'm observing.&lt;/P&gt;</description>
      <pubDate>Fri, 18 Aug 2023 06:20:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40393#M27192</guid>
      <dc:creator>Starki</dc:creator>
      <dc:date>2023-08-18T06:20:39Z</dc:date>
    </item>
    <item>
      <title>Re: StreamingQueryListener onQueryTerminated in Databricks Job</title>
      <link>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40484#M27211</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/48859"&gt;@Starki&lt;/a&gt;&amp;nbsp;- Please find below the reason why onQueryTerminated is not triggered:&lt;/P&gt;&lt;P&gt;1. Cancelling the job did not terminate the streaming query cleanly. A sleep of 5 seconds allows the underlying tasks to terminate when the stream is abruptly killed or cancelled, which eventually results in the termination signal being delivered to onQueryTerminated.&lt;/P&gt;&lt;P&gt;It would help to call &lt;STRONG&gt;query.awaitAnyTermination()&lt;/STRONG&gt; instead of &lt;STRONG&gt;query.stop()&lt;/STRONG&gt;, along with a 5-second sleep, when the job is abruptly cancelled.&lt;/P&gt;&lt;P&gt;Please refer below for additional details.&lt;/P&gt;&lt;P&gt;Reference:&amp;nbsp;&lt;A href="https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQueryManager.html" target="_blank"&gt;https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-StreamingQueryManager.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 18 Aug 2023 17:22:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streamingquerylistener-onqueryterminated-in-databricks-job/m-p/40484#M27211</guid>
      <dc:creator>shan_chandra</dc:creator>
      <dc:date>2023-08-18T17:22:35Z</dc:date>
    </item>
  </channel>
</rss>

