Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

Substantial performance issues/degradation on Databricks when migrating job over from EMR

643926
New Contributor II

Versions of Code:

Databricks: 7.3 LTS ML (includes Apache Spark 3.0.1, Scala 2.12)

AWS EMR: 6.1.0 (Spark 3.0.0, Scala 2.12)

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-610-release.html

The problem:

Errors occur in Databricks when replicating a job that works in AWS EMR.

Description and Setup:

We have a Spark job that essentially runs

```ALSModel.recommendForAllUsers(recommendations_ct)```

and writes the results to AWS S3 from AWS EMR.

We are currently attempting to migrate this to our Databricks environment. We copied over the exact same cluster configuration and Spark configuration values, and the Python code is identical as well.

The configuration that executes successfully in EMR but fails in Databricks:

6 r5.8xlarge workers (256 GB, 32 cores)

1 r5.2xlarge driver (64 GB, 8 cores)

with the following Spark configuration values:

```
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 2000m
spark.driver.memoryOverhead 4096
spark.executor.cores 5
spark.executor.memory 35G
spark.driver.cores 5
spark.executor.memoryOverhead 4096
spark.sql.shuffle.partitions 350
spark.broadcast.blockSize 12m
spark.executor.instances 35
spark.driver.memory 35G
spark.default.parallelism 350
fs.s3a.server-side-encryption-algorithm SSE-KMS
spark.hadoop.fs.s3a.stsAssumeRole.arn arn:aws:iam::***REDACTED***:role/databricks-s3-egress
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.fs.s3a.credentialsType AssumeRole
```
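Since Databricks and EMR size executors differently (Databricks typically runs one executor per worker node, whereas the EMR-style settings above pack several executors per node), it can help to sanity-check the requested layout arithmetically. Below is a rough sketch using the numbers from the post; the fit logic is a simplification that ignores the memory reserved for the OS and cluster daemons.

```python
# Back-of-envelope check of the posted executor layout. Hardware and
# Spark-config numbers come from the post; the fit logic is a rough
# sketch that ignores OS/daemon memory reservations.
nodes = 6
node_mem_gb = 256
node_cores = 32

exec_mem_gb = 35
exec_overhead_gb = 4096 / 1024  # spark.executor.memoryOverhead is in MiB
exec_cores = 5
requested_executors = 35

per_exec_gb = exec_mem_gb + exec_overhead_gb  # 39 GB per executor
by_cores = node_cores // exec_cores           # executors per node by cores
by_mem = int(node_mem_gb // per_exec_gb)      # executors per node by memory
per_node = min(by_cores, by_mem)
cluster_fit = per_node * nodes                # max executors the cluster can host

print(per_exec_gb, per_node, cluster_fit, requested_executors)
```

On paper the layout fits (6 executors per node, 36 cluster-wide versus 35 requested), but with no headroom; on a platform that reserves memory differently, the same numbers can tip executors into OOM.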

The error we observe:

Errors consistent with lost executors due to OOM and other JVM issues. What's strange is that this runs comfortably in EMR.

```
ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /***REDACTED***
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:122)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:143)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1011)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:955)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:955)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1093)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:195)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:184)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:268)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:246)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:241)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1558)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:241)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:118)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:309)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:291)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:499)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
	at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
	at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
	at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.203.234.49:34347
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
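An aside on reading the trace: `Connection refused` during a block fetch usually means the peer executor has already died (consistent with the OOM symptoms above), so timeout/retry tuning buys headroom rather than fixing the root cause. The properties below are standard Spark settings sometimes raised in this situation; the values shown are illustrative, not recommendations from this thread:

```
spark.network.timeout 600s
spark.shuffle.io.maxRetries 10
spark.shuffle.io.retryWait 30s
spark.executor.heartbeatInterval 60s
```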

Any help/thoughts would be greatly appreciated.

