spark_session invocation from executor side error, when using sparkXGBregressor and fe client

NielsMH
New Contributor III

Hi 

I have created a model and pipeline using xgboost.spark's SparkXGBRegressor and pyspark.ml's Pipeline. However, I run into "RuntimeError: _get_spark_session should not be invoked from executor side." when I try to save the predictions I generate with the model pipeline to a feature store table. Code:

import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
from pyspark.sql.functions import col, struct, lit
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup

from databricks.feature_engineering import FeatureEngineeringClient
fs = FeatureEngineeringClient()

def create_training_data(training_days):
    return (
        fs.read_table(name='dev_fs.xxx_featurestore_experiment.label')
        .where(col("calendardate").isin(training_days))
    )

# `training_days` (the list of dates to train on) is defined elsewhere in the notebook
training_data = create_training_data(training_days)

# Feature lookup for the aacc features
aacc_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx4', 'xxx5', 'xxx6']
    )
]

# Feature lookup for the apcp features
apcp_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx1', 'xxx2', 'xxx3']
    )
]

feature_lookups = aacc_features_lookup + apcp_features_lookup

# ---------------------
# the model pipeline

from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from typing import List

def create_new_xgb_model(features: List[str], **kwargs) -> Pipeline:
    """Create a new untrained XGBoost model using PySpark's built-in Imputer"""

    # Use PySpark's built-in Imputer
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{col}_imputed" for col in features]
    )

    # Assemble the features into a single vector column
    assembler = VectorAssembler(
        inputCols=[f"{col}_imputed" for col in features],
        outputCol="features"
    )

    # Define the Spark XGBoost Regressor
    xgb_regressor = SparkXGBRegressor(
        enable_sparse_data_optim=True,
        features_col="features",
        label_col="age_truncated",
        prediction_col="prediction",
        num_workers=4,
        missing=0.0,
        **kwargs
    )

    # Create and return the pipeline
    return Pipeline(stages=[imputer, assembler, xgb_regressor])

# ---------------------
# mlflow experiment

# Start MLflow run with feature store
with mlflow.start_run(run_name='feature_store_model') as run:
    # Create training set with features from Feature Store
    training_set = fs.create_training_set(
        df=training_data,
        feature_lookups=feature_lookups,
        label='age_truncated'
    )

    # Load the training data as a Spark DataFrame
    training_data = training_set.load_df()

    # Train model (`list_features` and `optim_params` are defined elsewhere in the notebook)
    model = create_new_xgb_model(features=list_features, **optim_params)
    pipeline = model.fit(training_data)

    # Log model with feature store
    fs.log_model(
        model=pipeline,
        artifact_path="feature_store_experiment_model",
        flavor=mlflow.spark,
        training_set=training_set,
        registered_model_name="feature_store_experiment-model"
    )

# ----------------------
# batch scoring

# `model_uri` points at the registered model, e.g. "models:/feature_store_experiment-model/latest"
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(model_uri=model_uri, df=batch_df.sample(fraction=0.001))
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)

The full error:

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2109, in udf
    loaded_model = mlflow.pyfunc.load_model(local_model_path)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/mlflow/tracing/provider.py", line 244, in wrapper
    except MlflowTracingException as e:
  File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 1046, in load_model
    _clear_dependencies_schemas()
  File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 957, in _load_pyfunc
    spark_model = _load_model(model_uri=path)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 838, in _load_model
    return PipelineModel.load(model_uri)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
    return cls.read().load(path)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/pipeline.py", line 288, in load
    uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/pipeline.py", line 442, in load
    stage: "PipelineStage" = DefaultParamsReader.loadParamsInstance(stagePath, sc)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 750, in loadParamsInstance
    instance = py_type.load(path)
               ^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
    return cls.read().load(path)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/core.py", line 1733, in load
    _get_spark_session().sparkContext.textFile(model_load_path).collect()[0]
    ^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/utils.py", line 94, in _get_spark_session
    raise RuntimeError(
RuntimeError: _get_spark_session should not be invoked from executor side.

Any help would be much appreciated. I am trying to test out the Databricks Feature Store client's functions (fs.log_model, fs.score_batch, etc.), so I would like suggestions that keep these functions in use.

1 REPLY

Walter_C
Databricks Employee

The error you're encountering is due to attempting to access the Spark session on the executor side, which is not allowed in Spark's distributed computing model. This typically happens when trying to use Spark-specific functionality within a UDF or during model inference on executors. To resolve this issue and use Databricks Feature Store functions with your XGBoost model, you need to make some adjustments to your approach. Here are some suggestions:

1. Use MLflow's pyfunc flavor instead of the Spark flavor:

When logging the model, use mlflow.pyfunc instead of mlflow.spark. This will allow you to use the model for batch scoring without encountering Spark session issues on executors.

fs.log_model(
    model=pipeline,
    artifact_path="feature_store_experiment_model",
    flavor=mlflow.pyfunc,
    training_set=training_set,
    registered_model_name="feature_store_experiment-model"
)
2. Use fs.score_batch() for batch scoring:

Instead of loading the model and applying it directly, use the Feature Store's score_batch() function, which is designed to handle this scenario:

batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(
    model_uri="models:/feature_store_experiment-model/latest",
    df=batch_df.sample(fraction=0.001)
)
3. Create the feature store table with predictions:

After getting the predictions, you can create or update the feature store table:

fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
4. Ensure consistent feature names:

Make sure that the feature names used during training match those in your feature lookups and batch scoring dataframe.
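As a minimal sketch of such a check (it assumes `list_features` is the list of column names you pass to create_new_xgb_model, as in your notebook):

# Sketch only: verify every training feature is present in the DataFrame
# returned by training_set.load_df() before fitting the pipeline.
training_cols = set(training_set.load_df().columns)
missing = set(list_features) - training_cols
if missing:
    raise ValueError(f"Features missing from the training DataFrame: {missing}")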

5. Consider using Koalas or Pandas UDFs:

If you need to perform operations that require access to the Spark session on executors, consider using Koalas or Pandas UDFs, which are designed to work in a distributed environment.
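For example, a pandas-UDF-based scoring path can be built with mlflow.pyfunc.spark_udf so the executors never touch the Spark session directly. This is a sketch only; it assumes the model was logged with a pyfunc-compatible (non-Spark) flavor and that `model_uri` points to it:

import mlflow.pyfunc
from pyspark.sql.functions import struct

# Sketch: wrap the logged model in a pandas UDF for distributed inference.
# `model_uri` and `batch_df` are assumed to be defined as in the question.
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri, result_type="double")
scored = batch_df.withColumn("prediction", predict_udf(struct(*batch_df.columns)))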

6. Optimize for large datasets:

If you're working with large datasets, consider using Spark's built-in ML library (MLlib) instead of XGBoost, as it's designed to work natively with Spark's distributed computing model.
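For instance, an MLlib-native pipeline could keep your Imputer and VectorAssembler stages and swap SparkXGBRegressor for pyspark.ml.regression.GBTRegressor. This is only a sketch of the idea; hyperparameters would still need tuning for your data:

from typing import List
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import GBTRegressor

def create_new_gbt_model(features: List[str]) -> Pipeline:
    """Sketch of an MLlib-native alternative to the XGBoost pipeline."""
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{c}_imputed" for c in features]
    )
    assembler = VectorAssembler(
        inputCols=[f"{c}_imputed" for c in features],
        outputCol="features"
    )
    gbt = GBTRegressor(
        featuresCol="features",
        labelCol="age_truncated",
        predictionCol="prediction"
    )
    return Pipeline(stages=[imputer, assembler, gbt])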
