
Substantial performance issues/degradation on Databricks when migrating job over from EMR

643926
New Contributor II

Versions of Code:

Databricks: 7.3 LTS ML (includes Apache Spark 3.0.1, Scala 2.12)

AWS EMR: 6.1.0 (Spark 3.0.0, Scala 2.12)

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-610-release.html

The problem:

Errors in Databricks when replicating a job that works in AWS EMR

Description and Setup:

We have a Spark job running in AWS EMR that essentially calls `ALSModel.recommendForAllUsers(recommendations_ct)` and writes the output to AWS S3.
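
For context, a minimal sketch of what the job does; the model path, output path, and `recommendations_ct` value here are illustrative placeholders, not our actual code:

```python
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

spark = SparkSession.builder.appName("als-recommendations").getOrCreate()

# Load a previously trained ALS model (path is a placeholder).
model = ALSModel.load("s3://example-bucket/models/als")

# Generate the top-N recommendations for every user.
recommendations_ct = 25  # illustrative value
recs = model.recommendForAllUsers(recommendations_ct)

# Write the recommendations out to S3 (path is a placeholder).
recs.write.mode("overwrite").parquet("s3://example-bucket/output/user_recs")
```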

We are currently attempting to migrate this to our Databricks environment. We have copied over the exact same cluster configuration and Spark configuration values, and the Python code is identical as well.

The configuration that executes successfully in EMR but fails in Databricks:

6 r5.8xlarge Workers (256GB, 32 cores)

1 r5.2xlarge Driver (64GB, 8 cores)

with the following Spark configuration values:

```
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 2000m
spark.driver.memoryOverhead 4096
spark.executor.cores 5
spark.executor.memory 35G
spark.driver.cores 5
spark.executor.memoryOverhead 4096
spark.sql.shuffle.partitions 350
spark.broadcast.blockSize 12m
spark.executor.instances 35
spark.driver.memory 35G
spark.default.parallelism 350
fs.s3a.server-side-encryption-algorithm SSE-KMS
spark.hadoop.fs.s3a.stsAssumeRole.arn arn:aws:iam::***REDACTED***:role/databricks-s3-egress
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.fs.s3a.credentialsType AssumeRole
```
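
For reference, a minimal sketch of how the same values could be supplied programmatically if the job built its own session; on Databricks (and EMR) these are normally set at the cluster level rather than at session creation, so this snippet is illustrative only:

```python
from pyspark.sql import SparkSession

# Illustrative only: mirrors a subset of the cluster-level settings above.
spark = (
    SparkSession.builder
    .appName("als-recommendations")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "2000m")
    .config("spark.executor.memory", "35G")
    .config("spark.executor.memoryOverhead", "4096")
    .config("spark.sql.shuffle.partitions", "350")
    .config("spark.default.parallelism", "350")
    .getOrCreate()
)
```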

The error we observe:

The errors are consistent with lost executors due to OOM and other JVM issues. What's strange is that the same job runs comfortably in EMR.

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /***REDACTED***
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:122)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:143)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
    at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1011)
    at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:955)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:955)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1093)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:195)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:184)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:246)
    at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:241)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1558)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:241)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:118)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:309)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:291)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:499)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.203.234.49:34347
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Any help/thoughts would be greatly appreciated.
