02-15-2024 08:48 AM
I am looking for a way to log my `pyspark.ml.regression.LinearRegression` model with an input example and a signature. The usual examples that I found use sklearn, where one can simply do:
# Log the model with signature and input example
signature = infer_signature(X_train, pd.DataFrame(y_train))
input_example = X_train.head(3)
mlflow.sklearn.log_model(rf_model, "rf_model", signature=signature, input_example=input_example)
but this doesn't work for `LinearRegression`, because the "features" column has to be a Spark `VectorUDT`, which is not supported by MLflow. This is how I generate my features column:
cat_input_cols = ["gender","occupation","zip_code","age_category"]
cat_index_output_cols = [x + '_index' for x in cat_input_cols]
ohe_output_cols = [x + '_ohe' for x in cat_input_cols]
stringIndexer = StringIndexer(inputCols=cat_input_cols, outputCols=cat_index_output_cols, handleInvalid="error", stringOrderType="alphabetDesc")
ohe_encoder = OneHotEncoder(inputCols=cat_index_output_cols, outputCols=ohe_output_cols)
assembler = VectorAssembler(inputCols=features, outputCol="features")
pipeline = Pipeline(stages=[stringIndexer, ohe_encoder,assembler])
df_training_transformed = pipeline.fit(df_trainig_set).transform(df_trainig_set)
Now, when I build my model, I would like to use `infer_signature`, but I'm not able to find the right way to do it.
with mlflow.start_run(run_name="content_based_linReg") as run:
    # ---- set hyperparameters
    lr = LinearRegression(featuresCol=COL_FEATURES, labelCol=COL_LABEL)
    lr.setMaxIter(MAX_ITER)
    lr.setRegParam(REG_PARAM)
    lr.setElasticNetParam(ELASTIC_NET_PARAM)
    lr.setFitIntercept(FIT_INTERCEPT)
    # ---- Split data into training and validation sets
    (df_training_sampled, df_validation_sampled) = df_content.randomSplit([0.6, 0.4], SEEDS)
    # Create the model.
    lr_model = lr.fit(df_training_sampled)
    # Log the model parameters used for this run.
    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("REG_PARAM", REG_PARAM)
    mlflow.log_param("ELASTIC_NET_PARAM", ELASTIC_NET_PARAM)
    mlflow.log_param("FIT_INTERCEPT", FIT_INTERCEPT)
    # Run the model to create a prediction. Predict against the validation_df.
    df_validation_predictions = lr_model.transform(df_validation_sampled)
    # -->> this won't work
    # Log the model with signature and input example
    signature = infer_signature(df_training_sampled["features"], df_validation_predictions["prediction"])
    input_example = df_training_sampled.select(col("features"),col("rating")).head(3)
    mlflow.spark.log_model(lr_model, "LinearRegression",sample_input=input_example,signature=signature)
Here is the error I get when I call `infer_signature`:
Exception: Unsupported Spark Type '<class 'pyspark.ml.linalg.VectorUDT'>', MLflow schema is only supported for scalar Spark types.
Any idea how I should go about it?
02-16-2024 01:24 AM - edited 02-16-2024 01:26 AM
Hi @MohsenJ, the error you're encountering with `infer_signature` is due to the fact that it doesn't directly support Spark's `VectorUDT` type. However, we can work around this limitation.
Let's create a custom signature for your `LinearRegression` model using the input features from your transformed DataFrame.
Here's how you can proceed:
Create a Custom Signature:
Exclude the `VectorUDT` column. Assuming `df_training_transformed` is your transformed DataFrame, we'll extract the relevant columns for the signature:
from mlflow.models.signature import ModelSignature, Schema
# Extract input features (excluding the VectorUDT column)
input_columns = [col for col in df_training_transformed.columns if col != "features"]
# Create a signature with input features
signature = ModelSignature(inputs=Schema(input_columns))
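A note on the snippet above: MLflow's `Schema` is built from `ColSpec` entries (a type plus a name), not from bare column-name strings. Here is a minimal sketch of how the scalar columns could be turned into a typed schema. It assumes the `(name, type)` pairs come from the DataFrame's `dtypes` attribute; the helper names `scalar_col_specs` and `build_schema` are hypothetical, and the type mapping is my assumption about which Spark types line up with MLflow's scalar types. Vector columns (including the one-hot encoded ones) are skipped, not converted.

```python
# Assumed mapping: Spark simpleString type name -> MLflow scalar type name.
SPARK_TO_MLFLOW = {
    "string": "string",
    "boolean": "boolean",
    "int": "integer",
    "bigint": "long",
    "float": "float",
    "double": "double",
    "binary": "binary",
    "timestamp": "datetime",
}


def scalar_col_specs(dtypes):
    """Split (name, spark_type) pairs into supported specs and skipped names.

    `dtypes` has the shape of a Spark DataFrame's `df.dtypes` list.
    """
    supported, skipped = [], []
    for name, spark_type in dtypes:
        mlflow_type = SPARK_TO_MLFLOW.get(spark_type)
        if mlflow_type is None:
            # e.g. "vector" (VectorUDT) or "date": no scalar MLflow equivalent here
            skipped.append(name)
        else:
            supported.append((name, mlflow_type))
    return supported, skipped


def build_schema(specs):
    """Build an MLflow Schema from (name, mlflow_type) pairs."""
    # Imported lazily so the pure-Python helper above works without MLflow.
    from mlflow.types.schema import ColSpec, Schema

    return Schema([ColSpec(mlflow_type, name) for name, mlflow_type in specs])
```

With this, `ModelSignature(inputs=build_schema(supported))` would describe only the scalar columns; whether such a signature matches what the model actually receives at inference is a separate question.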
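Log the Model with the Custom Signature:
After fitting your `LinearRegression` model, log it using MLflow. The features column is assumed to be named "features" (which is the default for `LinearRegression`); a different name can be set with `setFeaturesCol`.
import mlflow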
from pyspark.ml.regression import LinearRegression
# ... (your existing code)
# Create and fit the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol=COL_LABEL)
lr_model = lr.fit(df_training_sampled)
# Log the model with the custom signature
mlflow.sklearn.log_model(lr_model, "linear_regression_model", signature=signature)
Start an MLflow Run:
Use `mlflow.start_run()` to start an MLflow run.
Log Model Parameters:
Use `mlflow.log_param()` to log any hyperparameters or other relevant parameters.
If you encounter any further issues or need additional assistance, feel free to ask!
02-16-2024 02:43 AM
Thanks @Kaniz_Fatma.
Three clarification questions:
1. Wouldn't this cause issues when I load the model for inference, since the model signature is then different from the actual input?
2. I also need to log the signature of the prediction output. Should I just do
df_predictions = lr_model.transform(df_validation_sampled)
signature = ModelSignature(inputs=Schema(input_columns), outputs=Schema(df_predictions[["prediction"]]))
3. To pass the input example, should I also just pass rows of my dataset before transformation?
02-16-2024 05:09 AM
And one more question: you use `mlflow.sklearn` instead of `mlflow.spark`. Why is that?
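For a Spark ML model the matching flavor is `mlflow.spark`. One way the questions above might be handled together is to log the whole fitted pipeline (feature stages plus the regressor), so that the logged model takes the raw scalar columns as input and the `VectorUDT` column never appears in the signature. The sketch below is an assumption-laden illustration, not a confirmed fix from this thread: the function name `log_pipeline_with_signature` is hypothetical, `input_cols` is assumed to list the raw columns the pipeline consumes, and `"prediction"` is assumed to be the output column (the `LinearRegression` default).

```python
def log_pipeline_with_signature(pipeline_model, df_raw, input_cols,
                                artifact_path="model", n_examples=3):
    """Log a fitted Spark PipelineModel with a signature over raw columns.

    pipeline_model: fitted pipeline whose last stage is the regressor
    df_raw:         Spark DataFrame holding the untransformed columns
    input_cols:     raw scalar columns the pipeline consumes (assumption)
    """
    # Imported lazily so the helper can be defined without MLflow installed.
    import mlflow.spark
    from mlflow.models.signature import infer_signature

    # Score a few rows; only scalar columns are pulled back to pandas,
    # so no VectorUDT column ever reaches infer_signature.
    scored = pipeline_model.transform(df_raw)
    sample = scored.select(*input_cols, "prediction").limit(n_examples).toPandas()

    input_example = sample[list(input_cols)]   # rows before transformation
    output_example = sample[["prediction"]]    # model output
    signature = infer_signature(input_example, output_example)

    mlflow.spark.log_model(pipeline_model, artifact_path,
                           signature=signature, input_example=input_example)
    return signature
```

Inside `mlflow.start_run()`, this could be called with a pipeline fitted as `Pipeline(stages=[stringIndexer, ohe_encoder, assembler, lr]).fit(df_trainig_set)` (a hypothetical combination of the stages shown earlier). The signature then covers both input and output, and the input example consists of untransformed rows.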
05-14-2024 12:09 AM
Hey, at my team we are faced with the same problem.
Given that MLflow usage is part of the "Scalable Machine Learning with Apache Spark" course I got the impression that Spark ML models would work well with MLflow and Databricks model logging, but it's clearly not the case. I went back to the course material and, sure enough, when explaining the model registry it switches to using sklearn models. I gotta say, I feel a bit betrayed...
The solution given by @Kaniz_Fatma partially works but it is a hack and, as @MohsenJ said, it doesn't really cover the signature of the output. Is there a proper/more complete solution?
05-21-2024 07:54 AM
Hi @Kaniz_Fatma, @MohsenJ, did we get this working? I am facing a similar issue when trying to migrate one of our registered models to Unity Catalog. MLflow's `infer_signature` doesn't seem to work because the input Spark DataFrame contains features from `VectorAssembler`, which are recognised as type `VectorUDT()`.
mlflow.pyspark.ml: Model inputs contain unsupported Spark data types: [StructField('prd_dt', DateType(), False), StructField('features', VectorUDT(), True), StructField('features_scaled', VectorUDT(), True)]. Model signature is not logged.
mlflow.data.spark_dataset: Failed to infer schema for Spark dataset. Exception: Unsupported Spark Type '<class 'pyspark.ml.linalg.VectorUDT'>' for MLflow schema.
Have we found a proper solution for MLflow to support data types other than scalar ones?
Would appreciate any inputs.
05-21-2024 07:56 AM
@Abi105 I wasn't able to make it work, sorry
05-22-2024 05:28 AM
Me neither. But the MLflow documentation suggests that newer versions of MLflow should be able to handle Array and Object (dict) types. Maybe that could help? I haven't tried it myself.
Support for Array and Object types was introduced in MLflow version 2.10.0. These types will not be recognized in previous versions of MLflow. If you are saving a model that uses these signature types, you should ensure that any other environment that attempts to load these models has a version of MLflow installed that is at least 2.10.0.
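As a rough sketch of what the Array route might look like (untested in this thread, and not a confirmed fix): the vector column could be converted to a plain array of doubles with `pyspark.ml.functions.vector_to_array`, and the signature could declare that column as `Array(DataType.double)`. The helper names below are hypothetical, and the column names `"features"` and `"prediction"` are assumptions. Note that the Spark model itself still expects a vector column, so a conversion step back to a vector would be needed at inference time.

```python
def supports_array_types(mlflow_version):
    """Return True if the version string is at least 2.10.

    Assumes the first two dot-separated parts are numeric, e.g. "2.10.0".
    """
    major, minor = (int(part) for part in mlflow_version.split(".")[:2])
    return (major, minor) >= (2, 10)


def array_feature_signature(feature_col="features", output_col="prediction"):
    """Describe an array-of-doubles input column and a double output column."""
    import mlflow

    if not supports_array_types(mlflow.__version__):
        raise RuntimeError("Array signature types need MLflow >= 2.10.0")

    # Imported after the version check, since Array is absent in older releases.
    from mlflow.models.signature import ModelSignature
    from mlflow.types.schema import Array, ColSpec, DataType, Schema

    inputs = Schema([ColSpec(Array(DataType.double), feature_col)])
    outputs = Schema([ColSpec(DataType.double, output_col)])
    return ModelSignature(inputs=inputs, outputs=outputs)


def vector_column_to_array(df, vector_col="features"):
    """Replace a VectorUDT column with an array<double> column."""
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col

    return df.withColumn(vector_col, vector_to_array(col(vector_col)))
```

The version check mirrors the documentation note quoted above: any environment that loads a model with such a signature would also need MLflow 2.10.0 or later.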
3 weeks ago - last edited 3 weeks ago
@MohsenJ @javierbg @Abi105 I have found a solution to this issue as I was trying to deploy Spark ML Models to Unity Catalog. Please view my blog and let me know if it helps solve your issues! https://medium.com/p/7d04e8539540