Hi everyone,
I'm building a PySpark ML Pipeline whose first stage fills nulls with zero. I wrote a custom class for this because I could not find a built-in Transformer that does this imputation.
I can log this pipeline with the MLflow log model method and load it for scoring, but when I log it with the Feature Engineering package, the score batch method throws an error saying that the custom class does not exist. I need to log it via the Feature Engineering package so I can properly leverage feature stores and lineage in Unity Catalog. Is anyone able to help? The sample pipeline code is below. Inputs are loaded using feature lookups and the "create training set" method.
All assistance is appreciated!
import mlflow
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.regression import RandomForestRegressor
class FillNA(Transformer, DefaultParamsReadable, DefaultParamsWritable, mlflow.pyfunc.PythonModel):
    def __init__(self, fill_value=0, inputCols=None):
        super(FillNA, self).__init__()
        self.fill_value = fill_value
        self.inputCols = inputCols

    def _transform(self, df):
        # Replace nulls in the selected columns with fill_value
        return df.fillna(self.fill_value, subset=self.inputCols)

    def predict(self, context, model_input, params=None):
        return self._transform(model_input)

fill_na = FillNA(fill_value=0.0, inputCols=cols_to_fill)
rfr = RandomForestRegressor()
pipeline = Pipeline(stages=[fill_na, rfr])