04-15-2024 03:14 PM
Hello,
I am training a SparkXGBRegressor model. It runs without errors when the model complexity is low, but when I increase the max_depth and/or num_parallel_tree parameters, the job fails with the error below. I checked the cluster metrics during training, and it doesn't look like the cluster is running out of memory or compute. Could you help me out?
org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2842.0 failed 4 times, most recent failure: Lost task 47.4 in stage 2842.0 (TID 87806) (10.141.64.43 executor 57): ExecutorLostFailure (executor 57 exited caused by one of the running tasks) Reason: Command exited with code 137
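For reference, the relevant part of the training code looks roughly like this (a simplified sketch: the column names, the values of md and ntree, and the empty param grid are placeholders, but the pipeline structure matches the traceback below):

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from xgboost.spark import SparkXGBRegressor

md, ntree = 12, 10                         # placeholder values; failures start as these grow
feature_cols = ["feature_1", "feature_2"]  # placeholder feature column names

# Assemble the feature columns into one vector, then index categorical features.
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features")

xgb_regressor = SparkXGBRegressor(
    features_col="features",
    label_col="label",
    max_depth=md,             # increasing this triggers the failure
    num_parallel_tree=ntree,  # and/or this
)
evaluator = RegressionEvaluator(labelCol="label", metricName="rmse")
paramGrid = ParamGridBuilder().build()  # real grid omitted for brevity

# CrossValidator fits one model per fold and parameter combination on the cluster.
cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator,
                    estimatorParamMaps=paramGrid)
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
pipelineModel = pipeline.fit(train)  # `train` is the training DataFrame

The full stack trace: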
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-3365462109699991>, line 33
     30 with mlflow.start_run() as run:
     31     # Declare the CrossValidator, which performs the model tuning.
     32     print(f'Runinng model for max_depth: {md} and num_parallel_tree: {ntree}')
---> 33     current_predictions, model = generate_forecast(category, data = data, frequency = frequency, state = state, xgb_params = params)
     34     print(f'Model training complete')
     36     #generate predictions for future periods with price change data

File <command-3365462109700202>, line 98, in generate_forecast(category, data, start_date, end_date, frequency, state, xgb_params, save_model)
     94 cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator, estimatorParamMaps=paramGrid)
     96 pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
---> 98 pipelineModel = pipeline.fit(train)
    100 # Save the trained model
    101 #if save_model ==True:
    102 #    save_model_to_dbfs(xgb_model, category)
    104 print('Generating predictions...')

File /databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs)
     28 call_succeeded = False
     29 try:
---> 30     result = original_method(self, *args, **kwargs)
     31     call_succeeded = True
     32     return result

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418     active_session_failed
    419     or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429     disable_warnings=False,
    430     reroute_warnings=False,
    431 ):
--> 432     return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File /databricks/spark/python/pyspark/ml/pipeline.py:134, in Pipeline._fit(self, dataset)
    132     dataset = stage.transform(dataset)
    133 else:  # must be an Estimator
--> 134     model = stage.fit(dataset)
    135 transformers.append(model)
    136 if i < indexOfLastEstimator:

File /databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs)
     28 call_succeeded = False
     29 try:
---> 30     result = original_method(self, *args, **kwargs)
     31     call_succeeded = True
     32     return result

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418     active_session_failed
    419     or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429     disable_warnings=False,
    430     reroute_warnings=False,
    431 ):
--> 432     return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File /databricks/spark/python/pyspark/ml/tuning.py:864, in CrossValidator._fit(self, dataset)
    861     assert subModels is not None
    862     subModels[i][j] = subModel
--> 864 _cancel_on_failure(dataset._sc, self.uid, sub_task_failed, calculate_metrics)
    865 # END-EDGE
    867 validation.unpersist()

File /databricks/spark/python/pyspark/ml/util.py:98, in _cancel_on_failure(sc, uid, sub_task_failed, f)
     96 except Exception:
     97     pass
---> 98 raise e
     99 finally:
    100     sc.setLocalProperty("spark.jobGroup.id", old_job_group)

File /databricks/spark/python/pyspark/ml/util.py:86, in _cancel_on_failure(sc, uid, sub_task_failed, f)
     84 sc.setLocalProperty("spark.jobGroup.id", new_job_group)
     85 try:
---> 86     return f()
     87 except Exception as e:
     88     # Hack: this is best effort for other tasks to fail fast. Futures will
     89     # read this and fail before running jobs. There's no guarantee here.
     90     sub_task_failed[0] = True

File /databricks/spark/python/pyspark/ml/util.py:134, in _cancel_on_ipython_post_run_cell.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    131     ipython.events.unregister("post_run_cell", on_cancel)
    133 ipython.events.register("post_run_cell", on_cancel)
--> 134 return f(*args, **kwargs)

File /databricks/spark/python/pyspark/ml/tuning.py:858, in CrossValidator._fit.<locals>.calculate_metrics()
    855         raise RuntimeError("Terminate this task because one of other task failed.")
    856     return task()
--> 858 for j, metric, subModel in pool.imap_unordered(run_task, tasks):
    859     metrics_all[i][j] = metric
    860     if collectSubModelsParam:

File /usr/lib/python3.10/multiprocessing/pool.py:873, in IMapIterator.next(self, timeout)
    871 if success:
    872     return value
--> 873 raise value

File /usr/lib/python3.10/multiprocessing/pool.py:125, in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    123 job, i, func, args, kwds = task
    124 try:
--> 125     result = (True, func(*args, **kwds))
    126 except Exception as e:
    127     if wrap_exception and func is not _helper_reraises_exception:

File /databricks/spark/python/pyspark/util.py:361, in inheritable_thread_target.<locals>.wrapped(*args, **kwargs)
    359 assert SparkContext._active_spark_context is not None
    360 SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
--> 361 return f(*args, **kwargs)

File /databricks/spark/python/pyspark/ml/tuning.py:856, in CrossValidator._fit.<locals>.calculate_metrics.<locals>.run_task(task)
    854 if sub_task_failed[0]:
    855     raise RuntimeError("Terminate this task because one of other task failed.")
--> 856 return task()

File /databricks/spark/python/pyspark/ml/tuning.py:118, in _parallelFitTasks.<locals>.singleTask()
    113 index, model = next(modelIter)
    114 # TODO: duplicate evaluator to take extra params from input
    115 # Note: Supporting tuning params in evaluator need update method
    116 # `MetaAlgorithmReadWrite.getAllNestedStages`, make it return
    117 # all nested stages and evaluators
--> 118 metric = eva.evaluate(model.transform(validation, epm[index]))
    119 return index, metric, model if collectSubModel else None

File /databricks/python/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:432, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    417 if (
    418     active_session_failed
    419     or autologging_is_disabled(autologging_integration)
   (...)
    426 # warning behavior during original function execution, since autologging is being
    427 # skipped
    428 with set_non_mlflow_warnings_behavior_for_current_thread(
    429     disable_warnings=False,
    430     reroute_warnings=False,
    431 ):
--> 432     return original(*args, **kwargs)
    434 # Whether or not the original / underlying function has been called during the
    435 # execution of patched code
    436 original_has_been_called = False

File /databricks/spark/python/pyspark/ml/evaluation.py:111, in Evaluator.evaluate(self, dataset, params)
    109     return self.copy(params)._evaluate(dataset)
    110 else:
--> 111     return self._evaluate(dataset)
    112 else:
    113     raise TypeError("Params must be a param map but got %s." % type(params))

File /databricks/spark/python/pyspark/ml/evaluation.py:148, in JavaEvaluator._evaluate(self, dataset)
    146 self._transfer_params_to_java()
    147 assert self._java_obj is not None
--> 148 return self._java_obj.evaluate(dataset._jdf)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1233.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 2842.0 failed 4 times, most recent failure: Lost task 47.4 in stage 2842.0 (TID 87806) (10.141.64.43 executor 57): ExecutorLostFailure (executor 57 exited caused by one of the running tasks) Reason: Command exited with code 137
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3645)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3567)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3554)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3554)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3890)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3790)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1233)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2942)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:3054)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1257)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1251)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1344)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1311)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1297)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1297)
	at org.apache.spark.mllib.stat.Statistics$.colStats(Statistics.scala:58)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:70)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:62)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:74)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:106)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:115)
	at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:100)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)