
Is there any way of determining last stage of SparkSQL Application Execution?

krishnakash
New Contributor II

I have created custom UDFs that generate logs. These logs can be flushed by calling another API, which is exposed by an internal layer. However, I want to call this API right after the UDF finishes executing. Is there any way to determine that a particular UDF has finished executing, so that I can invoke the API to flush the logs and clean up?

For example, when we extend Hive's GenericUDF class to write a Hive UDF, the UDF lifecycle includes a close function that is called after the UDF has finished executing.
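
To illustrate the Hive lifecycle hook referred to above, here is a minimal sketch, assuming hive-exec is on the classpath. The class name UpperWithCleanup and the AuditLog object are hypothetical; AuditLog only stands in for the internal flush API, which is not shown in this thread.

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Hypothetical stand-in for the internal log-flushing API.
object AuditLog {
  def flush(): Unit = println("flushing audit logs")
}

// Hypothetical UDF: upper-cases its single argument and flushes logs in close().
class UpperWithCleanup extends GenericUDF {

  // Called once before any rows are processed; declares the return type.
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector =
    PrimitiveObjectInspectorFactory.javaStringObjectInspector

  // Called once per row; a null input yields a null output.
  override def evaluate(arguments: Array[DeferredObject]): AnyRef =
    Option(arguments(0).get()).map(_.toString.toUpperCase).orNull

  override def getDisplayString(children: Array[String]): String =
    s"upper_with_cleanup(${children.mkString(", ")})"

  // Lifecycle hook that Hive calls when it is done with this UDF instance.
  override def close(): Unit = AuditLog.flush()
}
```

The point of the sketch is only the close() override; the evaluate logic is a placeholder.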

Is a similar approach possible in SparkSQL UDFs?

1 ACCEPTED SOLUTION

Accepted Solutions

User16763506586
Contributor

@Krishna Kashiv​ 

Maybe ExecutorPlugin.java can help. It has all the methods you might require. Let me know whether it works.

You need to implement the interface org.apache.spark.api.plugin.SparkPlugin

and expose it as spark.plugins = com.abc.ImplementationClass


4 REPLIES

User16763506586
Contributor

TBH, I don't think a callback feature is supported for UDFs yet. But we can work around this by implementing SparkListenerInterface or extending SparkFirehoseListener. These have several methods; one that might help us is onStageCompleted.

Interface definition can be found here

Once you implement the interface, you can add it to Spark by using sparkContext.addSparkListener

/**
 * :: DeveloperApi ::
 * Register a listener to receive up-calls from events that happen during execution.
 */
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface): Unit = {
  listenerBus.addToSharedQueue(listener)
}
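
As a rough sketch of the listener approach described above, the following extends SparkListener (a no-op implementation of SparkListenerInterface) and overrides only onStageCompleted. The names StageFlushListener, onStageDone and ListenerDemo are made up for illustration, and the local-mode session is just an assumption so the example can run standalone. Note that listener callbacks are delivered on the driver, not on the executors.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.SparkSession

// Hypothetical listener: `onStageDone` stands in for whatever cleanup you need.
class StageFlushListener(onStageDone: Int => Unit) extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // failureReason is None when the stage succeeded.
    if (info.failureReason.isEmpty) onStageDone(info.stageId)
  }
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    // Local session, assumed here only so the sketch is self-contained.
    val spark = SparkSession.builder()
      .appName("listener-demo")
      .master("local[*]")
      .getOrCreate()

    // Register the listener on the shared listener bus.
    spark.sparkContext.addSparkListener(
      new StageFlushListener(id => println(s"stage $id completed")))

    // Any action triggers one or more stages, and therefore the callback.
    spark.range(0, 1000).selectExpr("sum(id)").collect()
    spark.stop()
  }
}
```

A stage can contain work from more than just the UDF, so stage completion is only an approximation of "the UDF has finished".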

We tried adding a SparkListener and added loggers to every callback method of SparkListenerInterface. However, we observed that the logs are generated in the driver logs, which means the driver node executes the callback methods.

Is it possible to invoke these callback methods on the executor nodes, since the audit logs to be flushed are generated on the executor nodes?

User16763506586
Contributor

@Krishna Kashiv​ 

Maybe ExecutorPlugin.java can help. It has all the methods you might require. Let me know whether it works.

You need to implement the interface org.apache.spark.api.plugin.SparkPlugin

and expose it as spark.plugins = com.abc.ImplementationClass
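
A minimal sketch of what such a plugin might look like, assuming Spark 3.x. The class names FlushingSparkPlugin and FlushingExecutorPlugin and the AuditLog object are hypothetical; AuditLog only stands in for the internal flush API. As far as I know, the task-level hooks such as onTaskSucceeded were added in Spark 3.1, so on Spark 3.0 only init and shutdown are available and that override would have to be removed.

```scala
import java.util.{Map => JMap}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Hypothetical stand-in for the internal log-flushing API.
object AuditLog {
  def flush(): Unit = println("flushing audit logs")
}

// Runs inside each executor JVM.
class FlushingExecutorPlugin extends ExecutorPlugin {

  // Called once when the executor starts.
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit =
    println(s"plugin initialized on executor ${ctx.executorID()}")

  // Assumes Spark 3.1+: called on the executor after each successful task.
  override def onTaskSucceeded(): Unit = AuditLog.flush()

  // Called once when the executor shuts down.
  override def shutdown(): Unit = AuditLog.flush()
}

// Entry point named in spark.plugins.
class FlushingSparkPlugin extends SparkPlugin {
  // No driver-side component is needed for this sketch.
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new FlushingExecutorPlugin
}
```

With this sketch the setting would be spark.plugins = FlushingSparkPlugin (fully qualified if the class lives in a package), and the jar has to be on the classpath before the SparkContext is created.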

Hi, how do I properly configure the jar containing the class and the Spark plugin in Databricks?

While creating a DBR 7.3 cluster, I tried setting the spark.plugins, spark.driver.extraClassPath and spark.executor.extraClassPath Spark configs. But the cluster fails to start, and the driver logs contain:

21/10/28 17:20:34 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException:com.example.PtyExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@47a4eee2
	at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
	at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
	at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
	at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
	at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
	at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
	at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
	at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
	at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
	at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
	at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
	at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.PtyExecSparkPlugin
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
	... 43 more

I have also tried installing the jar as a cluster library. The class still does not load.
