10-04-2021 05:49 AM
I have created custom UDF's that generate logs. These logs can be flushed by calling another API exposed which is exposed by an internal layer. However I want to call this API just after the execution of the UDF comes to an end. Is there any way of determining if the execution of a particular UDF has finished to invoke the API to flush logs and clean up.
For e.g. When we extend Hive's GenericUDF class for a Hive UDF, there is a close function available in the lifecycle of the UDF that will be called after the execution of the UDF.
Is a similar approach possible in SparkSQL UDFs?
10-13-2021 05:16 AM
@Krishna Kashiv
May be ExecutorPlugin.java can help. It has all the methods you might required. Let me know if it works or not.
You need to implement this interface org.apache.spark.api.plugin.SparkPlugin
and expose it as spark.plugins = com.abc.ImplementationClass
10-04-2021 10:28 AM
Hi @ krishnakash! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Or else I will follow up with my team and get back to you soon. Thanks.
10-06-2021 01:54 AM
TBH, I don't think callback feature yet supported for UDF. But we can workaround by implementing SparkListenerInterface or extending SparkFirehoseListener . These have several methods one such method which might help us is onStageCompleted
Interface definition can be found here
Once you implement the interface you can add it to spark by using sparkContext.addSparkListener
/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface): Unit = {
listenerBus.addToSharedQueue(listener)
}
10-07-2021 10:40 PM
We tried adding the SparkListener, we had added loggers for all type of function of the SparkListenerInterface however according to our observation, we saw that the logs are getting generated in the driver logs. That means the driver node executes the callback methods.
Is it possible to call these callback methods from the executor nodes; as the audit logs to be flushed are generated in the executor nodes?
10-13-2021 05:16 AM
@Krishna Kashiv
May be ExecutorPlugin.java can help. It has all the methods you might required. Let me know if it works or not.
You need to implement this interface org.apache.spark.api.plugin.SparkPlugin
and expose it as spark.plugins = com.abc.ImplementationClass
10-28-2021 08:50 PM
Hi, how to properly configure the jar containing the class and spark plugin in Databricks?
During DBR 7.3 cluster creation, I tried by setting the spark.plugins , spark.driver.extraClassPath and spark.executor.extraClassPath Spark configs. But cluster fails to start and Driver logs contains:
21/10/28 17:20:34 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException:com.example.PtyExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@47a4eee2
at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.PtyExecSparkPlugin
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
... 43 more
I have also tried by installing the Jar under Cluster Library. Still Class is not loading.
11-21-2021 04:35 PM
Hi @Krishna Kashiv and @Franklin George ,
You might consider adding this as an init script (https://docs.microsoft.com/en-us/azure/databricks/clusters/init-scripts).
The init scripts give you an opportunity to add jars to the cluster before spark even begins which is probably what the spark plugin is expecting.
#!/bin/bash
STAGE_DIR="/dbfs/databricks/plugins/
echo "BEGIN: Upload Spark Plugins"
cp -f $STAGE_DIR/*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Plugin library file"; exit 1;}
echo "END: Upload Spark Plugin JARs"
echo "BEGIN: Modify Spark config settings"
cat << 'EOF' > /databricks/driver/conf/spark-plugin-driver-defaults.conf
[driver] {
"spark.plugins" = "com.example.CustomExecSparkPlugin"
}
EOF
echo "END: Modify Spark config settings"
I believe the copying of the jar to /mnt/driver-daemons/jars will make Spark aware of the jar before Spark fully initializes (https://docs.microsoft.com/en-us/azure/databricks/clusters/configure#--spark-configuration).
Less certain it will make to the executor though
Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections.
Click here to register and join today!
Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.