
How to log a KerasClassifier model in a sklearn pipeline in MLflow?

MGH1
New Contributor III

I have a set of pre-processing stages in a sklearn `Pipeline` and an estimator which is a `KerasClassifier` (`from tensorflow.keras.wrappers.scikit_learn import KerasClassifier`).

My overall goal is to tune and log the whole sklearn pipeline in `mlflow` (on Databricks). I get a confusing type error that I can't figure out how to resolve:

> TypeError: can't pickle _thread.RLock objects

I have the following code (without the tuning stage), which returns the above error:

import cloudpickle
import mlflow
import numpy as np
import sklearn
import tensorflow as tf
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
from sklearn.pipeline import Pipeline
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

# `preprocessor`, `X_train`, `y_train`, `X_test` and `y_test` are not shown in this post.

conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        "cloudpickle=={}".format(cloudpickle.__version__),
        "scikit-learn=={}".format(sklearn.__version__),
        "numpy=={}".format(np.__version__),
        "tensorflow=={}".format(tf.__version__),
    ],
    additional_conda_channels=None,
)

search_space = {
    "estimator__dense_l1": 20,
    "estimator__dense_l2": 20,
    "estimator__learning_rate": 0.1,
    "estimator__optimizer": "Adam",
}


def create_model(n):
    # Build a small binary classifier from the values in `n`.
    model = Sequential()
    model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
    model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    model.compile(
        loss="binary_crossentropy",
        optimizer=n["estimator__optimizer"],
        metrics=["accuracy"],
    )
    return model


mlflow.sklearn.autolog()

with mlflow.start_run(nested=True) as run:
    classfier = KerasClassifier(build_fn=create_model, n=search_space)

    # fit the pipeline
    clf = Pipeline(steps=[("preprocessor", preprocessor),
                          ("estimator", classfier)])
    h = clf.fit(
        X_train,
        y_train.values,
        estimator__validation_split=0.2,
        estimator__epochs=10,
        estimator__verbose=2,
    )

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the model with a signature that defines the schema of the model's inputs and outputs.
    mlflow.sklearn.log_model(
        sk_model=clf,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )

I also get this warning before the error:

```
WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
           transformer_weights=None,
           transformers=[('num',
                  Pipeline(memory=None,
```

Note that the whole pipeline runs fine outside MLflow.

Can someone help?

1 ACCEPTED SOLUTION

MGH1
New Contributor III

I think I found a workaround of sorts, although the underlying issue still needs to be addressed.

What I did is probably not the best way.

I used a Python package called scikeras, which provides its own Keras wrapper, and with it I was able to log the model.

The code:

import cloudpickle
import mlflow
import numpy as np
import scikeras
import sklearn
import tensorflow as tf
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
from scikeras.wrappers import KerasClassifier
from sklearn.pipeline import Pipeline
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

# `preprocessor`, `X_train`, `y_train`, `X_test` and `y_test` are not shown in this post.


class ModelWrapper(mlflow.pyfunc.PythonModel):
    # Expose the fitted sklearn pipeline through the pyfunc interface.
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.model.predict(model_input)


conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        "cloudpickle=={}".format(cloudpickle.__version__),
        "scikit-learn=={}".format(sklearn.__version__),
        "numpy=={}".format(np.__version__),
        "tensorflow=={}".format(tf.__version__),
        "scikeras=={}".format(scikeras.__version__),
    ],
    additional_conda_channels=None,
)

param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}


def create_model(dense_l1, dense_l2, meta):
    # `meta` is filled in by scikeras with information about the training data.
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]

    model = Sequential()
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model


mlflow.sklearn.autolog()
with mlflow.start_run(run_name="sample_run"):
    classfier = KerasClassifier(
        create_model,
        loss=param["loss"],
        dense_l1=param["dense_l1"],
        dense_l2=param["dense_l2"],
        optimizer__learning_rate=param["optimizer__learning_rate"],
        optimizer=param["optimizer"],
    )

    # fit the pipeline
    clf = Pipeline(steps=[("preprocessor", preprocessor),
                          ("estimator", classfier)])
    h = clf.fit(X_train, y_train.values)

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    model_nn = ModelWrapper(clf)

    # Log the wrapped pipeline as a generic pyfunc model.
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
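
For context on why switching wrappers may have helped: as far as I know, SciKeras is designed so that its wrappers can be pickled, which is what pickle-based loggers rely on. The standard-library sketch below shows one general way a wrapper can become picklable, by leaving unpicklable state out in `__getstate__` and rebuilding it in `__setstate__`. It is an illustration only and not SciKeras's actual implementation; `PicklableWrapper` and its attributes are hypothetical.

```python
import pickle
import threading


class PicklableWrapper:
    """Hypothetical wrapper that holds a lock but can still be pickled."""

    def __init__(self, weights):
        self.weights = weights          # plain data, pickles as-is
        self._lock = threading.RLock()  # pickle cannot serialize this

    def __getstate__(self):
        # Copy the instance dict and leave the lock out of the pickled state.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain data, then create a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.RLock()


original = PicklableWrapper(weights=[0.1, 0.2, 0.3])
restored = pickle.loads(pickle.dumps(original))
print(restored.weights)
```

Without the two hooks, `pickle.dumps(original)` would fail on the lock, which is the same class of failure as the `_thread.RLock` error in the question.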
 
  
 
 


8 Replies

MGH1
New Contributor III

No one?

Anonymous
Not applicable

@Kaniz Fatma - Can you jump in here?

Kaniz
Community Manager

Sure @Piper 

Kaniz
Community Manager

Hi @MGH! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer first; otherwise I will follow up shortly with a response.

MGH1
New Contributor III

Thanks @Kaniz Fatma!

Just to clarify, I have no issue logging a plain sklearn model and pipeline. For example, if I change this part of the above code from:

   clf = Pipeline(steps=[("preprocessor", preprocessor),
                         ("estimator", classfier)])

to:

   clf = Pipeline(steps=[("preprocessor", preprocessor),
                         ("estimator", RandomForestClassifier())])

it works without issue.

The problem only appears when the estimator is a wrapped Keras model.
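
One way to see why the swap matters: `mlflow.sklearn.log_model` serializes the whole pipeline with pickle or cloudpickle, so a single step holding an unpicklable object (such as the `_thread.RLock` named in the error) makes the whole call fail. The sketch below uses only the standard library to find which step pickle rejects. `find_unpicklable_steps` is a hypothetical helper, not part of MLflow, and the `SimpleNamespace` objects are stand-ins; that the old Keras wrapper holds a lock internally is an assumption based on the error message.

```python
import pickle
import threading
from types import SimpleNamespace


def find_unpicklable_steps(steps):
    """Return the names of the (name, obj) steps that pickle rejects."""
    bad = []
    for name, obj in steps:
        try:
            pickle.dumps(obj)
        except Exception:  # pickle raises TypeError for thread locks
            bad.append(name)
    return bad


# Stand-in for an ordinary estimator: only simple attributes.
plain_step = SimpleNamespace(n_estimators=100)
# Stand-in for a wrapper whose fitted model keeps a thread lock (assumption).
lock_step = SimpleNamespace(model_lock=threading.RLock())

steps = [("preprocessor", plain_step), ("estimator", lock_step)]
print(find_unpicklable_steps(steps))
```

With a real pipeline you could pass `clf.steps` to the same helper after fitting; here it should report only the `estimator` step.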

Kaniz
Community Manager

Hey there @MGH!

I'm glad I can help you 😊.

Please be patient, I'll get back to you very soon with a response.

shan_chandra
Honored Contributor III

Could you please share the full error stack trace?
