<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic DatabricksStreamingQueryListener Stopping the stream in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/databricksstreamingquerylistener-stopping-the-stream/m-p/44395#M27645</link>
    <description>&lt;P&gt;I am running the following Structured Streaming Scala code in a Databricks Runtime 13.3 LTS job:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;		val query = spark.readStream.format("delta")
			.option("ignoreDeletes", "true")
			.option("maxFilesPerTrigger", maxEqlPerBatch)
			.load(tblPath)
			.writeStream
			.queryName(getQueryName)
			.outputMode("append")
			.option("checkpointLocation", runtimePath + "/_checkpoint/stream.json")
			.foreachBatch(process _)
			.start()

		var lastBatchTime = Instant.now
		while (query.isActive) {
			val progress = query.lastProgress
			if (progress != null) {
				if (progress.numInputRows &amp;gt; 0) {
					lastBatchTime = Instant.parse(progress.timestamp)
				} else {
					if (Duration.between(lastBatchTime, Instant.parse(progress.timestamp)).getSeconds &amp;gt; timeoutSeconds) {
						LOGGER.info(s"Stopping Query after inactivity timeout: $timeoutSeconds")
						query.stop
					}
				}
			}
			query.awaitTermination(10000L)
		}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This should keep the streaming job alive for timeoutSeconds after the last non-empty batch is processed, but the job gets canceled around 30 minutes after it starts due to:&lt;BR /&gt;DAGScheduler: Asked to cancel job group&lt;BR /&gt;ScalaDriverWrapper: Stopping streams for commandId pattern&lt;BR /&gt;DatabricksStreamingQueryListener: Stopping the stream&lt;/P&gt;&lt;P&gt;Any help would be appreciated. Thanks.&lt;/P&gt;</description>
    <pubDate>Mon, 11 Sep 2023 15:35:48 GMT</pubDate>
    <dc:creator>DE-cat</dc:creator>
    <dc:date>2023-09-11T15:35:48Z</dc:date>
    <item>
      <title>DatabricksStreamingQueryListener Stopping the stream</title>
      <link>https://community.databricks.com/t5/data-engineering/databricksstreamingquerylistener-stopping-the-stream/m-p/44395#M27645</link>
      <description>&lt;P&gt;I am running the following Structured Streaming Scala code in a Databricks Runtime 13.3 LTS job:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;		val query = spark.readStream.format("delta")
			.option("ignoreDeletes", "true")
			.option("maxFilesPerTrigger", maxEqlPerBatch)
			.load(tblPath)
			.writeStream
			.queryName(getQueryName)
			.outputMode("append")
			.option("checkpointLocation", runtimePath + "/_checkpoint/stream.json")
			.foreachBatch(process _)
			.start()

		var lastBatchTime = Instant.now
		while (query.isActive) {
			val progress = query.lastProgress
			if (progress != null) {
				if (progress.numInputRows &amp;gt; 0) {
					lastBatchTime = Instant.parse(progress.timestamp)
				} else {
					if (Duration.between(lastBatchTime, Instant.parse(progress.timestamp)).getSeconds &amp;gt; timeoutSeconds) {
						LOGGER.info(s"Stopping Query after inactivity timeout: $timeoutSeconds")
						query.stop
					}
				}
			}
			query.awaitTermination(10000L)
		}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This should keep the streaming job alive for timeoutSeconds after the last non-empty batch is processed, but the job gets canceled around 30 minutes after it starts due to:&lt;BR /&gt;DAGScheduler: Asked to cancel job group&lt;BR /&gt;ScalaDriverWrapper: Stopping streams for commandId pattern&lt;BR /&gt;DatabricksStreamingQueryListener: Stopping the stream&lt;/P&gt;&lt;P&gt;Any help would be appreciated. Thanks.&lt;/P&gt;</description>
      <pubDate>Mon, 11 Sep 2023 15:35:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricksstreamingquerylistener-stopping-the-stream/m-p/44395#M27645</guid>
      <dc:creator>DE-cat</dc:creator>
      <dc:date>2023-09-11T15:35:48Z</dc:date>
    </item>
  </channel>
</rss>

