Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

org.apache.spark.SparkException: Job aborted due to stage failure while saving to s3

7cb15
New Contributor

Hello, I am having issues saving a Spark DataFrame generated in a Databricks notebook to an S3 bucket. The DataFrame contains approximately 1.1M rows and 5 columns. The error is as follows:

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 1743) (10.8.144.228 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://mybucket/myfilename.csv.

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:1453, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1451     self._jwrite.save()
   1452 else:
-> 1453     self._jwrite.save(path)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    183 def deco(*a: Any, **kw: Any) -> Any:
    184     try:
--> 185         return f(*a, **kw)
    186     except Py4JJavaError as e:
    187         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1408.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 1743) (10.8.144.228 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://listing-recommender-scores-production/listing_neighbor_similarity_listing_id_2023-11-03 22:45:53.147332+00:00.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:940)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:522)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:113)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:929)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:929)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$12(Executor.scala:910)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:913)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:763)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at com.databricks.photon.PhotonMetricsManager.bindToMetricsBuffer(PhotonMetricsManager.scala:53)
	at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator.evalPhoton(PhotonBasicEvaluatorFactory.scala:124)
	at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator.eval(PhotonBasicEvaluatorFactory.scala:79)
	at com.databricks.photon.PhotonExec.$anonfun$executePhoton$8(PhotonExec.scala:391)
	at com.databricks.photon.PhotonExec.$anonfun$executePhoton$8$adapted(PhotonExec.scala:389)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:916)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:916)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:81)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:104)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:106)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:502)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1751)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:509)
	... 33 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3505)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3436)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3423)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3423)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1453)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1453)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1453)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3726)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3664)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3652)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1200)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1188)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2833)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2816)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:379)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:341)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:376)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$1(FileFormatWriter.scala:257)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:117)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:206)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.$anonfun$sideEffectResult$3(commands.scala:132)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:130)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:129)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:144)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:102)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:122)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:110)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:92)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:547)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:535)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$2(ResultCacheManager.scala:553)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdate$1(AdaptiveSparkPlanExec.scala:682)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:680)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:553)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:408)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:401)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:295)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:504)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:501)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:477)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:270)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:165)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:270)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:236)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:436)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:181)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1038)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:269)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:247)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:265)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:253)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:316)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:312)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:253)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:372)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:253)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:205)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:196)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:331)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:964)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:429)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://listing-recommender-scores-production/listing_neighbor_similarity_listing_id_2023-11-03 22:45:53.147332+00:00.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:940)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:522)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:113)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:929)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:929)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$12(Executor.scala:910)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1717)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:913)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:763)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at com.databricks.photon.PhotonMetricsManager.bindToMetricsBuffer(PhotonMetricsManager.scala:53)
	at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator.evalPhoton(PhotonBasicEvaluatorFactory.scala:124)
	at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator.eval(PhotonBasicEvaluatorFactory.scala:79)
	at com.databricks.photon.PhotonExec.$anonfun$executePhoton$8(PhotonExec.scala:391)
	at com.databricks.photon.PhotonExec.$anonfun$executePhoton$8$adapted(PhotonExec.scala:389)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:916)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:916)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:81)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:104)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:106)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:502)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1751)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:509)

 

Any help would be much appreciated! 

1 ACCEPTED SOLUTION

Kaniz_Fatma
Community Manager

Hi @7cb15, I understand you’re encountering issues while saving a Spark DataFrame to an S3 bucket.

Let’s troubleshoot this together!

Here are some steps and recommendations to address the problem:

  1. Check S3 Permissions:

    • Ensure that the IAM role or user associated with your Databricks cluster has the necessary permissions to write to the specified S3 bucket.
    • Verify that the access key and secret key (if applicable) are correctly configured.
  2. S3 Endpoint Configuration:

    • If you’re using an S3-compatible storage service (e.g., MinIO), ensure you’ve configured the correct endpoint URL.
    • For Amazon S3, the default endpoint is s3.amazonaws.com.
  3. File Format and Compression:

    • Consider using a more efficient file format like Parquet or ORC instead of CSV. Parquet is particularly well-suited for large datasets.
    • Parquet files are columnar and highly compressible, which can significantly reduce storage costs and improve read performance.
  4. Partitioning and Coalescing:

    • If your DataFrame has a large number of partitions, consider reducing them with df.coalesce(1) before writing to S3. This consolidates the data into a single output file (see the combined write sketch after this list).
    • Example: df.coalesce(1).write.parquet("s3a://bucket-name/path/to/file.parquet", mode="overwrite")
  5. Retry and Error Handling:

    • Sometimes, transient network issues or temporary S3 glitches can cause write failures. Consider retrying the operation (a retry sketch follows this list).
    • Implement proper error handling and logging to capture detailed error messages.
  6. Hadoop Configuration:

    • Review the cluster’s Hadoop/S3A connector settings (fs.s3a.* options such as the credentials provider and endpoint) for stale or conflicting values; a configuration sketch follows this list.
  7. Use s3a Protocol:

    • Use the s3a protocol instead of s3n for better performance and compatibility with modern S3 features.
    • Example: df.write.parquet("s3a://bucket-name/path/to/file.parquet", mode="overwrite")
  8. Logging and Debugging:

    • Enable detailed logging in your Spark job to capture additional information about the failure.
    • Check the Databricks logs for any specific error messages related to S3 writes.
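
To tie items 3, 4, and 7 together, here is a minimal PySpark sketch of the recommended write. It is only a sketch: the bucket and path are placeholders, and it assumes a DataFrame of roughly this size (about 1.1M rows) is small enough to coalesce into a single file.

    # Placeholder path; substitute your own bucket and key
    output_path = "s3a://bucket-name/path/to/output.parquet"

    (
        df.coalesce(1)            # consolidate into one output file (the write runs as a single task)
          .write
          .mode("overwrite")      # replace any existing output at this path
          .parquet(output_path)   # columnar, compressed format instead of CSV
    )

If a downstream consumer still requires CSV, the same pattern applies with .option("header", True).csv(output_path) in place of .parquet(output_path).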
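
For items 5 and 8, a simple retry loop with logging around the write can help separate transient S3 failures from persistent ones. This is a sketch only; it reuses df and output_path from the sketch above, and the retry count and backoff are arbitrary.

    import logging
    import time

    logger = logging.getLogger("s3_write")
    max_attempts = 3

    for attempt in range(1, max_attempts + 1):
        try:
            df.write.mode("overwrite").parquet(output_path)
            logger.info("Write succeeded on attempt %d", attempt)
            break
        except Exception as exc:
            logger.warning("Write attempt %d failed: %s", attempt, exc)
            if attempt == max_attempts:
                raise
            time.sleep(30 * attempt)   # back off before the next attempt

If the failure persists across attempts, the captured exception (together with the executor logs in the Spark UI) will usually point to the underlying cause rather than a transient glitch.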
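
For item 6 (and the endpoint note in item 2), S3A connector settings can be applied through the cluster’s Hadoop configuration. The values below are illustrative rather than required; on Databricks, an instance profile normally supplies credentials and the default Amazon S3 endpoint is already correct, so treat this as a sketch to adapt rather than settings to copy.

    # Access the underlying Hadoop configuration of the active SparkSession
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

    # Only needed for S3-compatible stores (e.g., MinIO); Amazon S3 uses the default endpoint
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

    # Tune retries and connection pool size for transient S3 errors
    hadoop_conf.set("fs.s3a.attempts.maximum", "10")
    hadoop_conf.set("fs.s3a.connection.maximum", "100")

The same settings can also be applied once at cluster level as spark.hadoop.fs.s3a.* keys in the Spark config, which avoids relying on the private _jsc accessor in notebook code.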

Remember to adapt these recommendations based on your specific use case and environment.

If you encounter any further issues, please ask for more assistance! 🚀

