Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Execute code on Application End

Eric-JoelBlanco
New Contributor II

Hello,

I want to execute custom code on application end. Outside Databricks, I have used the Spark listener onApplicationEnd without problems.

But it is not working on Databricks (I tried the onJobEnd listener and that one worked).

I have also tried a Spark plugin: the Driver logs from the init() method show up, but the logs from shutdown() do not:

package com.baikal
 
import java.util.{Map => JMap}
 
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.slf4j.LoggerFactory
 
import scala.collection.JavaConverters._
 
object Hello {
 
 
  def main(args: Array[String]): Unit = {
    val listenerEnd = new OnApplicationEnd
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("Spark")
      .getOrCreate();
 
    spark.sparkContext.addSparkListener(listenerEnd)
 
    spark.sparkContext.parallelize(Array(1,2,3,4,5,6,7,8,9,10)).count()
    
  }
 
}
 
// Listener whose onApplicationEnd callback never seems to fire on Databricks.
class OnApplicationEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logger.info("Hellooooooooooooooo")
    println("Hellooooooooooooooo")
    // Throw on purpose so the callback is impossible to miss in the logs.
    throw new Exception("FAILEEEEEED")
  }
}
 
class DemoPlugin extends SparkPlugin {
  lazy val logger = LoggerFactory.getLogger(getClass)
  override def driverPlugin(): DriverPlugin = {
 
    new DriverPlugin() {
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        println("---------------------------")
        println("INIIIIIIIIIIIIIIIIIIIIIIIIIIITTTTTTTTTTTTTTTTT")
        Map.empty[String, String].asJava
      }
 
      override def shutdown(): Unit = {
        logger.info("Hellooooooooooooooo")
        println("Hellooooooooooooooo")
        super.shutdown()
        // Throw on purpose so the shutdown path is impossible to miss in the logs.
        throw new Exception("FAILEEEEEED")
      }
    }
  }
 
  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin {
 
    }
  }
}

I have added this config to the cluster config:

spark.plugins com.baikal.DemoPlugin
spark.extraListeners com.baikal.OnApplicationEnd,com.databricks.backend.daemon.driver.DBCEventLoggingListener

Like I said, the plugin's init works, and so does the onJobEnd listener. But nothing runs on application end/shutdown.
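In case it is useful context: one alternative I have been considering instead of the listener is a plain JVM shutdown hook on the driver. This is only a sketch (the object name ShutdownHookDemo is made up, and I have not verified it runs reliably on Databricks, where the driver JVM can outlive a single notebook run):

```scala
package com.baikal

import org.apache.spark.sql.SparkSession

object ShutdownHookDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark")
      .getOrCreate()

    // sys.addShutdownHook registers a Thread with Runtime.getRuntime,
    // so it runs when the driver JVM exits, independently of whether
    // SparkListener.onApplicationEnd is ever delivered.
    sys.addShutdownHook {
      println("Driver JVM is shutting down - running cleanup")
    }

    spark.sparkContext.parallelize(1 to 10).count()
  }
}
```

I have not tested whether Databricks gives shutdown hooks enough time to finish before the cluster terminates.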

Any ideas?

Thanks

1 REPLY

abhilash
New Contributor II

Did you find any solution?
