Hello,
I want to execute custom code on application end. Outside Databricks, I have used the Spark listener's onApplicationEnd callback without problems.
But it does not work on Databricks (I also tried the listener's onJobEnd callback, and that one worked).
I have also tried a Spark plugin: the driver logs from the init() method are there, but the ones from shutdown() are not:
package com.baikal
import java.util.{Map => JMap}
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
/** Entry point of the demo job: registers the application-end listener and runs one small job. */
object Hello {

  /**
   * Obtains a SparkSession named "Spark", attaches an [[OnApplicationEnd]] listener to its
   * SparkContext, and triggers a single job by counting a ten-element collection.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession

    val endListener = new OnApplicationEnd
    val session = SparkSession.builder().appName("Spark").getOrCreate()
    val context = session.sparkContext

    context.addSparkListener(endListener)
    // The count itself is discarded; the action only exists so that Spark runs a job.
    context.parallelize(Array.range(1, 11)).count()
  }
}
/**
 * Spark listener used to check whether the application-end event reaches user code.
 *
 * When the event arrives it logs a marker line, prints the same line to stdout,
 * and then throws an exception.
 */
class OnApplicationEnd extends SparkListener {

  /** Logger named after the runtime class; created on first use. */
  lazy val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Emits the marker message through the logger and stdout, then fails deliberately.
   *
   * @param applicationEnd the application-end event delivered by Spark; not inspected
   * @throws Exception always, with the message "FAILEEEEEED"
   */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    val marker = "Hellooooooooooooooo"
    logger.info(marker)
    println(marker)
    throw new Exception("FAILEEEEEED")
  }
}
/**
 * Demo Spark plugin used to check whether the driver-side `init` and `shutdown`
 * hooks are invoked. The executor side is an empty plugin.
 */
class DemoPlugin extends SparkPlugin {

  /** Logger named after the runtime class; created on first use. */
  lazy val logger = LoggerFactory.getLogger(getClass)

  /**
   * Creates the driver-side plugin.
   *
   * @return a [[DriverPlugin]] that prints a banner in `init` and logs, prints and throws in `shutdown`
   */
  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {

      /**
       * Prints a banner to stdout so the call is visible in the driver output.
       *
       * @param sc        the SparkContext handed in by Spark; not used
       * @param myContext the plugin context handed in by Spark; not used
       * @return an empty map
       */
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        println("---------------------------")
        println("INIIIIIIIIIIIIIIIIIIIIIIIIIIITTTTTTTTTTTTTTTTT")
        Map.empty[String, String].asJava
      }

      /**
       * Logs and prints a marker line, then fails deliberately.
       *
       * @throws Exception always, with the message "FAILEEEEEED"
       */
      override def shutdown(): Unit = {
        try {
          logger.info("Hellooooooooooooooo")
          println("Hellooooooooooooooo")
          throw new Exception("FAILEEEEEED")
        } finally {
          // Previously this call sat after the unconditional throw and could never run;
          // the finally block guarantees the parent hook executes while the exception
          // still propagates to the caller.
          super.shutdown()
        }
      }
    }
  }

  /**
   * Creates the executor-side plugin.
   *
   * @return an [[ExecutorPlugin]] with no overridden behavior
   */
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {}
}
I have added this config to the cluster config:
spark.plugins com.baikal.DemoPlugin
spark.extraListeners com.baikal.OnApplicationEnd,com.databricks.backend.daemon.driver.DBCEventLoggingListener
As I said, the plugin's init() works, and so does the onJobEnd listener. But it doesn't work on application end/shutdown.
Any ideas?
Thanks