Help - org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2842.0
04-15-2024 03:14 PM
Hello,
I am training a SparkXGBRegressor model. It runs without errors when the complexity is low, but when I increase the max_depth and/or num_parallel_tree parameters, I get the error below. I checked the cluster metrics during training and it doesn't look like it is running out of memory or compute. Could you help me out?
org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2842.0 failed 4 times, most recent failure: Lost task 47.4 in stage 2842.0 (TID 87806) (10.141.64.43 executor 57): ExecutorLostFailure (executor 57 exited caused by one of the running tasks) Reason: Command exited with code 137
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File <command-3365462109699991>, line 33
     30 with mlflow.start_run() as run:
     31 # Declare the CrossValidator, which performs the model tuning.
     32 print(f'Runinng model for max_depth: {md} and num_parallel_tree: {ntree}')
---> 33 current_predictions, model = generate_forecast(category, data = data, frequency = frequency, state = state, xgb_params = params)
     34 print(f'Model training complete')
     36 #generate predictions for future periods with price change data

File <command-3365462109700202>, line 98, in generate_forecast(category, data, start_date, end_date, frequency, state, xgb_params, save_model)
     94 cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator, estimatorParamMaps=paramGrid)
     96 pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
---> 98 pipelineModel = pipeline.fit(train)
    100 # Save the trained model
    101 #if save_model ==True:
    102 # save_model_to_dbfs(xgb_model, category)
    104 print('Generating predictions...')

File /databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs)
     28 call_succeeded = False
     29 try:
---> 30 result = original_method(self, *args, **kwargs)
     31 call_succeeded = True
     32 return result

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418 active_session_failed
    419 or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429 disable_warnings=False,
    430 reroute_warnings=False,
    431 ):
--> 432 return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203 return self.copy(params)._fit(dataset)
    204 else:
--> 205 return self._fit(dataset)
    206 else:
    207 raise TypeError(
    208 "Params must be either a param map or a list/tuple of param maps, "
    209 "but got %s." % type(params)
    210 )

File /databricks/spark/python/pyspark/ml/pipeline.py:134, in Pipeline._fit(self, dataset)
    132 dataset = stage.transform(dataset)
    133 else: # must be an Estimator
--> 134 model = stage.fit(dataset)
    135 transformers.append(model)
    136 if i < indexOfLastEstimator:

File /databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs)
     28 call_succeeded = False
     29 try:
---> 30 result = original_method(self, *args, **kwargs)
     31 call_succeeded = True
     32 return result

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418 active_session_failed
    419 or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429 disable_warnings=False,
    430 reroute_warnings=False,
    431 ):
--> 432 return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203 return self.copy(params)._fit(dataset)
    204 else:
--> 205 return self._fit(dataset)
    206 else:
    207 raise TypeError(
    208 "Params must be either a param map or a list/tuple of param maps, "
    209 "but got %s." % type(params)
    210 )

File /databricks/spark/python/pyspark/ml/tuning.py:864, in CrossValidator._fit(self, dataset)
    861 assert subModels is not None
    862 subModels[i][j] = subModel
--> 864 _cancel_on_failure(dataset._sc, self.uid, sub_task_failed, calculate_metrics)
    865 # END-EDGE
    867 validation.unpersist()

File /databricks/spark/python/pyspark/ml/util.py:98, in _cancel_on_failure(sc, uid, sub_task_failed, f)
     96 except Exception:
     97 pass
---> 98 raise e
     99 finally:
    100 sc.setLocalProperty("spark.jobGroup.id", old_job_group)

File /databricks/spark/python/pyspark/ml/util.py:86, in _cancel_on_failure(sc, uid, sub_task_failed, f)
     84 sc.setLocalProperty("spark.jobGroup.id", new_job_group)
     85 try:
---> 86 return f()
     87 except Exception as e:
     88 # Hack: this is best effort for other tasks to fail fast. Futures will
     89 # read this and fail before running jobs. There's no guarantee here.
     90 sub_task_failed[0] = True

File /databricks/spark/python/pyspark/ml/util.py:134, in _cancel_on_ipython_post_run_cell.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    131 ipython.events.unregister("post_run_cell", on_cancel)
    133 ipython.events.register("post_run_cell", on_cancel)
--> 134 return f(*args, **kwargs)

File /databricks/spark/python/pyspark/ml/tuning.py:858, in CrossValidator._fit.<locals>.calculate_metrics()
    855 raise RuntimeError("Terminate this task because one of other task failed.")
    856 return task()
--> 858 for j, metric, subModel in pool.imap_unordered(run_task, tasks):
    859 metrics_all[i][j] = metric
    860 if collectSubModelsParam:

File /usr/lib/python3.10/multiprocessing/pool.py:873, in IMapIterator.next(self, timeout)
    871 if success:
    872 return value
--> 873 raise value

File /usr/lib/python3.10/multiprocessing/pool.py:125, in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    123 job, i, func, args, kwds = task
    124 try:
--> 125 result = (True, func(*args, **kwds))
    126 except Exception as e:
    127 if wrap_exception and func is not _helper_reraises_exception:

File /databricks/spark/python/pyspark/util.py:361, in inheritable_thread_target.<locals>.wrapped(*args, **kwargs)
    359 assert SparkContext._active_spark_context is not None
    360 SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
--> 361 return f(*args, **kwargs)

File /databricks/spark/python/pyspark/ml/tuning.py:856, in CrossValidator._fit.<locals>.calculate_metrics.<locals>.run_task(task)
    854 if sub_task_failed[0]:
    855 raise RuntimeError("Terminate this task because one of other task failed.")
--> 856 return task()

File /databricks/spark/python/pyspark/ml/tuning.py:118, in _parallelFitTasks.<locals>.singleTask()
    113 index, model = next(modelIter)
    114 # TODO: duplicate evaluator to take extra params from input
    115 # Note: Supporting tuning params in evaluator need update method
    116 # `MetaAlgorithmReadWrite.getAllNestedStages`, make it return
    117 # all nested stages and evaluators
--> 118 metric = eva.evaluate(model.transform(validation, epm[index]))
    119 return index, metric, model if collectSubModel else None

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418 active_session_failed
    419 or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429 disable_warnings=False,
    430 reroute_warnings=False,
    431 ):
--> 432 return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/evaluation.py:111, in Evaluator.evaluate(self, dataset, params)
    109 return self.copy(params)._evaluate(dataset)
    110 else:
--> 111 return self._evaluate(dataset)
    112 else:
    113 raise TypeError("Params must be a param map but got %s." % type(params))

File /databricks/spark/python/pyspark/ml/evaluation.py:148, in JavaEvaluator._evaluate(self, dataset)
    146 self._transfer_params_to_java()
    147 assert self._java_obj is not None
--> 148 return self._java_obj.evaluate(dataset._jdf)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350 self.command_header +\
   1351 args_command +\
   1352 proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356 answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359 if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187 try:
--> 188 return f(*a, **kw)
    189 except Py4JJavaError as e:
    190 converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
    327 "An error occurred while calling {0}{1}{2}.\n".
    328 format(target_id, ".", name), value)
    329 else:
    330 raise Py4JError(
    331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1233.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2842.0 failed 4 times, most recent failure: Lost task 47.4 in stage 2842.0 (TID 87806) (10.141.64.43 executor 57): ExecutorLostFailure (executor 57 exited caused by one of the running tasks) Reason: Command exited with code 137
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3645)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3567)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3554)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3554)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3890)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3790)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1233)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2942)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3054)
    at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1257)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1251)
    at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1344)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1311)
    at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1297)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1297)
    at org.apache.spark.mllib.stat.Statistics$.colStats(Statistics.scala:58)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:70)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:62)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:74)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:74)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:106)
    at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:115)
    at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
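
A note on reading "Command exited with code 137": by shell convention an exit status above 128 is 128 plus a signal number, so 137 corresponds to signal 9 (SIGKILL). On a Spark executor that often points to the process being killed from outside, for example for exceeding a memory limit, but that cause is an assumption here and not something the trace itself confirms. A minimal sketch of the decoding follows; `describe_exit_code` is a hypothetical helper, not part of Spark or Databricks, and it assumes a POSIX system.

```python
import signal


def describe_exit_code(code: int) -> str:
    """Describe a shell-style exit status.

    Assumption: statuses above 128 follow the common 128 + signal-number
    convention used by POSIX shells.
    """
    if code > 128:
        try:
            sig = signal.Signals(code - 128)
        except ValueError:
            # No signal with that number on this platform.
            return f"exit code {code} (no matching signal)"
        return f"killed by {sig.name} (signal {sig.value})"
    return f"exit code {code}"


print(describe_exit_code(137))
```

If the decoded signal is SIGKILL, the executor's own logs and the worker node's memory metrics (including off-heap memory used by native code) would be the next place to look.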

