Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(14, 3) finished unsuccessfully. org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/core.py", line 1082, in _train_booster dtrain, dvalid = create_dmatrix_from_partitions( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/data.py", line 312, in create_dmatrix_from_partitions cache_partitions(iterator, append_fn) File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/data.py", line 59, in cache_partitions train = part.loc[~part[alias.valid], :] ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1067, in __getitem__ return self._getitem_tuple(key) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1256, in _getitem_tuple return self._getitem_tuple_same_dim(tup) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 924, in _getitem_tuple_same_dim retval = getattr(retval, self.name)._getitem_axis(key, axis=i) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1301, in _getitem_axis return self._getitem_iterable(key, axis=axis) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1239, in _getitem_iterable keyarr, indexer = self._get_listlike_indexer(key, axis) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", 
line 1432, in _get_listlike_indexer keyarr, indexer = ax._get_indexer_strict(key, axis_name) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexes/base.py", line 6070, in _get_indexer_strict self._raise_if_missing(keyarr, indexer, axis_name) File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexes/base.py", line 6130, in _raise_if_missing raise KeyError(f"None of [{key}] are in the [{axis_name}]") KeyError: "None of [Int64Index([-1, -1, -1, -1, -1, -1, -1, -1, -1, -1,\n ...\n -1, -1, -1, -1, -1, -1, -1, -1, -1, -1],\n dtype='int64', length=10000)] are in the [index]" at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:851) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:117) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:800) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:90) at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:504) at org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:1283) at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:1191) at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:1105) at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244) at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263) at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381) at 
org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1310) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1302) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:800) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) at scala.collection.TraversableOnce.to(TraversableOnce.scala:366) at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358) at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345) at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1113) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$2(ResultTask.scala:76) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:76) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:227) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:204) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:166) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:160) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:105) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$11(Executor.scala:1228) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:111) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1232) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:1088) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4472) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4470) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:4382) at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:4369) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:4369) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:3737) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.liftedTree1$1(DAGScheduler.scala:4634) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4633) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4619) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55) at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1512) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1498) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3254) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1111) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:461) at org.apache.spark.rdd.RDD.collect(RDD.scala:1109) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:319) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:569) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397) at py4j.Gateway.invoke(Gateway.java:306) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:197) at py4j.ClientServerConnection.run(ClientServerConnection.java:117) at java.base/java.lang.Thread.run(Thread.java:840)
File
<command-5246778553375285>, line 1 ----> 1 pipeline_model
= pipeline
.fit(exploded)
2 ranker_model
= pipeline_model
.stages[
-1]
4 native_booster
= ranker_model
.get_booster()
File /databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs) 28 call_succeeded = False 29 try: ---> 30 result = original_method(self, *args, **kwargs) 31 call_succeeded = True 32 return result
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:483, in safe_patch.<locals>.safe_patch_function(*args, **kwargs) 479 call_original = update_wrapper_extended(call_original, original) 481 event_logger.log_patch_function_start(args, kwargs) --> 483 patch_function(call_original, *args, **kwargs) 485 session.state = "succeeded" 486 event_logger.log_patch_function_success(args, kwargs)
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:182, in with_managed_run.<locals>.patch_with_managed_run(original, *args, **kwargs) 179 managed_run = create_managed_run() 181 try: --> 182 result = patch_function(original, *args, **kwargs) 183 except (Exception, KeyboardInterrupt): 184 # In addition to standard Python exceptions, handle keyboard interrupts to ensure 185 # that runs are terminated if a user prematurely interrupts training execution 186 # (e.g. via sigint / ctrl-c) 187 if managed_run:
File /databricks/python/lib/python3.12/site-packages/mlflow/pyspark/ml/__init__.py:1172, in autolog.<locals>.patched_fit(original, self, *args, **kwargs) 1170 if t.should_log(): 1171 with _AUTOLOGGING_METRICS_MANAGER.disable_log_post_training_metrics(): -> 1172 fit_result = fit_mlflow(original, self, *args, **kwargs) 1173 # In some cases the `fit_result` may be an iterator of spark models. 1174 if should_log_post_training_metrics and isinstance(fit_result, Model):
File /databricks/python/lib/python3.12/site-packages/mlflow/pyspark/ml/__init__.py:1158, in autolog.<locals>.fit_mlflow(original, self, *args, **kwargs) 1156 input_training_df = args[0].persist(StorageLevel.MEMORY_AND_DISK) 1157 _log_pretraining_metadata(estimator, params, input_training_df) -> 1158 spark_model = original(self, *args, **kwargs) 1159 _log_posttraining_metadata(estimator, spark_model, params, input_training_df) 1160 input_training_df.unpersist()
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:474, in safe_patch.<locals>.safe_patch_function.<locals>.call_original(*og_args, **og_kwargs) 471 original_result = original(*_og_args, **_og_kwargs) 472 return original_result --> 474 return call_original_fn_with_event_logging(_original_fn, og_args, og_kwargs)
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:425, in safe_patch.<locals>.safe_patch_function.<locals>.call_original_fn_with_event_logging(original_fn, og_args, og_kwargs) 422 try: 423 event_logger.log_original_function_start(og_args, og_kwargs) --> 425 original_fn_result = original_fn(*og_args, **og_kwargs) 427 event_logger.log_original_function_success(og_args, og_kwargs) 428 return original_fn_result
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:471, in safe_patch.<locals>.safe_patch_function.<locals>.call_original.<locals>._original_fn(*_og_args, **_og_kwargs) 463 # Show all non-MLflow warnings as normal (i.e. not as event logs) 464 # during original function execution, even if silent mode is enabled 465 # (`silent=True`), since these warnings originate from the ML framework 466 # or one of its dependencies and are likely relevant to the caller 467 with NonMlflowWarningsBehaviorForCurrentThread( 468 disable_warnings=False, 469 reroute_warnings=False, 470 ): --> 471 original_result = original(*_og_args, **_og_kwargs) 472 return original_result
File /databricks/spark/python/pyspark/ml/base.py:203, in Estimator.fit(self, dataset, params) 201 return self.copy(params)._fit(dataset) 202 else: --> 203 return self._fit(dataset) 204 else: 205 raise TypeError( 206 "Params must be either a param map or a list/tuple of param maps, " 207 "but got %s." % type(params) 208 )
File /databricks/spark/python/pyspark/ml/pipeline.py:136, in Pipeline._fit(self, dataset) 134 dataset = stage.transform(dataset) 135 else: # must be an Estimator --> 136 model = stage.fit(dataset) 137 transformers.append(model) 138 if i < indexOfLastEstimator:
File /databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30, in _create_patch_function.<locals>.patched_method(self, *args, **kwargs) 28 call_succeeded = False 29 try: ---> 30 result = original_method(self, *args, **kwargs) 31 call_succeeded = True 32 return result
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:483, in safe_patch.<locals>.safe_patch_function(*args, **kwargs) 479 call_original = update_wrapper_extended(call_original, original) 481 event_logger.log_patch_function_start(args, kwargs) --> 483 patch_function(call_original, *args, **kwargs) 485 session.state = "succeeded" 486 event_logger.log_patch_function_success(args, kwargs)
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:182, in with_managed_run.<locals>.patch_with_managed_run(original, *args, **kwargs) 179 managed_run = create_managed_run() 181 try: --> 182 result = patch_function(original, *args, **kwargs) 183 except (Exception, KeyboardInterrupt): 184 # In addition to standard Python exceptions, handle keyboard interrupts to ensure 185 # that runs are terminated if a user prematurely interrupts training execution 186 # (e.g. via sigint / ctrl-c) 187 if managed_run:
File /databricks/python/lib/python3.12/site-packages/mlflow/pyspark/ml/__init__.py:1180, in autolog.<locals>.patched_fit(original, self, *args, **kwargs) 1178 return fit_result 1179 else: -> 1180 return original(self, *args, **kwargs)
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:474, in safe_patch.<locals>.safe_patch_function.<locals>.call_original(*og_args, **og_kwargs) 471 original_result = original(*_og_args, **_og_kwargs) 472 return original_result --> 474 return call_original_fn_with_event_logging(_original_fn, og_args, og_kwargs)
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:425, in safe_patch.<locals>.safe_patch_function.<locals>.call_original_fn_with_event_logging(original_fn, og_args, og_kwargs) 422 try: 423 event_logger.log_original_function_start(og_args, og_kwargs) --> 425 original_fn_result = original_fn(*og_args, **og_kwargs) 427 event_logger.log_original_function_success(og_args, og_kwargs) 428 return original_fn_result
File /databricks/python/lib/python3.12/site-packages/mlflow/utils/autologging_utils/safety.py:471, in safe_patch.<locals>.safe_patch_function.<locals>.call_original.<locals>._original_fn(*_og_args, **_og_kwargs) 463 # Show all non-MLflow warnings as normal (i.e. not as event logs) 464 # during original function execution, even if silent mode is enabled 465 # (`silent=True`), since these warnings originate from the ML framework 466 # or one of its dependencies and are likely relevant to the caller 467 with NonMlflowWarningsBehaviorForCurrentThread( 468 disable_warnings=False, 469 reroute_warnings=False, 470 ): --> 471 original_result = original(*_og_args, **_og_kwargs) 472 return original_result
File /databricks/spark/python/pyspark/ml/base.py:203, in Estimator.fit(self, dataset, params) 201 return self.copy(params)._fit(dataset) 202 else: --> 203 return self._fit(dataset) 204 else: 205 raise TypeError( 206 "Params must be either a param map or a list/tuple of param maps, " 207 "but got %s." % type(params) 208 )
File /databricks/python/lib/python3.12/site-packages/xgboost/spark/core.py:1136, in _SparkXGBEstimator._fit(self, dataset) 1123 return ret[0], ret[1] 1125 get_logger("XGBoost-PySpark").info( 1126 "Running xgboost-%s on %s workers with" 1127 "\n\tbooster params: %s" (...) 1134 dmatrix_kwargs, 1135 ) -> 1136 (config, booster) = _run_job() 1137 get_logger("XGBoost-PySpark").info("Finished xgboost training!") 1139 result_xgb_model = self._convert_to_sklearn_model( 1140 bytearray(booster, "utf-8"), config 1141 )
File /databricks/python/lib/python3.12/site-packages/xgboost/spark/core.py:1122, in _SparkXGBEstimator._fit.<locals>._run_job() 1113 rdd = ( 1114 dataset.mapInPandas( 1115 _train_booster, # type: ignore (...) 1119 .mapPartitions(lambda x: x) 1120 ) 1121 rdd_with_resource = self._try_stage_level_scheduling(rdd) -> 1122 ret = rdd_with_resource.collect()[0] 1123 return ret[0], ret[1]
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs) 45 start = time.perf_counter() 46 try: ---> 47 res = func(*args, **kwargs) 48 logger.log_success( 49 module_name, class_name, function_name, time.perf_counter() - start, signature 50 ) 51 return res
File /databricks/spark/python/pyspark/core/rdd.py:1721, in RDD.collect(self) 1719 with SCCallSiteSync(self.context): 1720 assert self.ctx._jvm is not None -> 1721 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 1722 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
File /databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py:1362, in JavaMember.__call__(self, *args) 1356 command = proto.CALL_COMMAND_NAME +\ 1357 self.command_header +\ 1358 args_command +\ 1359 proto.END_COMMAND_PART 1361 answer = self.gateway_client.send_command(command) -> 1362 return_value = get_return_value( 1363 answer, self.gateway_client, self.target_id, self.name) 1365 for temp_arg in temp_args: 1366 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:269, in capture_sql_exception.<locals>.deco(*a, **kw) 266 from py4j.protocol import Py4JJavaError 268 try: --> 269 return f(*a, **kw) 270 except Py4JJavaError as e: 271 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/protocol.py:327, in get_return_value(answer, gateway_client, target_id, name) 325 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 326 if answer[1] == REFERENCE_TYPE: --> 327 raise Py4JJavaError( 328 "An error occurred while calling {0}{1}{2}.\n". 329 format(target_id, ".", name), value) 330 else: 331 raise Py4JError( 332 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 333 format(target_id, ".", name, value))
```