I am running the following Structured Streaming Scala code in a Databricks 13.3 LTS job:
import java.time.{Duration, Instant}

val query = spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .option("maxFilesPerTrigger", maxEqlPerBatch)
  .load(tblPath)
  .writeStream
  .queryName(getQueryName)
  .outputMode("append")
  .option("checkpointLocation", runtimePath + "/_checkpoint/stream.json")
  .foreachBatch(process _)
  .start()

// Monitor the query: remember when the last non-empty batch arrived and stop
// the stream once the inactivity timeout is exceeded.
var lastBatchTime = Instant.now
while (query.isActive) {
  val progress = query.lastProgress
  if (progress != null) {
    if (progress.numInputRows > 0) {
      lastBatchTime = Instant.parse(progress.timestamp)
    } else if (Duration.between(lastBatchTime, Instant.parse(progress.timestamp)).getSeconds > timeoutSeconds) {
      LOGGER.info(s"Stopping Query after inactivity timeout: $timeoutSeconds")
      query.stop()
    }
  }
  // Wait up to 10 s for the query to terminate before polling progress again.
  query.awaitTermination(10000L)
}
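For context, the identifiers the snippet references are defined elsewhere in the job; they look roughly like this (illustrative placeholders only, not the real values or implementations):

// Illustrative placeholders -- the actual job defines these with different values.
val tblPath = "/mnt/source/delta_table"            // Delta source path (placeholder)
val runtimePath = "/mnt/jobs/my_stream/runtime"    // per-run working directory (placeholder)
val maxEqlPerBatch = 1000                          // value passed to maxFilesPerTrigger (placeholder)
val timeoutSeconds = 1800L                         // inactivity timeout in seconds (placeholder)
def getQueryName: String = "my-delta-stream"       // query name shown in the Spark UI (placeholder)
val LOGGER = org.slf4j.LoggerFactory.getLogger("stream-monitor")

// foreachBatch handler; the real implementation writes each micro-batch downstream.
def process(batchDf: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  batchDf.count() // placeholder body
}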
The intent is that the monitoring loop keeps the streaming job alive until timeoutSeconds of inactivity have passed since the last non-empty batch, but the job gets canceled roughly 30 minutes after it starts, with the following log messages:
DAGScheduler: Asked to cancel job group
ScalaDriverWrapper: Stopping streams for commandId pattern
DatabricksStreamingQueryListener: Stopping the stream
Any help would be appreciated. Thanks.