I would like to consolidate all our Spark jobs in Databricks. One of those jobs, currently running in Azure HDInsight, does not work properly as a Databricks JAR job.
It uses Spark 3.3 RDDs and requires Kryo serialisation to be configured. I have therefore set up a DBR 12.2 LTS cluster (the latest runtime on Spark 3.3.x) and configured everything as in HDInsight, adding the Spark config to the cluster and the JAR as a library on the single task.
However, the job fails because the executors cannot find the class referenced by spark.kryo.registrator, even though it is included in the JAR.
How can I solve this?
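For reference, the registrator is a plain org.apache.spark.serializer.KryoRegistrator implementation packaged in the JAR. A minimal sketch of what it looks like (the classes registered here are placeholders; the real list is internal to the job):

package com.tomtom.orbis.layers.io.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch only: the actual registered classes differ, but the class lives
// at this fully qualified name inside osm-layers-state-io-all.jar and is
// referenced by name via spark.kryo.registrator.
class LayerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[Array[Long]])
  }
}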
Job definition:
resources:
  jobs:
    test_unai_layer_initial_snapshot:
      name: test-unai-layer-initial-snapshot
      max_concurrent_runs: 15
      tasks:
        - task_key: layer-A-initial-snapshot
          spark_jar_task:
            jar_uri: ""
            main_class_name: com.tomtom.orbis.layers.io.InitialSnapshotCommand
            parameters:
              - --input=redacted
              - --output=redacted
            run_as_repl: true
          job_cluster_key: test-unai-initial-snapshot-cluster
          libraries:
            - jar: dbfs:/FileStore/jars/osm-layers-state-io-all.jar
      job_clusters:
        - job_cluster_key: test-unai-initial-snapshot-cluster
          new_cluster:
            cluster_name: ""
            spark_version: 12.2.x-scala2.12
            spark_conf:
              spark.network.timeout: 500s
              spark.executor.cores: "24"
              spark.sql.shuffle.partitions: auto
              # Kryo settings copied from HDInsight
              spark.serializer: org.apache.spark.serializer.KryoSerializer
              spark.kryo.unsafe: "false"
              spark.kryo.referenceTracking: "false"
              spark.kryo.registrator: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
              spark.kryoserializer.buffer: 4m
              spark.shuffle.io.retryWait: 30s
              # Deleted some Hadoop azure options
            azure_attributes:
              first_on_demand: 1
              availability: SPOT_WITH_FALLBACK_AZURE
              spot_bid_max_price: 100
            node_type_id: Standard_E48d_v4
            driver_node_type_id: Standard_D16ds_v4
            custom_tags:
              project: orbis-light
            cluster_log_conf:
              dbfs:
                destination: dbfs:/cluster-logs/test/unai
            enable_elastic_disk: true
            policy_id: 306312B973000A7D
            data_security_mode: LEGACY_SINGLE_USER_STANDARD
            runtime_engine: STANDARD
            autoscale:
              min_workers: 16
              max_workers: 16
      tags:
        name: layer-initial-snapshot
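For completeness, a simplified sketch of the JAR's entry point, assuming it relies on the cluster-level spark_conf above rather than configuring Kryo programmatically (argument parsing and the actual snapshot logic are omitted; names are taken from the task definition):

package com.tomtom.orbis.layers.io

import org.apache.spark.sql.SparkSession

// Sketch only: the real command parses --input/--output and runs
// RDD-based snapshot logic; the Kryo settings come from the cluster conf.
object InitialSnapshotCommand {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("layer-initial-snapshot")
      .getOrCreate()
    // ... build and write the initial snapshot using spark.sparkContext ...
    spark.stop()
  }
}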
The error I am getting (it is the same in all executors) is:
24/10/16 07:01:15 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:184)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:280)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:172)
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:103)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:110)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:349)
at org.apache.spark.serializer.KryoDeserializationStream.<init>(KryoSerializer.scala:303)
at org.apache.spark.serializer.KryoSerializerInstance.deserializeStream(KryoSerializer.scala:470)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:452)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:339)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:313)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:308)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1663)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:308)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
at org.apache.spark.scheduler.TaskDescription.stageDescription(TaskDescription.scala:72)
at org.apache.spark.scheduler.TaskDescription.properties(TaskDescription.scala:77)
at org.apache.spark.executor.Executor$TaskRunner.<init>(Executor.scala:677)
at org.apache.spark.executor.Executor.createTaskRunner(Executor.scala:533)
at org.apache.spark.executor.Executor.launchTask(Executor.scala:537)
at org.apache.spark.executor.Executor$TaskLauncher.$anonfun$run$3(Executor.scala:506)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskLauncher.run(Executor.scala:506)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:126)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:259)
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$7(KryoSerializer.scala:179)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)