10-16-2024 01:03 AM
I would like to consolidate all our Spark jobs in Databricks. One of the jobs currently running in Azure HDInsight does not work properly as a Databricks JAR job.
It uses Spark 3.3 RDDs and requires configuring Kryo serialisation. I have therefore set up a DB 12.2 LTS cluster (the latest runtime using Spark 3.3.x) and configured everything as in HDInsight, adding the Spark config to the cluster and the JAR to the single task.
But the cluster fails because the executors cannot find the class defined in spark.kryo.registrator, even though it is included in the JAR.
How can I solve this?
Job definition:
resources:
  jobs:
    test_unai_layer_initial_snapshot:
      name: test-unai-layer-initial-snapshot
      max_concurrent_runs: 15
      tasks:
        - task_key: layer-A-initial-snapshot
          spark_jar_task:
            jar_uri: ""
            main_class_name: com.tomtom.orbis.layers.io.InitialSnapshotCommand
            parameters:
              - --input=redacted
              - --output=redacted
            run_as_repl: true
          job_cluster_key: test-unai-initial-snapshot-cluster
          libraries:
            - jar: dbfs:/FileStore/jars/osm-layers-state-io-all.jar
      job_clusters:
        - job_cluster_key: test-unai-initial-snapshot-cluster
          new_cluster:
            cluster_name: ""
            spark_version: 12.2.x-scala2.12
            spark_conf:
              spark.network.timeout: 500s
              spark.executor.cores: "24"
              spark.sql.shuffle.partitions: auto
              # Kryo settings copied from HDInsight
              spark.serializer: org.apache.spark.serializer.KryoSerializer
              spark.kryo.unsafe: "false"
              spark.kryo.referenceTracking: "false"
              spark.kryo.registrator: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
              spark.kryoserializer.buffer: 4m
              spark.shuffle.io.retryWait: 30s
              # Deleted some Hadoop azure options
            azure_attributes:
              first_on_demand: 1
              availability: SPOT_WITH_FALLBACK_AZURE
              spot_bid_max_price: 100
            node_type_id: Standard_E48d_v4
            driver_node_type_id: Standard_D16ds_v4
            custom_tags:
              project: orbis-light
            cluster_log_conf:
              dbfs:
                destination: dbfs:/cluster-logs/test/unai
            enable_elastic_disk: true
            policy_id: 306312B973000A7D
            data_security_mode: LEGACY_SINGLE_USER_STANDARD
            runtime_engine: STANDARD
            autoscale:
              min_workers: 16
              max_workers: 16
      tags:
        name: layer-initial-snapshot
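For context, the class referenced by spark.kryo.registrator above implements Spark's KryoRegistrator interface. The actual LayerKryoRegistrator implementation is not shown in this thread; a minimal sketch of what such a class typically looks like (the registered classes below are placeholders):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Minimal sketch only: the real com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
// is not shown in this thread, and the registered classes are placeholders.
class LayerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
  }
}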
The error I am getting (the same in all executors) is:
24/10/16 07:01:15 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:184)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:280)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:172)
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:103)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:110)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:349)
at org.apache.spark.serializer.KryoDeserializationStream.<init>(KryoSerializer.scala:303)
at org.apache.spark.serializer.KryoSerializerInstance.deserializeStream(KryoSerializer.scala:470)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:452)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:339)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:313)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:308)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1663)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:308)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
at org.apache.spark.scheduler.TaskDescription.stageDescription(TaskDescription.scala:72)
at org.apache.spark.scheduler.TaskDescription.properties(TaskDescription.scala:77)
at org.apache.spark.executor.Executor$TaskRunner.<init>(Executor.scala:677)
at org.apache.spark.executor.Executor.createTaskRunner(Executor.scala:533)
at org.apache.spark.executor.Executor.launchTask(Executor.scala:537)
at org.apache.spark.executor.Executor$TaskLauncher.$anonfun$run$3(Executor.scala:506)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskLauncher.run(Executor.scala:506)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:126)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:259)
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$7(KryoSerializer.scala:179)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
10-16-2024 06:14 AM
The Kryo registrator is properly loaded in the driver: if I remove the class from the JAR, I get an error in the driver instead.
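One way to narrow this down is to attempt the class lookup on both sides from a notebook attached to the same cluster. A sketch, assuming a SparkSession named spark is available (note that on the failing cluster the executor-side check may not even run, since Kryo initialisation itself fails during task setup):

// Driver side: succeeds, since the library JAR is on the driver classpath.
Class.forName("com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator")

// Executor side: the same lookup inside a task. If the JAR is only visible
// to the REPL class loader on the executors, this is where the
// ClassNotFoundException surfaces.
spark.sparkContext
  .parallelize(1 to 4)
  .map(_ => Class.forName("com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator").getName)
  .collect()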
10-17-2024 01:33 AM
Integrating Spark tasks with Databricks can greatly improve your workflow. For tasks that require Kryo serialization, make sure your Spark session is configured correctly; you may need to adjust the serialization settings in your Spark configuration. Checking the RDD compatibility of your Databricks runtime can also help with troubleshooting. You can find more detailed instructions on configuring Kryo serialization for Spark jobs in the Databricks documentation.
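For what it's worth, the serialization settings mentioned here mirror what the cluster spark_conf above already sets; applied programmatically they would look like the sketch below. Note that the serializer must be configured before the SparkContext starts, so on Databricks the cluster-level spark_conf is the usual place for these.

import org.apache.spark.sql.SparkSession

// Sketch: the same Kryo settings as the cluster spark_conf, applied on the
// session builder. These only take effect if set before the SparkContext
// is created.
val spark = SparkSession.builder()
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator")
  .config("spark.kryo.referenceTracking", "false")
  .config("spark.kryoserializer.buffer", "4m")
  .getOrCreate()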