DatabricksStreamingQueryListener Stopping the stream

DE-cat
New Contributor III

I am running the following Structured Streaming Scala code in a Databricks Runtime 13.3 LTS job:

 

		import java.time.{Duration, Instant}

		// Read the Delta table as a stream and hand each micro-batch to process
		val query = spark.readStream.format("delta")
			.option("ignoreDeletes", "true")
			.option("maxFilesPerTrigger", maxEqlPerBatch)
			.load(tblPath)
			.writeStream
			.queryName(getQueryName)
			.outputMode("append")
			.option("checkpointLocation", runtimePath + "/_checkpoint/stream.json")
			.foreachBatch(process _)
			.start()

		// Time of the last progress report that contained input rows
		var lastBatchTime = Instant.now
		while (query.isActive) {
			val progress = query.lastProgress
			if (progress != null) {
				if (progress.numInputRows > 0) {
					lastBatchTime = Instant.parse(progress.timestamp)
				} else {
					// No new rows: stop once the idle period exceeds timeoutSeconds
					if (Duration.between(lastBatchTime, Instant.parse(progress.timestamp)).getSeconds > timeoutSeconds) {
						LOGGER.info(s"Stopping Query after inactivity timeout: $timeoutSeconds")
						query.stop()
					}
				}
			}
			// Wait up to 10 seconds for termination before checking again
			query.awaitTermination(10000L)
		}

 

which should keep the streaming job alive for timeoutSeconds after the last batch that contains data is processed, but the job is canceled about 30 minutes after it starts, with these log messages:
DAGScheduler: Asked to cancel job group
ScalaDriverWrapper: Stopping streams for commandId pattern
DatabricksStreamingQueryListener: Stopping the stream

Any help would be appreciated. Thx
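
To check whether the loop itself is what stops the query, the inactivity rule from the code above can be pulled out into a small unit that is testable without a running stream. This is a minimal sketch, assuming a hypothetical IdleTracker case class (the name and its methods are illustrative and not part of Spark or Databricks):

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not a Spark or Databricks API): tracks when data last arrived.
final case class IdleTracker(lastDataTime: Instant, timeoutSeconds: Long) {

  // Advance the reference time only for batches that contained rows.
  def update(batchTime: Instant, numInputRows: Long): IdleTracker =
    if (numInputRows > 0) copy(lastDataTime = batchTime) else this

  // True once more than timeoutSeconds have elapsed since the last non-empty batch.
  def timedOut(now: Instant): Boolean =
    Duration.between(lastDataTime, now).getSeconds > timeoutSeconds
}
```

In the loop, the tracker would be updated from progress.timestamp and progress.numInputRows, and query.stop() called only when timedOut returns true. If the "Stopping Query after inactivity timeout" line never appears in the logs, the stop was probably not issued by this loop.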

1 ACCEPTED SOLUTION

Kaniz
Community Manager

Hi @DE-cat , 

• The code is a Structured Streaming Scala job that reads from a Delta table and processes each micro-batch in foreachBatch.
• The job is canceled about 30 minutes after it starts, with log messages such as DAGScheduler: Asked to cancel job group, ScalaDriverWrapper: Stopping streams for commandId pattern, and DatabricksStreamingQueryListener: Stopping the stream.

• To troubleshoot the issue, follow these steps:

 - Check the logs and error messages for the specific reason given for the job cancellation.
 - Verify whether any resource limits or constraints are causing the cancellation.
 - Review the code for logical errors, especially in the process function called in foreachBatch.
 - Check for external factors such as network connectivity or infrastructure issues.
 - Try a higher timeoutSeconds value to check whether the inactivity timeout in the loop is what stops the query.
• Without more specific information or error messages, it is difficult to give a definitive answer.
• Review the logs and error messages in more detail and work through the troubleshooting steps above.
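
For the first troubleshooting step, one option is to register a StreamingQueryListener so that the driver log records whether the query ended with an exception or was stopped cleanly. The following is a rough sketch, not a diagnosis of this particular job. The names TerminationLogging, describeTermination, and TerminationLogger are hypothetical; only StreamingQueryListener, its event classes, and spark.streams.addListener come from Spark.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

object TerminationLogging {

  // Hypothetical helper: one log line describing how a query ended.
  def describeTermination(id: String, exception: Option[String]): String =
    exception match {
      case Some(err) => s"Query $id terminated with exception: $err"
      case None      => s"Query $id terminated without an exception (stopped by stop() or externally)"
    }

  // Hypothetical listener: prints start and termination events to the driver's stdout.
  class TerminationLogger extends StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit =
      println(s"Query started: name=${event.name}, id=${event.id}")

    // Progress events are ignored here; the polling loop already reads lastProgress.
    override def onQueryProgress(event: QueryProgressEvent): Unit = ()

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
      println(describeTermination(event.id.toString, event.exception))
  }
}
```

Register it before starting the stream with spark.streams.addListener(new TerminationLogging.TerminationLogger). A termination without an exception and without the "Stopping Query after inactivity timeout" message would suggest that the stop was requested from outside the polling loop.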
