10-04-2021 05:49 AM
I have created custom UDFs that generate logs. These logs can be flushed by calling another API exposed by an internal layer. However, I want to call this API just after the execution of the UDF ends. Is there any way of determining that the execution of a particular UDF has finished, so that I can invoke the API to flush logs and clean up?
For example, when we extend Hive's GenericUDF class for a Hive UDF, there is a close function in the UDF's lifecycle that is called after the execution of the UDF.
Is a similar approach possible in Spark SQL UDFs?
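For context, here is a minimal sketch of the Hive pattern I mean; the flush call is a stand-in for our internal API:

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Hypothetical logging UDF: Hive calls close() at the end of the UDF's
// lifecycle, which is exactly the hook we are looking for in Spark SQL.
class AuditedUpperUDF extends GenericUDF {
  override def initialize(args: Array[ObjectInspector]): ObjectInspector =
    PrimitiveObjectInspectorFactory.javaStringObjectInspector

  override def evaluate(args: Array[DeferredObject]): AnyRef = {
    val v = Option(args(0).get()).map(_.toString).orNull
    // ... generate an audit log entry here ...
    if (v == null) null else v.toUpperCase
  }

  override def getDisplayString(children: Array[String]): String =
    s"audited_upper(${children.mkString(", ")})"

  // Called by Hive once execution is done -- the place to flush logs.
  override def close(): Unit = {
    // flushLogs()  // hypothetical internal flush API
  }
}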
10-06-2021 01:54 AM
TBH, I don't think a callback feature is supported for UDFs yet. But we can work around it by implementing SparkListenerInterface or extending SparkFirehoseListener. These have several methods; one that might help us is onStageCompleted.
The interface definition can be found in the Spark source as org.apache.spark.scheduler.SparkListenerInterface.
Once you implement the interface, you can register it with Spark using sparkContext.addSparkListener:
/**
 * :: DeveloperApi ::
 * Register a listener to receive up-calls from events that happen during execution.
 */
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface): Unit = {
  listenerBus.addToSharedQueue(listener)
}
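A minimal sketch of such a listener, assuming a hypothetical flushAuditLogs() standing in for the internal flush API:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Fires after every stage finishes; note that listener callbacks run on
// the driver, not on the executors.
class LogFlushListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // flushAuditLogs()  // hypothetical internal flush API
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed")
  }
}

// Registration:
// spark.sparkContext.addSparkListener(new LogFlushListener)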
10-07-2021 10:40 PM
We tried adding the SparkListener and added loggers to every method of SparkListenerInterface. According to our observation, the logs are generated in the driver logs, which means the driver node executes the callback methods.
Is it possible to invoke these callback methods on the executor nodes, since the audit logs to be flushed are generated on the executor nodes?
10-13-2021 05:16 AM
@Krishna Kashiv
Maybe ExecutorPlugin.java can help. It has all the methods you might require. Let me know whether it works.
You need to implement the interface org.apache.spark.api.plugin.SparkPlugin
and expose it as spark.plugins = com.abc.ImplementationClass
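A minimal sketch of that plugin in Scala; the class name and flushAuditLogs() are hypothetical, and the shutdown() hook runs on each executor as it stops:

import java.util.{Map => JMap}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class LogFlushSparkPlugin extends SparkPlugin {
  // No driver-side hooks needed; Spark accepts a null DriverPlugin.
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Executor-side initialization, e.g. open the logging client.
    }
    override def shutdown(): Unit = {
      // flushAuditLogs()  // hypothetical internal flush API, runs per executor
    }
  }
}

// Cluster config: spark.plugins = com.abc.LogFlushSparkPlugin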
10-28-2021 08:50 PM
Hi, how do I properly configure the jar containing the Spark plugin class in Databricks?
During DBR 7.3 cluster creation, I tried setting the spark.plugins, spark.driver.extraClassPath and spark.executor.extraClassPath Spark configs, but the cluster fails to start and the driver logs contain:
21/10/28 17:20:34 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException:com.example.PtyExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@47a4eee2
at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.PtyExecSparkPlugin
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
... 43 more
I have also tried installing the jar as a cluster library, but the class is still not loading.
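For reference, the Spark config I set looked roughly like this (the jar path is a placeholder):

spark.plugins com.example.PtyExecSparkPlugin
spark.driver.extraClassPath /dbfs/FileStore/jars/pty-exec-plugin.jar
spark.executor.extraClassPath /dbfs/FileStore/jars/pty-exec-plugin.jar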