PySpark custom Transformer class - AttributeError: 'DummyMod' object has no attribute 'MyTransformer'

simranisanewbie
New Contributor II

I am trying to create a custom transformer as a stage in my pipeline. Some of the transformations are done with SparkNLP and the later ones with MLlib. To pass the result of a SparkNLP stage to the next MLlib stage, I need to extract the spark_nlp_col.result column, and I am using a custom transformer stage for that.
After I fit my pipeline, I am able to persist it, but when I load it back I get an error:

AttributeError: 'DummyMod' object has no attribute 'MyTransformer'

Here is my class:

 

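from pyspark.ml import Pipeline, PipelineModel, Transformer
from pyspark.ml.param.shared import Param, Params, TypeConverters
# DefaultParamsWritable/DefaultParamsReadable live in pyspark.ml.util
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable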

class MyTransformer(Transformer, DefaultParamsWritable, DefaultParamsReadable):
    # Copies the nested "<inputCol>.result" field into a top-level outputCol.
    inputCol = Param(Params._dummy(), "inputCol", "", TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "", TypeConverters.toString)

    def __init__(self, inputCol=None, outputCol=None):
        super(MyTransformer, self).__init__()
        self._setDefault(inputCol=None)
        self._set(inputCol=inputCol)
        self._setDefault(outputCol=None)
        self._set(outputCol=outputCol)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def setInputCol(self, inputCol):
        # _set returns self, so setters can be chained
        return self._set(inputCol=inputCol)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, outputCol):
        return self._set(outputCol=outputCol)

    def _transform(self, dataset):
        in_col = self.getInputCol()
        out_col = self.getOutputCol()

        # SparkNLP annotation columns are structs; pull out the "result" field
        final_in_col = in_col + ".result"
        return dataset.withColumn(out_col, dataset[final_in_col])

 

I have created a simple wrapper function over it for standardisation and then used it to create the pipeline, fit it and save it:

 

def extract_col(cols, in_suffix, out_suffix):
    # One extractor stage per column: "<col><in_suffix>.result" -> "<col><out_suffix>"
    return [MyTransformer(inputCol=col + in_suffix, outputCol=col + out_suffix) for col in cols]
# ... stages before custom transformer ...
extractors = extract_col(cols, "_in", "_out")
# ... stages after custom transformer ...

stages = s1 + s2 + .. + extractors + .. + sn-1 + sn
pipeline = Pipeline(stages = stages)
fit_pipeline = pipeline.fit(data)
fit_pipeline.write().overwrite().save(path_to_store_at)

 

How I am reading it back:

 

saved_pipeline = PipelineModel.load("path_where_stored")

 

That is when I encounter the error.
I have tried multiple ways of writing the custom class (using HasInputCol, HasOutputCol, etc.), but nothing has worked so far. Any idea how I can resolve it?
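
One possible reading of the error, offered as an assumption rather than a confirmed diagnosis: when a pipeline with Python-only stages is loaded, PySpark seems to rebuild each stage from a saved "module.ClassName" string by importing the module and looking the class up on it. A class defined directly in a notebook is recorded under __main__, and in some notebook environments __main__ is a placeholder object that does not carry the class at load time. The sketch below mimics that lookup in plain Python, without Spark. The names resolve_class, fake_notebook_main and my_transformers, and the DummyMod stand-in class, are all hypothetical and exist only for illustration.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path


def resolve_class(qualified_name):
    """Import the module part of 'module.ClassName' and return the class."""
    module_name, _, class_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


class DummyMod:
    """Hypothetical stand-in for a notebook's placeholder __main__ module."""
    __spec__ = None


# Case 1: the recorded module is a placeholder object that lacks the class.
sys.modules["fake_notebook_main"] = DummyMod()
try:
    resolve_class("fake_notebook_main.MyTransformer")
    lookup_error = None
except AttributeError as exc:
    lookup_error = str(exc)
finally:
    del sys.modules["fake_notebook_main"]

# Case 2: the class lives in a real module file that is on sys.path.
module_dir = tempfile.mkdtemp()
Path(module_dir, "my_transformers.py").write_text(
    textwrap.dedent(
        """
        class MyTransformer:
            pass
        """
    )
)
sys.path.insert(0, module_dir)
importlib.invalidate_caches()
resolved = resolve_class("my_transformers.MyTransformer")

print(lookup_error)
print(resolved.__module__, resolved.__name__)
```

If this is indeed the cause, moving MyTransformer into an importable module (a .py file or package that is available on the cluster) before fitting and saving may let the load succeed. This has not been verified against the pipeline above.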