Databricks Kryo setup

UnaiUribarri
New Contributor II

I would like to consolidate all our Spark jobs in Databricks. One of the jobs currently running on Azure HDInsight does not work properly as a Databricks JAR job.

It uses Spark 3.3 RDDs and requires Kryo serialisation. I have therefore set up a Databricks 12.2 LTS cluster (the latest runtime using Spark 3.3.x) and configured everything as in HDInsight, adding the Spark config to the cluster and the JAR to the single task.
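
For context, the class referenced by spark.kryo.registrator implements Spark's KryoRegistrator interface, roughly along these lines (a minimal sketch; LayerRecord is a hypothetical stand-in for the domain classes our real registrator registers):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical stand-in for one of the domain classes the real registrator handles.
case class LayerRecord(id: Long, payload: Array[Byte])

// Minimal sketch of a Kryo registrator: Spark instantiates this class by name
// (from spark.kryo.registrator) on the driver and on every executor.
class LayerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LayerRecord])
  }
}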

But the cluster fails because the executors cannot find the class defined in spark.kryo.registrator, even though it is included in the JAR.

How can I solve this?

Job definition:

resources:
  jobs:
    test_unai_layer_initial_snapshot:
      name: test-unai-layer-initial-snapshot
      max_concurrent_runs: 15
      tasks:
        - task_key: layer-A-initial-snapshot
          spark_jar_task:
            jar_uri: ""
            main_class_name: com.tomtom.orbis.layers.io.InitialSnapshotCommand
            parameters:
              - --input=redacted
              - --output=redacted
            run_as_repl: true
          job_cluster_key: test-unai-initial-snapshot-cluster
          libraries:
            - jar: dbfs:/FileStore/jars/osm-layers-state-io-all.jar

      job_clusters:
        - job_cluster_key: test-unai-initial-snapshot-cluster
          new_cluster:
            cluster_name: ""
            spark_version: 12.2.x-scala2.12
            spark_conf:
              spark.network.timeout: 500s
              spark.executor.cores: "24"
              spark.sql.shuffle.partitions: auto
              # Kryo settings copied from HDInsight
              spark.serializer: org.apache.spark.serializer.KryoSerializer
              spark.kryo.unsafe: "false"
              spark.kryo.referenceTracking: "false"
              spark.kryo.registrator: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
              spark.kryoserializer.buffer: 4m
              spark.shuffle.io.retryWait: 30s
              # Deleted some Hadoop Azure options
            azure_attributes:
              first_on_demand: 1
              availability: SPOT_WITH_FALLBACK_AZURE
              spot_bid_max_price: 100
            node_type_id: Standard_E48d_v4
            driver_node_type_id: Standard_D16ds_v4
            custom_tags:
              project: orbis-light
            cluster_log_conf:
              dbfs:
                destination: dbfs:/cluster-logs/test/unai
            enable_elastic_disk: true
            policy_id: 306312B973000A7D
            data_security_mode: LEGACY_SINGLE_USER_STANDARD
            runtime_engine: STANDARD
            autoscale:
              min_workers: 16
              max_workers: 16
      tags:
        name: layer-initial-snapshot

The error I am getting (it is the same in all executors) is:

24/10/16 07:01:15 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:184)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:280)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:172)
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:103)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:110)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:349)
at org.apache.spark.serializer.KryoDeserializationStream.<init>(KryoSerializer.scala:303)
at org.apache.spark.serializer.KryoSerializerInstance.deserializeStream(KryoSerializer.scala:470)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:452)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:339)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:313)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:308)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1663)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:308)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
at org.apache.spark.scheduler.TaskDescription.stageDescription(TaskDescription.scala:72)
at org.apache.spark.scheduler.TaskDescription.properties(TaskDescription.scala:77)
at org.apache.spark.executor.Executor$TaskRunner.<init>(Executor.scala:677)
at org.apache.spark.executor.Executor.createTaskRunner(Executor.scala:533)
at org.apache.spark.executor.Executor.launchTask(Executor.scala:537)
at org.apache.spark.executor.Executor$TaskLauncher.$anonfun$run$3(Executor.scala:506)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskLauncher.run(Executor.scala:506)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:126)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:259)
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$7(KryoSerializer.scala:179)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)


2 REPLIES

UnaiUribarri
New Contributor II

The Kryo registrator is properly loaded in the driver. If I remove the class from the JAR, I get an error in the driver instead.
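
One way to check this directly is to ask the executors to load the class themselves (a diagnostic sketch, assuming spark is the active SparkSession; it has to run on a cluster where the Kryo settings are not applied, since with Kryo enabled every task fails before user code runs):

// Ask a spread of tasks to load the registrator class on the executors.
// If Class.forName succeeds on the driver but this throws
// ClassNotFoundException, the JAR is only on the driver classpath.
val registrator = "com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator"
val loadedOn = spark.sparkContext
  .parallelize(1 to 100, numSlices = 100)
  .map(_ => Class.forName(registrator).getName)
  .distinct()
  .collect()
println(loadedOn.mkString(", "))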

dilsan77
New Contributor II

Integrating Spark jobs with Databricks can greatly improve your workflow. For jobs that require Kryo serialization, make sure you configure your Spark session correctly; you may need to adjust the serialization settings in your Spark configuration. Checking that your RDDs are compatible with your Databricks runtime can also help with troubleshooting. You can find more detailed instructions on configuring Kryo serialization for Spark jobs in the Databricks documentation.
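
For example, the same Kryo settings can be applied programmatically when the SparkSession is built (a sketch only; serializer settings are read once at SparkContext startup, so on Databricks they normally belong in the cluster spark_conf as shown in the job definition above):

import org.apache.spark.sql.SparkSession

// Sketch: Kryo settings applied at session construction. These have no
// effect if set on an already-running context.
val spark = SparkSession.builder()
  .appName("layer-initial-snapshot")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator")
  .config("spark.kryo.referenceTracking", "false")
  .config("spark.kryoserializer.buffer", "4m")
  .getOrCreate()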
