Machine Learning

_get_spark_session invoked from executor side error when using SparkXGBRegressor and the FeatureEngineeringClient

NielsMH
New Contributor III

Hi,

I have created a model and pipeline using xgboost.spark's SparkXGBRegressor and pyspark.ml's Pipeline. However, I run into "RuntimeError: _get_spark_session should not be invoked from executor side." when I try to save the predictions generated by the model pipeline to a feature store table. Code:

import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
from pyspark.sql.functions import col, struct, lit
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup

from databricks.feature_engineering import FeatureEngineeringClient
fs = FeatureEngineeringClient()

def create_training_data(training_days):
    return (
        fs.read_table(name='dev_fs.xxx_featurestore_experiment.label')
        .where(col("calendardate").isin(training_days))
    )

# training_days (a list of dates) is defined elsewhere in the notebook
training_data = create_training_data(training_days)

# Feature lookups for the aacc feature table
aacc_features_lookup = [
        FeatureLookup(
            table_name='dev_fs.xxx_featurestore_experiment.xxx',
            lookup_key=['xxxid'],
            timestamp_lookup_key=['calendardate'],
            feature_names=['xxx4', 'xxx5', 'xxx6']
        )
    ]

# Feature lookups for the apcp feature table
apcp_features_lookup = [
        FeatureLookup(
            table_name='dev_fs.xxx_featurestore_experiment.xxx',
            lookup_key=['xxxid'],
            timestamp_lookup_key=['calendardate'],
            feature_names=['xxx1', 'xxx2', 'xxx3']
        )
    ]

feature_lookups = aacc_features_lookup + apcp_features_lookup

# ---------------------
# the model pipeline

from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from typing import List

def create_new_xgb_model(features: List[str], **kwargs) -> Pipeline:
    """Create a new untrained XGBoost model using PySpark's built-in Imputer"""

    # Use PySpark's built-in Imputer
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{col}_imputed" for col in features]
    )

    # Assemble the features into a single vector column
    assembler = VectorAssembler(
        inputCols=[f"{col}_imputed" for col in features],
        outputCol="features"
    )

    # Define the Spark XGBoost regressor
    xgb_regressor = SparkXGBRegressor(
        enable_sparse_data_optim=True,
        features_col="features",
        label_col="age_truncated",
        prediction_col="prediction",
        num_workers=4,
        missing=0.0,
        **kwargs
    )

    # Create and return the pipeline
    return Pipeline(stages=[imputer, assembler, xgb_regressor])

# ---------------------
# mlflow experiment

# Start MLflow run and train with the feature store training set
with mlflow.start_run(run_name='feature_store_model') as run:
    # Create training set with features from the Feature Store
    training_set = fs.create_training_set(
        df=training_data,
        feature_lookups=feature_lookups,
        label='age_truncated'
    )

    # Materialize the training data as a Spark DataFrame
    training_data = training_set.load_df()

    # Train model (list_features and optim_params are defined elsewhere in the notebook)
    model = create_new_xgb_model(features=list_features, **optim_params)
    pipeline = model.fit(training_data)

    # Log the model together with the feature store lineage
    fs.log_model(
        model=pipeline,
        artifact_path="feature_store_experiment_model",
        flavor=mlflow.spark,
        training_set=training_set,
        registered_model_name="feature_store_experiment-model"
    )

# ----------------------
# batch scoring

# model_uri is defined elsewhere in the notebook
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
predictions = fs.score_batch(model_uri=model_uri, df=batch_df.sample(fraction=0.001))
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)

The full error:

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2109, in udf
    loaded_model = mlflow.pyfunc.load_model(local_model_path)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/mlflow/tracing/provider.py", line 244, in wrapper
    except MlflowTracingException as e:
  File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 1046, in load_model
    _clear_dependencies_schemas()
  File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 957, in _load_pyfunc
    spark_model = _load_model(model_uri=path)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 838, in _load_model
    return PipelineModel.load(model_uri)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
    return cls.read().load(path)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/pipeline.py", line 288, in load
    uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/pipeline.py", line 442, in load
    stage: "PipelineStage" = DefaultParamsReader.loadParamsInstance(stagePath, sc)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 750, in loadParamsInstance
    instance = py_type.load(path)
               ^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
    return cls.read().load(path)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/core.py", line 1733, in load
    _get_spark_session().sparkContext.textFile(model_load_path).collect()[0]
    ^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/utils.py", line 94, in _get_spark_session
    raise RuntimeError(
RuntimeError: _get_spark_session should not be invoked from executor side.

Any help would be much appreciated. From the stack trace it looks like fs.score_batch loads the logged pipeline inside a Python UDF, and the SparkXGBRegressor stage then calls _get_spark_session on the executor, which raises the error. I am trying to test out the Databricks feature store client's functions (fs.log_model, fs.score_batch, etc.), so I would like suggestions that keep using these functions.
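For reference, the driver-side fallback I have in mind looks roughly like this. It is only a sketch (it reuses the fitted pipeline from the training cell and the same feature lookups, and assumes create_training_set accepts label=None at scoring time), and it bypasses fs.score_batch, which is the part I actually want to test:

# Sketch of a driver-side fallback: build the scoring DataFrame with the same
# feature lookups, then call .transform() on the fitted pipeline from the
# training cell above, so nothing is loaded on the executors.
scoring_df = spark.table("dev_fs.xxx_featurestore_experiment.label").sample(fraction=0.001)

scoring_set = fs.create_training_set(
    df=scoring_df,
    feature_lookups=feature_lookups,
    label=None,  # no label at scoring time (assuming label=None is supported here)
)

predictions = pipeline.transform(scoring_set.load_df())

fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions.select("xxxid", "calendardate", "prediction")
)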

