Spark session invocation from executor side error when using SparkXGBRegressor and the Feature Engineering client
11-12-2024 05:13 AM
Hi
I have created a model and pipeline using xgboost.spark's SparkXGBRegressor and pyspark.ml's Pipeline. However, I run into "RuntimeError: _get_spark_session should not be invoked from executor side." when I try to save the predictions generated with the model pipeline to a Feature Store table. Code:
import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
from pyspark.sql.functions import col, struct, lit
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient
fs = FeatureEngineeringClient()
def create_training_data(training_days):
    return (
        fs.read_table(name='dev_fs.xxx_featurestore_experiment.label')
        .where(col("calendardate").isin(training_days))
    )
training_data = create_training_data(training_days)
# Feature lookups for the aacc feature table
aacc_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx4', 'xxx5', 'xxx6']
    )
]
# Feature lookups for the apcp feature table
apcp_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx1', 'xxx2', 'xxx3']
    )
]
feature_lookups = aacc_features_lookup + apcp_features_lookup
# ---------------------
# the model pipeline
from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from typing import List
def create_new_xgb_model(features: List[str], **kwargs) -> Pipeline:
    """Create a new untrained XGBoost pipeline using PySpark's built-in Imputer."""
    # Impute missing values using PySpark's built-in Imputer
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{c}_imputed" for c in features]
    )
    # Assemble the imputed features into a single vector column
    assembler = VectorAssembler(
        inputCols=[f"{c}_imputed" for c in features],
        outputCol="features"
    )
    # Define the Spark XGBoost regressor
    xgb_regressor = SparkXGBRegressor(
        enable_sparse_data_optim=True,
        features_col="features",
        label_col="age_truncated",
        prediction_col="prediction",
        num_workers=4,
        missing=0.0,
        **kwargs
    )
    # Create and return the pipeline
    return Pipeline(stages=[imputer, assembler, xgb_regressor])
# ---------------------
# mlflow experiment
# Start MLflow run with feature store
with mlflow.start_run(run_name='feature_store_model') as run:
    # Create training set with features from Feature Store
    training_set = fs.create_training_set(
        df=training_data,
        feature_lookups=feature_lookups,
        label='age_truncated'
    )
    # Load the joined training data as a Spark DataFrame
    training_data = training_set.load_df()
    # Train model
    model = create_new_xgb_model(features=list_features, **optim_params)
    pipeline = model.fit(training_data)
    # Log model with feature store
    fs.log_model(
        model=pipeline,
        artifact_path="feature_store_experiment_model",
        flavor=mlflow.spark,
        training_set=training_set,
        registered_model_name="feature_store_experiment-model"
    )
# ----------------------
# batch scoring
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(model_uri=model_uri, df=batch_df.sample(fraction=0.001))
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
The full error:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2109, in udf
loaded_model = mlflow.pyfunc.load_model(local_model_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/tracing/provider.py", line 244, in wrapper
except MlflowTracingException as e:
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 1046, in load_model
_clear_dependencies_schemas()
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 957, in _load_pyfunc
spark_model = _load_model(model_uri=path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 838, in _load_model
return PipelineModel.load(model_uri)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 288, in load
uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 442, in load
stage: "PipelineStage" = DefaultParamsReader.loadParamsInstance(stagePath, sc)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 750, in loadParamsInstance
instance = py_type.load(path)
^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/core.py", line 1733, in load
_get_spark_session().sparkContext.textFile(model_load_path).collect()[0]
^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/utils.py", line 94, in _get_spark_session
raise RuntimeError(
RuntimeError: _get_spark_session should not be invoked from executor side.
Any help would be much appreciated. I am trying to test out the Databricks Feature Store client functions (like fs.log_model, fs.score_batch, etc.), so I would like suggestions that keep using these functions.
11-14-2024 02:02 PM
The error you're encountering is due to attempting to access the Spark session on the executor side, which is not allowed in Spark's distributed computing model. This typically happens when Spark-specific functionality is used inside a UDF or during model inference on executors. To resolve this issue and use the Databricks Feature Store functions with your XGBoost model, you need to make some adjustments to your approach. Here are some suggestions:
- Use MLflow's pyfunc flavor instead of the Spark flavor: when logging the model, use mlflow.pyfunc instead of mlflow.spark. This allows the model to be used for batch scoring without running into Spark session issues on the executors.
fs.log_model(
    model=pipeline,
    artifact_path="feature_store_experiment_model",
    flavor=mlflow.pyfunc,
    training_set=training_set,
    registered_model_name="feature_store_experiment-model"
)
- Use fs.score_batch() for batch scoring: instead of loading the model and applying it directly, use the Feature Store's score_batch() function, which is designed to handle this scenario:
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(
    model_uri="models:/feature_store_experiment-model/latest",
    df=batch_df.sample(fraction=0.001)
)
- Create the feature store table with the predictions: after getting the predictions, you can create or update the Feature Store table:
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
- Ensure consistent feature names: make sure that the feature names used during training match those declared in your feature lookups and in the batch-scoring DataFrame, for example with a check like the one sketched below.
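A minimal sanity check, reusing the names from the question (list_features is the feature list passed to create_new_xgb_model, and the xxx names are the placeholders from the two FeatureLookups), might look like this:
# Hedged sanity check: every feature used for training should be provided by a FeatureLookup.
# The names below are the placeholders from the question; replace them with the real column names.
lookup_feature_names = ['xxx1', 'xxx2', 'xxx3', 'xxx4', 'xxx5', 'xxx6']
unmatched = set(list_features) - set(lookup_feature_names)
if unmatched:
    raise ValueError(f"Training features not covered by any FeatureLookup: {unmatched}")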
- Consider using the pandas API on Spark (formerly Koalas) or Pandas UDFs: if you need custom Python logic during distributed scoring, Pandas UDFs let you run plain pandas code on the executors without touching the Spark session there; a rough sketch follows.
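As an untested sketch of the Pandas UDF route: the model is loaded with mlflow.pyfunc.load_model inside the UDF, so the executors never need the Spark session. This assumes the logged model's pyfunc flavor can predict on a plain pandas DataFrame (i.e. not the mlflow.spark flavor), that a DataFrame joined_df already contains the looked-up feature columns, and that the xxx column names stand in for the real ones:
import pandas as pd
import mlflow
from pyspark.sql.functions import pandas_udf, struct, col

feature_cols = ["xxx1", "xxx2", "xxx3", "xxx4", "xxx5", "xxx6"]  # placeholders from the question
model_uri = "models:/feature_store_experiment-model/latest"

@pandas_udf("double")
def predict_udf(features: pd.DataFrame) -> pd.Series:
    # The pyfunc model is loaded on the executor; no Spark session is touched here.
    model = mlflow.pyfunc.load_model(model_uri)
    return pd.Series(model.predict(features))

scored_df = joined_df.withColumn(
    "prediction",
    predict_udf(struct(*[col(c) for c in feature_cols]))
)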
- Optimize for large datasets: if you're working with large datasets, consider using Spark's built-in ML library (MLlib) instead of XGBoost, as it's designed to work natively with Spark's distributed computing model; see the sketch below.
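For example, a gradient-boosted trees regressor from pyspark.ml could be dropped into the same pipeline. A minimal sketch, reusing the preprocessing stages from the question (the label column and default parameters are just placeholders):
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import GBTRegressor

def create_new_gbt_model(features):
    # Same imputation and assembly steps as the original pipeline.
    imputer = Imputer(inputCols=features, outputCols=[f"{c}_imputed" for c in features])
    assembler = VectorAssembler(inputCols=[f"{c}_imputed" for c in features], outputCol="features")
    # Native Spark ML regressor: it saves and loads without any Python-side Spark session call.
    gbt = GBTRegressor(featuresCol="features", labelCol="age_truncated", predictionCol="prediction")
    return Pipeline(stages=[imputer, assembler, gbt])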

