I am trying to create a custom transformer as a stage in my pipeline. The first few transformations are done with Spark NLP and the following ones with MLlib. To pass the result of a Spark NLP stage to the next MLlib stage, I need to extract the nested spark_nlp_col.result field and pass that along, which is what the custom transformer stage is for.
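(For context: a Spark NLP annotator emits an array of annotation structs, so the plain strings sit in the nested result field. A toy illustration of that shape, with made-up names and data:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# stand-in for what a Spark NLP stage produces: array<struct<..., result, ...>>
df = spark.createDataFrame(
    [([{"annotatorType": "token", "result": "hello"},
       {"annotatorType": "token", "result": "world"}],)],
    "spark_nlp_col: array<struct<annotatorType: string, result: string>>",
)
df.select(df["spark_nlp_col.result"]).show()  # [hello, world] -- an array<string>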
After fitting my pipeline I can persist it, but when I load it back again I get this error:
AttributeError: 'DummyMod' object has no attribute 'MyTransformer'
Here is my class:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MyTransformer(Transformer, DefaultParamsWritable, DefaultParamsReadable):
    inputCol = Param(Params._dummy(), "inputCol", "input column name",
                     TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "output column name",
                      TypeConverters.toString)

    def __init__(self, inputCol=None, outputCol=None):
        super(MyTransformer, self).__init__()
        self._setDefault(inputCol=None, outputCol=None)
        self._set(inputCol=inputCol, outputCol=outputCol)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def setInputCol(self, inputCol):
        self._set(inputCol=inputCol)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, outputCol):
        self._set(outputCol=outputCol)

    def _transform(self, dataset):
        in_col = self.getInputCol()
        out_col = self.getOutputCol()
        # lift the nested Spark NLP `result` field into a flat column
        final_in_col = in_col + ".result"
        return dataset.withColumn(out_col, dataset[final_in_col])
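On its own the transformer behaves as expected; reusing the toy df from above:

extractor = MyTransformer(inputCol="spark_nlp_col", outputCol="plain_col")
extractor.transform(df).select("plain_col").show()  # [hello, world]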
I have created a simple wrapper function over it for standardisation, and then used it to build the pipeline, fit it, and save it:
from pyspark.ml import Pipeline

def extract_col(cols, in_suffix, out_suffix):
    return [MyTransformer(inputCol=col + in_suffix, outputCol=col + out_suffix)
            for col in cols]
'''
stages before custom transformer
'''
extractors = extract_col(cols, "_in", "_out")
'''
stages after custom transformer
'''
stages = s1 + s2 + .. + extractors + .. + sn-1 + sn
pipeline = Pipeline(stages=stages)
fit_pipeline = pipeline.fit(data)
fit_pipeline.write().overwrite().save(path_to_store_at)
Here is how I read it back:

from pyspark.ml import PipelineModel

saved_pipeline = PipelineModel.load("path_where_stored")

That load call is where the error occurs.
I have tried multiple ways of writing the custom class, e.g. using the HasInputCol and HasOutputCol mixins, but nothing has worked so far.
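One of those variants looked roughly like this (reconstructed from memory, with the usual keyword_only boilerplate; loading failed the same way):

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MyTransformer(Transformer, HasInputCol, HasOutputCol,
                    DefaultParamsWritable, DefaultParamsReadable):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyTransformer, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _transform(self, dataset):
        # same logic: lift the nested `result` field into a flat column
        return dataset.withColumn(self.getOutputCol(),
                                  dataset[self.getInputCol() + ".result"])

Any idea how I can resolve this?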