cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Canceling long running on UC-enabled all-purpose clusters

PiotrM
New Contributor II

Hey, 

as in the subject. Is it possible to set timeout for long running queries on all-purpose clusters that are UC enabled? 

I know there is such setting for SQL Warehouses and Workflows, but I was unable to find one for all-purpose clusters. The issue we're facing are exploratory queries from those clusters that query SQL Server DBs through Lakehouse Federation. It occurred that those queries got stuck and clusters were finally terminated with the nightly clean-up. 

I tried thing like spark.task.reaper.killTimeout, but it seems like UC clusters won't accept it. 

2 REPLIES 2

Alberto_Umana
Databricks Employee
Databricks Employee

Hi @PiotrM,

Currently, there is no specific setting to directly set a timeout for long-running queries on all-purpose clusters that are Unity Catalog (UC) enabled

As a workaround, you might consider implementing custom monitoring and termination logic within your application or scripts to detect and handle long-running queries. This could involve periodically checking the status of queries and terminating those that exceed a certain duration

VZLA
Databricks Employee
Databricks Employee

@PiotrM thanks for your question! Adding to @Alberto_Umana comment, could you please clarify what do you mean with: "I tried thing like spark.task.reaper.killTimeout, but it seems like UC clusters won't accept it." ?

Is it throwing an error or is it just that it doesn't meet your expectations of killing the task?

This property will essentially be used to specify a timeout after which the executor JVM will kill itself if a killed task has not stopped running. This setting is part of the task reaper configurations, which are used to monitor and manage killed or interrupted tasks, and needs spark.task.reaper.enabled to be set to "true". So, only in scenarios where queries are canceled, killed or interrupted but the tasks do not exit in a timely fashion, then the task reaper ensures that such tasks do not continue running indefinitely by enforcing the killTimeout.

With "The issue we're facing are exploratory queries from those clusters that query SQL Server DBs through Lakehouse Federation. It occurred that those queries got stuck and clusters were finally terminated with the nightly clean-up.", do you mean to say it was in a "deadlock" (or similar) state? 

It is a bit tricky to detect whether a task is indeed stuck or just very slowly progressing, or even in zombie state. So, handling such scenario is not always straightforward, in your case though you may want to try out something like this:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("Timeout Example").getOrCreate()

def run_query():
    spark.sql("SELECT * FROM my_table WHERE ...").collect()

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(run_query)

try:
    result = future.result(timeout=10)  # Timeout in seconds
except TimeoutError:
    print("Query timed out and will be terminated.")
    future.cancel()

Or in Scala:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val futureQuery = Future {
  spark.sql("SELECT * FROM my_table WHERE ...").collect()
}

try {
  Await.result(futureQuery, 10.seconds)
} catch {
  case _: TimeoutException => println("Query timed out and will be terminated.")
}

Monitoring all queries in the cluster is much more complicated, as you would required to have a global way to monitor all queries, e.g.:

import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerJobEnd

val sparkListener = new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val runtime = jobEnd.time - jobEnd.startTime
    if (runtime > 10000) { // 10 seconds in milliseconds
      spark.sparkContext.cancelJob(jobEnd.jobId)
    }
  }
}

spark.sparkContext.addSparkListener(sparkListener)

Or a much simpler way, using REST API via jobs/runs/list, identify the active query and their run_id, and then similarly job/runs/cancel it, but this will terminate the job run, not the query.

If the query is executed via JDBC, then you may set the queryTimeout. Or handle the query termination as well from the target Database maybe? configure timeouts at the database connection or query level in SQL Server.

These are just some ideas, please review them, should you need more specific advice and help with troubleshooting (e.g.: to understand why is the query going hung), please raise a support ticket for better guidance.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group