@PiotrM thanks for your question! Adding to @Alberto_Umana's comment, could you please clarify what you mean by: "I tried thing like spark.task.reaper.killTimeout, but it seems like UC clusters won't accept it."?
Is it throwing an error, or is it simply not killing the task as you expect?
This property specifies a timeout after which the executor JVM will kill itself if a killed task has not stopped running. It is part of the task reaper configuration, which monitors and manages killed or interrupted tasks, and it requires spark.task.reaper.enabled to be set to "true". So, only in scenarios where queries are canceled, killed or interrupted but the tasks do not exit in a timely fashion does the task reaper ensure that such tasks do not keep running indefinitely, by enforcing the killTimeout.
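For reference, a minimal sketch of how these reaper settings would normally be applied (note: they generally need to be in place when the executors start, e.g. via the cluster's Spark config, rather than set from a running notebook; the "120s" value is just an illustrative assumption):

from pyspark.sql import SparkSession

# Sketch: enable the task reaper and set how long a killed task may keep running
# before the executor JVM kills itself. These settings usually belong in the
# cluster's Spark config rather than in notebook code.
spark = (
    SparkSession.builder
    .appName("Task Reaper Example")
    .config("spark.task.reaper.enabled", "true")
    .config("spark.task.reaper.killTimeout", "120s")
    .getOrCreate()
)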
With "The issue we're facing are exploratory queries from those clusters that query SQL Server DBs through Lakehouse Federation. It occurred that those queries got stuck and clusters were finally terminated with the nightly clean-up.", do you mean to say it was in a "deadlock" (or similar) state?
It is a bit tricky to detect whether a task is indeed stuck, just progressing very slowly, or even in a zombie state, so handling such a scenario is not always straightforward. In your case, though, you may want to try out something like this:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("Timeout Example").getOrCreate()

def run_query():
    return spark.sql("SELECT * FROM my_table WHERE ...").collect()

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(run_query)
try:
    result = future.result(timeout=10)  # Timeout in seconds
except TimeoutError:
    print("Query timed out and will be terminated.")
    # Note: cancel() cannot stop a thread that is already running, so you would
    # typically also cancel the underlying Spark job (e.g. via job groups).
    future.cancel()
Or in Scala:
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Run the query on a separate thread so the wait can be bounded
val futureQuery = Future {
  spark.sql("SELECT * FROM my_table WHERE ...").collect()
}

try {
  Await.result(futureQuery, 10.seconds)
} catch {
  case _: TimeoutException => println("Query timed out and will be terminated.")
}
Monitoring all queries in the cluster is much more complicated, as you would need a global way to observe all of them, e.g.:
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerJobEnd}

// Record when each job starts (SparkListenerJobEnd does not carry the start time)
val jobStartTimes = TrieMap[Int, Long]()
val sparkListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStartTimes.put(jobStart.jobId, jobStart.time)
  }
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobStartTimes.remove(jobEnd.jobId)
  }
}
spark.sparkContext.addSparkListener(sparkListener)

// Periodically cancel any job that has been running longer than 10 seconds (10000 ms)
val monitor = Executors.newSingleThreadScheduledExecutor()
monitor.scheduleAtFixedRate(() => {
  val now = System.currentTimeMillis()
  jobStartTimes.foreach { case (jobId, start) =>
    if (now - start > 10000) spark.sparkContext.cancelJob(jobId)
  }
}, 5, 5, TimeUnit.SECONDS)
Or, a much simpler way: use the REST API, via jobs/runs/list, to identify the active runs and their run_id, and then similarly jobs/runs/cancel them; keep in mind this terminates the whole job run, not just the query.
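A rough sketch of that approach (assuming the Jobs API 2.1; the workspace host, token and the 10-minute threshold are placeholders):

import time
import requests

HOST = "https://<your-workspace-url>"          # placeholder
TOKEN = "<personal-access-token>"              # placeholder
headers = {"Authorization": f"Bearer {TOKEN}"}

# List currently active job runs
runs = requests.get(
    f"{HOST}/api/2.1/jobs/runs/list",
    headers=headers,
    params={"active_only": "true"},
).json().get("runs", [])

# Cancel any run that has been going for more than 10 minutes
now_ms = int(time.time() * 1000)
for run in runs:
    if now_ms - run["start_time"] > 10 * 60 * 1000:
        requests.post(
            f"{HOST}/api/2.1/jobs/runs/cancel",
            headers=headers,
            json={"run_id": run["run_id"]},
        )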
If the query is executed via JDBC, you may set the queryTimeout option. Alternatively, handle the termination from the target database side, e.g. configure timeouts at the connection or query level in SQL Server.
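For example, a minimal sketch of a JDBC read with queryTimeout (the URL, table, credentials and 60-second value are placeholders):

# Sketch: ask the JDBC driver to cancel statements running longer than 60 seconds
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")  # placeholder
    .option("dbtable", "dbo.my_table")                                # placeholder
    .option("user", "<user>")
    .option("password", "<password>")
    .option("queryTimeout", "60")  # seconds; relies on the driver honoring setQueryTimeout
    .load()
)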
These are just some ideas, please review them. Should you need more specific advice or help with troubleshooting (e.g. to understand why the query is hanging), please raise a support ticket for better guidance.