
Execute code on Application End

Eric-JoelBlanco
New Contributor II

Hello,

I want to execute custom code on application end. Outside Databricks, I have used the SparkListener onApplicationEnd callback without problems.

But it does not fire on Databricks (a listener overriding onJobEnd did work).
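
For reference, the onJobEnd variant that did fire looked roughly like this (a sketch, not the exact code I ran):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.slf4j.LoggerFactory

class OnJobEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)

  // Unlike onApplicationEnd, this callback is delivered on Databricks
  // after every job completes.
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    logger.info(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
  }
}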

I have also tried a Spark plugin: the driver logs from the init() method show up, but the ones from shutdown() do not:

package com.baikal

import java.util.{Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

object Hello {

  def main(args: Array[String]): Unit = {
    val listenerEnd = new OnApplicationEnd
    val spark = SparkSession.builder()
      .appName("Spark")
      .getOrCreate()

    // Register the listener programmatically, in addition to spark.extraListeners.
    spark.sparkContext.addSparkListener(listenerEnd)

    // Run a trivial job so there is at least one job/application lifecycle.
    spark.sparkContext.parallelize(1 to 10).count()
  }
}

class OnApplicationEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)

  // Expected to fire when the SparkContext stops; never observed on Databricks.
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logger.info("Hellooooooooooooooo")
    println("Hellooooooooooooooo")
    // Deliberately throw so the callback is impossible to miss in the logs.
    throw new Exception("FAILEEEEEED")
  }
}

class DemoPlugin extends SparkPlugin {
  lazy val logger = LoggerFactory.getLogger(getClass)

  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      // This runs on Databricks: the lines below do appear in the driver logs.
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        println("---------------------------")
        println("INIIIIIIIIIIIIIIIIIIIIIIIIIIITTTTTTTTTTTTTTTTT")
        Map.empty[String, String].asJava
      }

      // This never seems to run on Databricks: none of these lines appear.
      override def shutdown(): Unit = {
        logger.info("Hellooooooooooooooo")
        println("Hellooooooooooooooo")
        super.shutdown()
        // Deliberately throw so the callback is impossible to miss in the logs.
        throw new Exception("FAILEEEEEED")
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin {}
  }
}

I have added this configuration to the cluster's Spark config:

spark.plugins com.baikal.DemoPlugin
spark.extraListeners com.baikal.OnApplicationEnd,com.databricks.backend.daemon.driver.DBCEventLoggingListener

Like I said, the plugin's init() works, and so does the onJobEnd listener, but nothing runs on application end/shutdown.
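
One alternative I have been wondering about is a plain JVM shutdown hook registered from the driver, roughly as below. This is only a sketch; I don't know whether Databricks gives shutdown hooks time to run when it tears the driver down, which is essentially the same open question:

import org.slf4j.LoggerFactory

object ShutdownHookDemo {
  private lazy val logger = LoggerFactory.getLogger(getClass)

  def register(): Unit = {
    // sys.addShutdownHook wraps Runtime.getRuntime.addShutdownHook; hooks run
    // on normal JVM exit but are skipped if the process is killed with SIGKILL.
    sys.addShutdownHook {
      logger.info("JVM shutting down")
      println("JVM shutting down")
    }
  }
}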

Any ideas?

Thanks

1 REPLY

abhilash
New Contributor II

Did you find any solution?
