11-12-2024 05:13 AM
Hi
I have created a model and pipeline using xgboost.spark's SparkXGBRegressor and pyspark.ml's Pipeline. However, I run into "RuntimeError: _get_spark_session should not be invoked from executor side." when I try to save the predictions I generate with the model pipeline to a feature store table. Code:
import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
from pyspark.sql.functions import col, struct, lit
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient
fs = FeatureEngineeringClient()
def create_training_data(training_days):
    return (
        fs.read_table(name='dev_fs.xxx_featurestore_experiment.label')
        .where(col("calendardate").isin(training_days))
    )
training_data = create_training_data(training_days)
# Feature lookup for the aacc feature table
aacc_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx4', 'xxx5', 'xxx6']
    )
]
# Feature lookup for the apcp feature table
apcp_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx1', 'xxx2', 'xxx3']
    )
]
feature_lookups = aacc_features_lookup + apcp_features_lookup
# ---------------------
# the model pipeline
from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from typing import List
def create_new_xgb_model(features: List[str], **kwargs) -> Pipeline:
    """Create a new untrained XGBoost pipeline using PySpark's built-in Imputer."""
    # Impute missing values in each feature column
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{col}_imputed" for col in features]
    )
    # Assemble the imputed features into a single vector column
    assembler = VectorAssembler(
        inputCols=[f"{col}_imputed" for col in features],
        outputCol="features"
    )
    # Define the Spark XGBoost regressor
    xgb_regressor = SparkXGBRegressor(
        enable_sparse_data_optim=True,
        features_col="features",
        label_col="age_truncated",
        prediction_col="prediction",
        num_workers=4,
        missing=0.0,
        **kwargs
    )
    # Create and return the pipeline
    return Pipeline(stages=[imputer, assembler, xgb_regressor])
# ---------------------
# mlflow experiment
# Start MLflow run with feature store
with mlflow.start_run(run_name='feature_store_model') as run:
    # Create the training set with features looked up from the Feature Store
    training_set = fs.create_training_set(
        df=training_data,
        feature_lookups=feature_lookups,
        label='age_truncated'
    )
    # Materialize the training data as a Spark DataFrame
    training_data = training_set.load_df()
    # Train the model
    model = create_new_xgb_model(features=list_features, **optim_params)
    pipeline = model.fit(training_data)
    # Log the model together with its feature lineage
    fs.log_model(
        model=pipeline,
        artifact_path="feature_store_experiment_model",
        flavor=mlflow.spark,
        training_set=training_set,
        registered_model_name="feature_store_experiment-model"
    )
# ----------------------
# batch scoring
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(model_uri=model_uri, df=batch_df.sample(fraction=0.001))
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
The full error:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2109, in udf
loaded_model = mlflow.pyfunc.load_model(local_model_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/tracing/provider.py", line 244, in wrapper
except MlflowTracingException as e:
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 1046, in load_model
_clear_dependencies_schemas()
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 957, in _load_pyfunc
spark_model = _load_model(model_uri=path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 838, in _load_model
return PipelineModel.load(model_uri)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 288, in load
uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 442, in load
stage: "PipelineStage" = DefaultParamsReader.loadParamsInstance(stagePath, sc)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 750, in loadParamsInstance
instance = py_type.load(path)
^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/core.py", line 1733, in load
_get_spark_session().sparkContext.textFile(model_load_path).collect()[0]
^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/utils.py", line 94, in _get_spark_session
raise RuntimeError(
RuntimeError: _get_spark_session should not be invoked from executor side.
Any help would be much appreciated. I am trying to test out the Databricks feature store client's functions (fs.log_model, fs.score_batch, etc.), so I would welcome suggestions on how these functions should be used here.
11-14-2024 02:02 PM
The error you're encountering is due to attempting to access the Spark session on the executor side, which is not allowed in Spark's distributed computing model. This typically happens when Spark-specific functionality is used within a UDF or during model inference on executors. Your traceback shows exactly that: fs.score_batch wraps the logged model in a UDF, and deserializing the pipeline's SparkXGBRegressor stage on a worker calls _get_spark_session(), which only works on the driver.
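A toy reproduction of the constraint (illustrative only; the UDF below is hypothetical and not from your code): anything inside a pandas UDF body runs on the executors, so touching the SparkSession there fails in the same way.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def touch_spark(x: pd.Series) -> pd.Series:
    # This body executes on the workers; accessing or creating a
    # SparkSession/SparkContext here is not allowed and raises at runtime,
    # just like the _get_spark_session() call in your traceback.
    SparkSession.builder.getOrCreate()
    return x

# spark.range(4).select(touch_spark("id")).collect()  # -> RuntimeError
With that constraint in mind, here are some suggestions to resolve the issue and use Databricks Feature Store functions with your XGBoost model: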
When logging the model, use mlflow.pyfunc instead of mlflow.spark as the flavor. This will allow you to use the model for batch scoring without encountering Spark session issues on executors:
fs.log_model(
    model=pipeline,
    artifact_path="feature_store_experiment_model",
    flavor=mlflow.pyfunc,
    training_set=training_set,
    registered_model_name="feature_store_experiment-model"
)
Use fs.score_batch() for batch scoring: instead of loading the model and applying it yourself, the Feature Store's score_batch() function is designed to handle this scenario:
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(
    model_uri="models:/feature_store_experiment-model/latest",
    df=batch_df.sample(fraction=0.001)
)
After getting the predictions, you can create or update the feature store table:
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
Make sure that the feature names used during training match those in your feature lookups and batch scoring dataframe.
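As a quick illustrative check (the expected names below are just the features from your FeatureLookups; adapt as needed):
expected = ['xxx1', 'xxx2', 'xxx3', 'xxx4', 'xxx5', 'xxx6']  # from the lookups above
train_cols = training_set.load_df().columns
missing = [c for c in expected if c not in train_cols]
assert not missing, f"lookup features missing from training data: {missing}"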
If you need custom Python logic during distributed inference, consider pandas UDFs (or the pandas API on Spark, formerly Koalas); they run plain Python on the workers and never need a SparkSession there.
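For example, a minimal sketch of scoring through a pandas UDF. Caveats: pipeline and list_features are the names from the original post; imputed_df is a hypothetical DataFrame that has already been run through the Imputer stage so the *_imputed columns exist; and I assume the fitted SparkXGBRegressorModel is the last pipeline stage.
import pandas as pd
import xgboost as xgb
from pyspark.sql.functions import pandas_udf, struct

xgb_stage = pipeline.stages[-1]        # fitted SparkXGBRegressorModel
booster = xgb_stage.get_booster()      # plain, picklable single-node xgboost.Booster
feature_cols = [f"{c}_imputed" for c in list_features]

@pandas_udf("double")
def predict_udf(features: pd.DataFrame) -> pd.Series:
    # Runs on executors with only the pickled booster; no SparkSession needed.
    # to_numpy() drops column names so the booster's default f0..fn names match;
    # column order must match training order, and missing=0.0 mirrors training.
    dmat = xgb.DMatrix(features[feature_cols].to_numpy(), missing=0.0)
    return pd.Series(booster.predict(dmat))

scored = imputed_df.withColumn("prediction", predict_udf(struct(*feature_cols)))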
If you're working with large datasets, consider using Spark's built-in ML library (MLlib) instead of XGBoost, as it's designed to work natively with Spark's distributed computing model.
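A sketch of what that swap might look like, reusing the preprocessing stages from the original post, with pyspark.ml's GBTRegressor (MLlib's gradient-boosted trees) in place of SparkXGBRegressor; MLlib stages persist through Spark's native ML reader/writer:
from typing import List
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import GBTRegressor

def create_gbt_model(features: List[str]) -> Pipeline:
    # Same preprocessing as the XGBoost version
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{c}_imputed" for c in features]
    )
    assembler = VectorAssembler(
        inputCols=[f"{c}_imputed" for c in features],
        outputCol="features"
    )
    # MLlib's native gradient-boosted trees regressor
    gbt = GBTRegressor(
        featuresCol="features",
        labelCol="age_truncated",
        predictionCol="prediction"
    )
    return Pipeline(stages=[imputer, assembler, gbt])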
04-17-2025 09:30 AM
Did you ever find a resolution to this? I've been running into the same error with a Spark XGBoost classification model and haven't had any success finding a solution. Setting it to a pyfunc model when logging resulted in an error, and you were clearly already using the score_batch function recommended above.
06-09-2025 09:03 AM
I have tried the recommended solution of logging the model with flavor=mlflow.pyfunc, but it returns the following error when logging the model via the FeatureEngineeringClient.log_model function:
_validate_function_python_model(python_model)
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2597, in _validate_function_python_model
raise MlflowException(
mlflow.exceptions.MlflowException: `python_model` must be a PythonModel instance, callable object, or path to a script that uses set_model() to set a PythonModel instance or callable object.
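For reference, this is the kind of wrapper the validation seems to ask for, a minimal sketch I have not tested end to end (in particular, whether FeatureEngineeringClient.log_model preserves the feature lineage when given such a wrapper is unverified):
import mlflow
import pandas as pd
import xgboost as xgb

class BoosterWrapper(mlflow.pyfunc.PythonModel):
    """Pure-Python wrapper so loading it never needs a SparkSession."""

    def __init__(self, booster: xgb.Booster, feature_cols):
        self.booster = booster
        self.feature_cols = list(feature_cols)

    def predict(self, context, model_input: pd.DataFrame):
        # to_numpy() avoids feature-name mismatches with the trained booster;
        # match the training-time missing value here if one was set.
        dmat = xgb.DMatrix(model_input[self.feature_cols].to_numpy())
        return pd.Series(self.booster.predict(dmat))

# Hypothetical usage: extract the single-node booster from the fitted
# pipeline's last stage and log the wrapper instead of the PipelineModel.
# wrapped = BoosterWrapper(pipeline.stages[-1].get_booster(), feature_cols)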
Is there a different approach? I'm able to do batch scoring with other models (pyspark.ml ones) but not with xgboost.