
Execute code on Application End

Eric-JoelBlanco
New Contributor II

Hello,

I want to execute custom code on application end. Outside Databricks, I have used the SparkListener onApplicationEnd callback without problems.

But it is not working on Databricks (the onJobEnd listener callback, by contrast, did fire; a minimal sketch of that variant is below).
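
For reference, this is roughly the kind of onJobEnd listener that did fire (illustrative, not the exact class I ran):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.slf4j.LoggerFactory

// Runs at the end of every Spark job; on Databricks this callback was reached,
// unlike onApplicationEnd.
class OnJobEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    logger.info(s"Job ${jobEnd.jobId} ended at ${jobEnd.time}")
  }
}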

I have also tried a Spark plugin: the driver logs from the init() method are there, but the ones from shutdown() are not:

package com.baikal

import java.util.{Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

object Hello {

  def main(args: Array[String]): Unit = {
    val listenerEnd = new OnApplicationEnd
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("Spark")
      .getOrCreate()

    // Register the listener programmatically, in addition to spark.extraListeners.
    spark.sparkContext.addSparkListener(listenerEnd)

    // Run a trivial job so that listener events are emitted.
    spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10)).count()
  }

}

class OnApplicationEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    // Log and deliberately throw so the callback is impossible to miss in the logs.
    logger.info("Hellooooooooooooooo")
    println("Hellooooooooooooooo")
    throw new Exception("FAILEEEEEED")
  }
}

class DemoPlugin extends SparkPlugin {
  lazy val logger = LoggerFactory.getLogger(getClass)
  override def driverPlugin(): DriverPlugin = {

    new DriverPlugin() {
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        // This output does show up in the driver logs.
        println("---------------------------")
        println("INIIIIIIIIIIIIIIIIIIIIIIIIIIITTTTTTTTTTTTTTTTT")
        Map.empty[String, String].asJava
      }

      override def shutdown(): Unit = {
        // None of this output shows up in the driver logs on Databricks.
        logger.info("Hellooooooooooooooo")
        println("Hellooooooooooooooo")
        throw new Exception("FAILEEEEEED")
        super.shutdown()
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin {

    }
  }
}

I have added this to the cluster's Spark config:

spark.plugins com.baikal.DemoPlugin
spark.extraListeners com.baikal.OnApplicationEnd,com.databricks.backend.daemon.driver.DBCEventLoggingListener
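
For comparison, outside Databricks I set the same properties myself when building the session (a sketch; on Databricks the cluster Spark config above is used instead, since the context is created for you):

import org.apache.spark.sql.SparkSession

// Both properties are read when the SparkContext starts, so they have to be set
// before getOrCreate() creates the context.
val spark = SparkSession.builder()
  .appName("Spark")
  .config("spark.plugins", "com.baikal.DemoPlugin")
  .config("spark.extraListeners", "com.baikal.OnApplicationEnd")
  .getOrCreate()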

Like I said, the plugin's init() works, and so does the onJobEnd listener. But nothing runs on application end / plugin shutdown.

Any ideas?

Thanks

1 REPLY

abhilash
New Contributor II

Did you find any solution?
