Hi
I have created a model and pipeline using xgboost.spark's SparkXGBRegressor and a pyspark.ml Pipeline. However, I run into "RuntimeError: _get_spark_session should not be invoked from executor side." when I try to save the predictions generated with the model pipeline to a feature store table. Code:
import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
from pyspark.sql.functions import col, struct, lit
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient
fs = FeatureEngineeringClient()
def create_training_data(training_days):
    return (
        fs.read_table(name='dev_fs.xxx_featurestore_experiment.label')
        .where(col("calendardate").isin(training_days))
    )
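# NOTE (illustrative): training_days is defined elsewhere in my notebook;
# a stand-in so the snippet is self-contained:
training_days = ["2024-01-01", "2024-01-02", "2024-01-03"]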
training_data = create_training_data(training_days)
# Define the feature lookups for the aacc features
aacc_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx4', 'xxx5', 'xxx6']
    )
]
# Define the feature lookups for the apcp features
apcp_features_lookup = [
    FeatureLookup(
        table_name='dev_fs.xxx_featurestore_experiment.xxx',
        lookup_key=['xxxid'],
        timestamp_lookup_key=['calendardate'],
        feature_names=['xxx1', 'xxx2', 'xxx3']
    )
]
feature_lookups = aacc_features_lookup + apcp_features_lookup
# ---------------------
# the model pipeline
from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from typing import List
def create_new_xgb_model(features: List[str], **kwargs) -> Pipeline:
    """Create a new untrained XGBoost pipeline using PySpark's built-in Imputer."""
    # Impute missing values with PySpark's built-in Imputer
    imputer = Imputer(
        inputCols=features,
        outputCols=[f"{col}_imputed" for col in features]
    )
    # Assemble the imputed features into a single vector column
    assembler = VectorAssembler(
        inputCols=[f"{col}_imputed" for col in features],
        outputCol="features"
    )
    # Define the Spark XGBoost regressor
    xgb_regressor = SparkXGBRegressor(
        enable_sparse_data_optim=True,
        features_col="features",
        label_col="age_truncated",
        prediction_col="prediction",
        num_workers=4,
        missing=0.0,
        **kwargs
    )
    # Create and return the pipeline
    return Pipeline(stages=[imputer, assembler, xgb_regressor])
# ---------------------
# mlflow experiment
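# NOTE (illustrative): list_features and optim_params are defined earlier in my
# notebook; stand-in values so the snippet is self-contained:
list_features = ['xxx1', 'xxx2', 'xxx3', 'xxx4', 'xxx5', 'xxx6']
optim_params = {"max_depth": 6, "learning_rate": 0.1}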
# Start MLflow run with feature store
with mlflow.start_run(run_name='feature_store_model') as run:
    # Create the training set with features from the Feature Store
    training_set = fs.create_training_set(
        df=training_data,
        feature_lookups=feature_lookups,
        label='age_truncated'
    )
    # Materialize the training data as a Spark DataFrame
    training_data = training_set.load_df()
    # Train the model
    model = create_new_xgb_model(features=list_features, **optim_params)
    pipeline = model.fit(training_data)
    # Log the model with the Feature Store
    fs.log_model(
        model=pipeline,
        artifact_path="feature_store_experiment_model",
        flavor=mlflow.spark,
        training_set=training_set,
        registered_model_name="feature_store_experiment-model"
    )
# ----------------------
# batch scoring
batch_df = spark.table("dev_fs.xxx_featurestore_experiment.label")
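# NOTE (illustrative): model_uri points at the model logged above, e.g.:
model_uri = f"runs:/{run.info.run_id}/feature_store_experiment_model"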
predictions = fs.score_batch(model_uri=model_uri, df=batch_df.sample(fraction=0.001))
fs.create_table(
    name="dev_fs.xxx_featurestore_experiment.predictions",
    primary_keys=["xxxid", "calendardate"],
    df=predictions
)
The full error, raised from the Python worker during fs.score_batch:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 2109, in udf
loaded_model = mlflow.pyfunc.load_model(local_model_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/tracing/provider.py", line 244, in wrapper
except MlflowTracingException as e:
File "/databricks/python/lib/python3.11/site-packages/mlflow/pyfunc/__init__.py", line 1046, in load_model
_clear_dependencies_schemas()
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 957, in _load_pyfunc
spark_model = _load_model(model_uri=path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/mlflow/spark/__init__.py", line 838, in _load_model
return PipelineModel.load(model_uri)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 288, in load
uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/pipeline.py", line 442, in load
stage: "PipelineStage" = DefaultParamsReader.loadParamsInstance(stagePath, sc)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 750, in loadParamsInstance
instance = py_type.load(path)
^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/ml/util.py", line 465, in load
return cls.read().load(path)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/core.py", line 1733, in load
_get_spark_session().sparkContext.textFile(model_load_path).collect()[0]
^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/xgboost/spark/utils.py", line 94, in _get_spark_session
raise RuntimeError(
RuntimeError: _get_spark_session should not be invoked from executor side.
Any help would be much appreciated. From the traceback it looks like score_batch loads the model inside a UDF on the executors, and the SparkXGBRegressor stage of the pipeline then needs a SparkSession, which isn't available executor-side. I am deliberately testing the Databricks feature store client functions (fs.log_model, fs.score_batch, etc.), so I would prefer suggestions that keep using these functions.