I am trying to run `SparkXGBRegressor` (from `xgboost.spark`) and I am getting an error with the following code:
class SparkClassificationModelManual(MLflowModelSignatureMixin, SparkSerializationMixin):
    """Assemble feature columns, standard-scale them and fit an XGBoost model on Spark.

    NOTE(review): despite the class name, the wrapped estimator is
    ``SparkXGBRegressor`` (a regressor), not a classifier.

    Args:
        inputCols: names of the raw feature columns to combine into one vector.
        outputCol: name of the label column used for training.
    """

    def __init__(self, inputCols, outputCol):
        super().__init__()
        self.inputCols = inputCols
        self.outputCol = outputCol
        self.featuresCol = "features"
        self.scaledFeaturesCol = "scaledFeatures"
        self.path = None  # Compulsory; set by save() / load()
        # Until fit() runs, _model and _scaler hold *unfitted* estimators;
        # fit() replaces them with the corresponding fitted models.
        self._model = SparkXGBRegressor(
            features_col=self.scaledFeaturesCol,
            label_col=self.outputCol,
        )
        self._scaler = StandardScaler(
            inputCol=self.featuresCol,
            outputCol=self.scaledFeaturesCol,
            withStd=True,
            withMean=False,
        )
        self.assembler = VectorAssembler(inputCols=self.inputCols, outputCol=self.featuresCol)

    def fit(self, df: pyspark.sql.DataFrame) -> None:
        """Fit the scaler and the XGBoost regressor on ``df``.

        ``df`` must contain every column in ``inputCols`` plus ``outputCol``.
        The estimators are replaced in place by the fitted models, so this is
        meant to be called once per instance.
        """
        # Combine the raw feature columns into a single vector column.
        assembled_df = self.assembler.transform(df)
        # Learn scaling statistics, then scale the assembled features.
        self._scaler = self._scaler.fit(assembled_df)
        scaled_df = self._scaler.transform(assembled_df)
        # Fit the XGBoost regressor on the scaled features.
        self._model = self._model.fit(scaled_df)

    def save(self, path):
        """Record the parent directory of ``path`` and delegate to the mixin's save."""
        self.path = str(Path(path).parent)
        super().save(path)

    def load(self, path):
        """Record the parent directory of ``path`` and return the mixin's load result."""
        self.path = str(Path(path).parent)
        return super().load(path)

    def predict(self, test_df):
        """Score ``test_df`` with the fitted assembler, scaler and model.

        Assumes fit() (or load()) has already run, so ``_scaler`` and
        ``_model`` are fitted models. Returns the DataFrame produced by the
        fitted model's ``transform``, which appends the prediction column.
        """
        assembled_test_df = self.assembler.transform(test_df)
        scaled_test_df = self._scaler.transform(assembled_test_df)
        # Spark ML models score DataFrames via transform(); the fitted XGBoost
        # Spark model has no DataFrame-level predict(), which was the failure.
        return self._model.transform(scaled_test_df)
if __name__ == "__main__":
    spark = SparkSession.builder.appName("ExampleApp").getOrCreate()
    model = SparkClassificationModelManual(
        inputCols=["feature1", "feature2", "feature3"],
        outputCol="label",
    )
    mlflow_reg = MLflowModelRegistration(
        model=model,
        model_reg_name="spark_testing_preprocess",
        model_reg_tags={"testing": "spark"},
    )
    data = spark.createDataFrame(
        [
            (0, 0.1, 0.3, 1),
            (1, 0.2, 0.5, 0),
            (0, 0.5, 0.8, 1),
            (1, 0.3, 0.7, 0),
        ],
        ["feature1", "feature2", "feature3", "label"],
    )
    # End any MLflow run left active so the registration context starts clean.
    mlflow.end_run()
    with mlflow_reg:
        model.fit(data)
    # NOTE(review): hard-coded registry version; confirm it matches the version
    # created by the registration above rather than an older one.
    registered_version = 2
    model = load_model(
        SparkClassificationModelManual,
        name="spark_testing_preprocess",
        version=registered_version,
    )
    res = model.predict(data)
    # A bare `type(res)` expression has no effect outside a REPL; print it.
    print(type(res))
    res.show()