Is there any way of determining last stage of SparkSQL Application Execution?

krishnakash
New Contributor II

I have created custom UDFs that generate logs. These logs can be flushed by calling another API exposed by an internal layer, but I want to call that API only after the UDF has finished executing. Is there any way of determining that the execution of a particular UDF has finished, so that I can invoke the API to flush the logs and clean up?

For example, when we extend Hive's GenericUDF class for a Hive UDF, there is a close function in the UDF's lifecycle that is called after the UDF has finished executing.
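
For reference, a trimmed sketch of that Hive lifecycle (the class name and the flushLogs() call are placeholders for our internal pieces):

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

// Placeholder UDF: evaluate() emits audit log entries, close() flushes them.
class MyAuditUDF extends GenericUDF {
  override def initialize(arguments: Array[ObjectInspector]): ObjectInspector =
    PrimitiveObjectInspectorFactory.javaStringObjectInspector

  override def evaluate(arguments: Array[DeferredObject]): AnyRef = {
    // ... compute the result and write audit log entries ...
    val value = arguments(0).get()
    if (value == null) null else value.toString
  }

  override def getDisplayString(children: Array[String]): String = "my_audit_udf"

  // Hive calls close() when the operator pipeline is torn down.
  override def close(): Unit = {
    // flushLogs()   // placeholder for the internal flush API
  }
}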

Is a similar approach possible in SparkSQL UDFs?

1 ACCEPTED SOLUTION

User16763506586
Contributor

@Krishna Kashiv

Maybe ExecutorPlugin.java can help. It has all the methods you might require. Let me know whether it works.

You need to implement the interface org.apache.spark.api.plugin.SparkPlugin

and expose it as spark.plugins = com.abc.ImplementationClass

View solution in original post

6 REPLIES

Kaniz
Community Manager

Hi @krishnakash! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first; otherwise, I will follow up with my team and get back to you soon. Thanks.

User16763506586
Contributor

To be honest, I don't think a callback feature is supported for UDFs yet. But we can work around it by implementing SparkListenerInterface or extending SparkFirehoseListener. These have several methods; one that might help us is onStageCompleted.

The interface definition can be found here.

Once you implement the interface, you can register it with Spark using sparkContext.addSparkListener:

/**
 * :: DeveloperApi ::
 * Register a listener to receive up-calls from events that happen during execution.
 */
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface): Unit = {
  listenerBus.addToSharedQueue(listener)
}
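
A minimal sketch of such a listener, extending the no-op SparkListener adapter rather than implementing the full interface (the class name and flushLogs() call are placeholders):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical listener: react whenever a stage finishes.
class LogFlushListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed")
    // flushLogs()   // placeholder for the internal flush API
  }
}

// Register it on the driver:
// spark.sparkContext.addSparkListener(new LogFlushListener())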

We tried adding the SparkListener and added loggers for every method of SparkListenerInterface. From what we observed, the logs are generated in the driver logs, which means the callback methods are executed on the driver node.

Is it possible to call these callback methods from the executor nodes, since the audit logs that need to be flushed are generated on the executor nodes?

User16763506586
Contributor

@Krishna Kashiv

Maybe ExecutorPlugin.java can help. It has all the methods you might require. Let me know whether it works.

You need to implement the interface org.apache.spark.api.plugin.SparkPlugin

and expose it as spark.plugins = com.abc.ImplementationClass
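
Roughly, a plugin that flushes the logs when each executor shuts down could look like the sketch below (the package and class name are just the placeholder from above, and flushLogs() stands in for the internal API):

package com.abc

import java.util.{Map => JMap}

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Placeholder implementation matching spark.plugins = com.abc.ImplementationClass
class ImplementationClass extends SparkPlugin {
  // No driver-side behaviour is needed for this use case.
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Called once on each executor before any tasks run.
    }

    override def shutdown(): Unit = {
      // Called on each executor during shutdown; flush the UDF's audit logs here.
      // flushLogs()   // placeholder for the internal flush API
    }
  }
}

Build this into a JAR, make it available on the driver and executor classpaths, and set spark.plugins accordingly.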

Hi, how do I properly configure the JAR containing the plugin class and the Spark plugin in Databricks?

During DBR 7.3 cluster creation, I tried setting the spark.plugins, spark.driver.extraClassPath and spark.executor.extraClassPath Spark configs, but the cluster fails to start and the driver logs contain:

21/10/28 17:20:34 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException:com.example.PtyExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@47a4eee2
	at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
	at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
	at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
	at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
	at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
	at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
	at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
	at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
	at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
	at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
	at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
	at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
	at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
	at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.PtyExecSparkPlugin
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
	... 43 more

I have also tried installing the JAR as a cluster library, but the class is still not loading.

Kaniz
Community Manager

Hi @Krishna Kashiv and @Franklin George,

You might consider adding this as an init script (https://docs.microsoft.com/en-us/azure/databricks/clusters/init-scripts).

Init scripts give you an opportunity to add JARs to the cluster before Spark even starts, which is probably what the Spark plugin loading expects.

  • Upload your JAR to DBFS, somewhere like dbfs:/databricks/plugins.
  • Create and upload a bash script like the one below to the same location.
  • Create or edit a cluster with the init script specified.
#!/bin/bash
 
STAGE_DIR="/dbfs/databricks/plugins"
 
echo "BEGIN: Upload Spark Plugins"
cp -f $STAGE_DIR/*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Plugin library file"; exit 1;}
echo "END: Upload Spark Plugin JARs"
 
echo "BEGIN: Modify Spark config settings"
cat << 'EOF' > /databricks/driver/conf/spark-plugin-driver-defaults.conf
[driver] {
   "spark.plugins" = "com.example.CustomExecSparkPlugin"
}
EOF
echo "END: Modify Spark config settings"

I believe copying the JAR to /mnt/driver-daemon/jars will make Spark aware of the JAR before Spark fully initializes (https://docs.microsoft.com/en-us/azure/databricks/clusters/configure#--spark-configuration).

I'm less certain it will make it to the executors, though.
