Versions of Code:
Databricks: 7.3 LTS ML (includes Apache Spark 3.0.1, Scala 2.12)
AWS EMR: 6.1.0 (Spark 3.0.0, Scala 2.12)
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-610-release.html
The problem:
Errors in Databricks when replicating a job that works in AWS EMR.
Description and Setup:
We have a Spark job that essentially runs `ALSModel.recommendForAllUsers(recommendations_ct)` and writes the output to AWS S3. Today it runs on AWS EMR.
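For context, the job is not much more than the following minimal PySpark sketch (the model path, output path, and the `recommendations_ct` value are placeholders, not our actual values):

```python
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

spark = SparkSession.builder.appName("als-recommendations").getOrCreate()

# Load a previously trained ALS model (placeholder path).
als_model = ALSModel.load("s3://my-bucket/models/als")

# Number of recommendations to generate per user (placeholder value).
recommendations_ct = 20

# Generate top-N recommendations for every user and persist to S3.
recs = als_model.recommendForAllUsers(recommendations_ct)
recs.write.mode("overwrite").parquet("s3://my-bucket/output/recommendations/")
```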
We are currently attempting to migrate this job to our Databricks environment. We have copied over the exact same cluster configuration and Spark configuration values, and the Python code is identical as well.
The configuration that executes successfully in EMR but fails in Databricks:
6 r5.8xlarge Workers (256GB, 32 cores)
1 r5.2xlarge Driver (64GB, 8 cores)
with spark configuration values:
```
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 2000m
spark.driver.memoryOverhead 4096
spark.executor.cores 5
spark.executor.memory 35G
spark.driver.cores 5
spark.executor.memoryOverhead 4096
spark.sql.shuffle.partitions 350
spark.broadcast.blockSize 12m
spark.executor.instances 35
spark.driver.memory 35G
spark.default.parallelism 350
fs.s3a.server-side-encryption-algorithm SSE-KMS
spark.hadoop.fs.s3a.stsAssumeRole.arn arn:aws:iam::***REDACTED***:role/databricks-s3-egress
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.fs.s3a.credentialsType AssumeRole
```
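To confirm these values actually take effect in both environments, we can dump the effective configuration at runtime and diff the EMR and Databricks output. A rough sketch (the key list below is illustrative, not exhaustive):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Keys worth comparing between the EMR and Databricks runs.
keys = [
    "spark.serializer",
    "spark.kryoserializer.buffer.max",
    "spark.executor.memory",
    "spark.executor.memoryOverhead",
    "spark.executor.cores",
    "spark.executor.instances",
    "spark.sql.shuffle.partitions",
    "spark.default.parallelism",
]

for key in keys:
    # Prints "<unset>" if the key is not defined in this environment.
    print(key, spark.conf.get(key, "<unset>"))
```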
The error we observe:
Errors consistent with lost executors due to OOM and other JVM issues. What is strange is that the same job runs comfortably on EMR.
```
ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /***REDACTED***
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:122)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:143)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1011)
at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:955)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:955)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1093)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:195)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:184)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:246)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:241)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1558)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:241)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:118)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:309)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:291)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:499)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68)
at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101)
at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.203.234.49:34347
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
Any help/thoughts would be greatly appreciated.