
Databricks Kryo setup

UnaiUribarri
New Contributor II

I would like to consolidate all our Spark jobs in Databricks. One of those jobs, which currently runs in Azure HDInsight, does not work properly as a Databricks JAR job.

It uses Spark 3.3 RDDs and requires configuring Kryo serialisation. Therefore, I have set up a DB 12.2 LTS cluster (the latest version using Spark 3.3.x) and configured everything as in HDInsight, adding the Spark config to the cluster and the JAR to the single task.

But the cluster fails because the executors cannot find the class defined in spark.kryo.registrator, even though it is included in the JAR.

How can I solve this?
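
For reference, the registrator named in spark.kryo.registrator is a plain Spark KryoRegistrator along these lines (a simplified sketch; the real class registers our own layer types, which I have left out):

package com.tomtom.orbis.layers.io.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Simplified sketch of the registrator. The registrations below are
// placeholders, not the actual layer model types.
class LayerKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[java.util.ArrayList[_]])
  }
}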

Job definition:

resources:
  jobs:
    test_unai_layer_initial_snapshot:
      name: test-unai-layer-initial-snapshot
      max_concurrent_runs: 15
      tasks:
        - task_key: layer-A-initial-snapshot
          spark_jar_task:
            jar_uri: ""
            main_class_name: com.tomtom.orbis.layers.io.InitialSnapshotCommand
            parameters:
              - --input=redacted
              - --output=redacted
            run_as_repl: true
          job_cluster_key: test-unai-initial-snapshot-cluster
          libraries:
            - jar: dbfs:/FileStore/jars/osm-layers-state-io-all.jar

      job_clusters:
        - job_cluster_key: test-unai-initial-snapshot-cluster
          new_cluster:
            cluster_name: ""
            spark_version: 12.2.x-scala2.12
            spark_conf:
              spark.network.timeout: 500s
              spark.executor.cores: "24"
              spark.sql.shuffle.partitions: auto
              # Kryo settings copied from HDInsight
              spark.serializer: org.apache.spark.serializer.KryoSerializer
              spark.kryo.unsafe: "false"
              spark.kryo.referenceTracking: "false"
              spark.kryo.registrator: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
              spark.kryoserializer.buffer: 4m
              spark.shuffle.io.retryWait: 30s
              # Some Hadoop Azure options omitted here
            azure_attributes:
              first_on_demand: 1
              availability: SPOT_WITH_FALLBACK_AZURE
              spot_bid_max_price: 100
            node_type_id: Standard_E48d_v4
            driver_node_type_id: Standard_D16ds_v4
            custom_tags:
              project: orbis-light
            cluster_log_conf:
              dbfs:
                destination: dbfs:/cluster-logs/test/unai
            enable_elastic_disk: true
            policy_id: 306312B973000A7D
            data_security_mode: LEGACY_SINGLE_USER_STANDARD
            runtime_engine: STANDARD
            autoscale:
              min_workers: 16
              max_workers: 16
      tags:
        name: layer-initial-snapshot

The error I am getting (it is the same in all executors) is:

24/10/16 07:01:15 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:184)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:280)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:172)
at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:103)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:110)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:349)
at org.apache.spark.serializer.KryoDeserializationStream.<init>(KryoSerializer.scala:303)
at org.apache.spark.serializer.KryoSerializerInstance.deserializeStream(KryoSerializer.scala:470)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:452)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:339)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:313)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:308)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1663)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:308)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
at org.apache.spark.scheduler.TaskDescription.stageDescription(TaskDescription.scala:72)
at org.apache.spark.scheduler.TaskDescription.properties(TaskDescription.scala:77)
at org.apache.spark.executor.Executor$TaskRunner.<init>(Executor.scala:677)
at org.apache.spark.executor.Executor.createTaskRunner(Executor.scala:533)
at org.apache.spark.executor.Executor.launchTask(Executor.scala:537)
at org.apache.spark.executor.Executor$TaskLauncher.$anonfun$run$3(Executor.scala:506)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskLauncher.run(Executor.scala:506)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:126)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:259)
at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$7(KryoSerializer.scala:179)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)

 

2 REPLIES

UnaiUribarri
New Contributor II

The Kryo registrator is properly loaded in the driver: if I remove the class from the JAR, I get an error in the driver instead.
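
A quick way to double-check visibility on the executor side is a throwaway probe job along these lines (a sketch; it should run with the Kryo settings removed, since otherwise the probe can die on the same broadcast deserialization before the map even runs):

import org.apache.spark.sql.SparkSession

// Diagnostic sketch: try to load the registrator class on the executors,
// via both plain Class.forName and the thread context class loader.
object ClassVisibilityProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("class-visibility-probe").getOrCreate()
    val className = "com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator"
    val report = spark.sparkContext.parallelize(1 to 8, 8).map { i =>
      val viaForName =
        try { Class.forName(className); "found" }
        catch { case _: ClassNotFoundException => "missing" }
      val viaContext =
        try {
          Class.forName(className, false, Thread.currentThread().getContextClassLoader)
          "found"
        } catch { case _: ClassNotFoundException => "missing" }
      s"partition $i: Class.forName=$viaForName, contextClassLoader=$viaContext"
    }.collect()
    report.foreach(println)
    spark.stop()
  }
}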

dilsan77
New Contributor II

Integrating Spark jobs with Databricks can greatly improve your workflow. For jobs that require Kryo serialization, make sure you configure your Spark session correctly; you may need to adjust the serialization settings in your Spark configuration. Checking your RDDs' compatibility with Databricks can also help with troubleshooting. You can find more detailed instructions on configuring Kryo serialization for Spark jobs in the Databricks documentation.
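
For what it is worth, the serializer settings can also be applied programmatically in the JAR's main before the SparkContext is created (a sketch using the conf keys from the post above; the object name is illustrative, and on a Databricks job cluster a context may already exist when the task starts, in which case these settings are ignored):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: set the Kryo options in code rather than (or in addition to) the
// cluster spark_conf. Serializer settings only take effect if they are in
// place before the SparkContext is created.
object InitialSnapshotApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.referenceTracking", "false")
      .set("spark.kryo.registrator", "com.tomtom.orbis.layers.io.spark.LayerKryoRegistrator")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // ... job logic goes here ...
    spark.stop()
  }
}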
