cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to log a KerasClassifier model in a scikit-learn pipeline with MLflow?

MGH1
New Contributor III

I have a set of pre-processing stages in a sklearn `Pipeline` and an estimator which is a `KerasClassifier` (`from tensorflow.keras.wrappers.scikit_learn import KerasClassifier`).

My overall goal is to tune and log the whole sklearn pipeline with `mlflow` (on Databricks). I get a confusing type error that I can't figure out how to resolve:

> TypeError: can't pickle _thread.RLock objects

I have the following code (without the tuning stage), which raises the above error:

# Conda environment recorded with the logged model: pin every library the
# pickled pipeline needs at load time to the version installed right now.
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        f"cloudpickle=={cloudpickle.__version__}",
        f"scikit-learn=={sklearn.__version__}",
        f"numpy=={np.__version__}",
        f"tensorflow=={tf.__version__}",
    ],
    additional_conda_channels=None,
)
 
 
 
# Fixed hyper-parameters (no tuning stage yet). The "estimator__" prefix
# matches the name of the final Pipeline step, and create_model reads these
# exact keys.
# NOTE(review): create_model never reads "estimator__learning_rate".
search_space = dict(
    estimator__dense_l1=20,
    estimator__dense_l2=20,
    estimator__learning_rate=0.1,
    estimator__optimizer="Adam",
)
 
  
def create_model(n):
    """Build and compile a small fully-connected binary classifier.

    Args:
        n: Hyper-parameter dict. Reads ``"estimator__dense_l1"`` and
            ``"estimator__dense_l2"`` (hidden-layer widths, cast to ``int``)
            and ``"estimator__optimizer"`` (passed to ``model.compile``).

    Returns:
        A compiled ``Sequential`` model.
    """
    # NOTE(review): n["estimator__learning_rate"] is not read here. The
    # optimizer is passed by name, so the learning rate in the dict has no effect.
    model = Sequential()
    model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
    model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
    # A single sigmoid unit trained with binary cross-entropy: binary target.
    model.add(Dense(1, activation="sigmoid"))
    model.compile(
        loss="binary_crossentropy",
        optimizer=n["estimator__optimizer"],
        metrics=["accuracy"],
    )
    return model
 
 
 
 
 
mlflow.sklearn.autolog()

with mlflow.start_run(nested=True) as run:
    # NOTE(review): this is the tf.keras ``KerasClassifier`` wrapper, and it
    # holds a Keras model object. That object is presumably what cloudpickle
    # rejects with "can't pickle _thread.RLock objects" when the pipeline is
    # logged. Swapping in ``scikeras.wrappers.KerasClassifier`` avoided it.
    classfier = KerasClassifier(build_fn=create_model, n=search_space)

    # Fit preprocessing and the Keras estimator together as one Pipeline.
    clf = Pipeline(steps=[("preprocessor", preprocessor),
                          ("estimator", classfier)])
    # Pipeline.fit forwards "estimator__*" keyword arguments to the fit()
    # of the step named "estimator".
    h = clf.fit(
        X_train,
        y_train.values,
        estimator__validation_split=0.2,
        estimator__epochs=10,
        estimator__verbose=2,
    )

    # Log scores.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the model with a signature that defines the schema of the model's inputs and outputs.
    mlflow.sklearn.log_model(
        sk_model=clf,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )

I also get this warning before the error:

```

  WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,

           transformer_weights=None,

           transformers=[('num',

                  Pipeline(memory=None,

```

Note: the whole pipeline runs fine outside MLflow.

Can someone help?

1 ACCEPTED SOLUTION

Accepted Solutions

MGH1
New Contributor III

I think I found a workaround of sorts, but I still think this issue needs to be addressed.

What I did is not the ideal approach.

I used a Python package called SciKeras, which provides its own scikit-learn wrapper for Keras models. With it, I could log the model (wrapped in an `mlflow.pyfunc.PythonModel`).

The code:

import scikeras 
import tensorflow as tf 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation 
 
from scikeras.wrappers import KerasClassifier 
  
 
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """Serve a fitted estimator through MLflow's generic pyfunc interface.

    Any object with a ``predict`` method (e.g. a fitted sklearn Pipeline)
    can be wrapped and then logged with ``mlflow.pyfunc.log_model``.
    """

    def __init__(self, model):
        # The fitted estimator whose predictions are exposed.
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped estimator's predictions for ``model_input``.

        ``context`` is part of the PythonModel signature and is unused here.
        """
        estimator = self.model
        return estimator.predict(model_input)
 
# Conda environment recorded with the logged model: pin every library the
# pickled pipeline (including the scikeras wrapper) needs at load time.
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        f"cloudpickle=={cloudpickle.__version__}",
        f"scikit-learn=={sklearn.__version__}",
        f"numpy=={np.__version__}",
        f"tensorflow=={tf.__version__}",
        f"scikeras=={scikeras.__version__}",
    ],
    additional_conda_channels=None,
)
 
# Hyper-parameters handed to the scikeras KerasClassifier:
# dense_l1 / dense_l2 go to create_model, while the optimizer,
# optimizer__learning_rate and loss entries configure compilation.
param = dict(
    dense_l1=20,
    dense_l2=20,
    optimizer__learning_rate=0.1,
    optimizer="Adam",
    loss="binary_crossentropy",
)
 
  
def create_model(dense_l1, dense_l2, meta):
    """Build the (uncompiled) Keras network used by scikeras' KerasClassifier.

    Args:
        dense_l1: Units in the first hidden layer after the input-sized layer.
        dense_l2: Units in the second hidden layer.
        meta: Data-derived metadata supplied by scikeras during ``fit``. Only
            ``"n_features_in_"`` and ``"X_shape_"`` are read here.

    Returns:
        A ``Sequential`` model. It is not compiled here; the loss and
        optimizer are given to ``KerasClassifier`` instead.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    # meta["n_classes_"] is deliberately not read: the single sigmoid output
    # below assumes a binary target (matching "binary_crossentropy").

    model = Sequential()
    # First layer is as wide as the feature count; input_shape drops the batch axis.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
 
mlflow.sklearn.autolog()
with mlflow.start_run(run_name="sample_run"):
    # scikeras routes dense_l1 / dense_l2 to create_model and the
    # optimizer__ prefixed argument to the optimizer.
    classfier = KerasClassifier(
        create_model,
        loss=param["loss"],
        dense_l1=param["dense_l1"],
        dense_l2=param["dense_l2"],
        optimizer__learning_rate=param["optimizer__learning_rate"],
        optimizer=param["optimizer"],
    )

    # Preprocessing and the Keras estimator are fitted together as one Pipeline.
    clf = Pipeline(
        steps=[
            ('preprocessor', preprocessor),
            ('estimator', classfier),
        ]
    )
    h = clf.fit(X_train, y_train.values)

    # Score on the held-out data and record it on the active run.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the fitted pipeline through the generic pyfunc flavor.
    # NOTE(review): sklearn autolog may also log a model under "model" for
    # this fit. Confirm the two artifact paths do not clash.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
 
  
 
 

View solution in original post

5 REPLIES 5

MGH1
New Contributor III

No one?

Anonymous
Not applicable

@Kaniz Fatma​ - Can you jump in here?

MGH1
New Contributor III

Thanks @Kaniz Fatma​ !

Just to clarify: I have no issue logging a sklearn model and pipeline. For example, if I replace this part of the above code:

   clf = Pipeline(steps=[("preprocessor", preprocessor), 
                                                ("estimator", classfier)])

to:

   clf = Pipeline(steps=[("preprocessor", preprocessor), 
 
             ("estimator", RandomForestClassifier())])

it works without issue.

The problem only occurs when I wrap a Keras model.

MGH1
New Contributor III

I think I found a workaround of sorts, but I still think this issue needs to be addressed.

What I did is not the ideal approach.

I used a Python package called SciKeras, which provides its own scikit-learn wrapper for Keras models. With it, I could log the model (wrapped in an `mlflow.pyfunc.PythonModel`).

The code:

import scikeras 
import tensorflow as tf 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation 
 
from scikeras.wrappers import KerasClassifier 
  
 
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """Serve a fitted estimator through MLflow's generic pyfunc interface.

    Any object with a ``predict`` method (e.g. a fitted sklearn Pipeline)
    can be wrapped and then logged with ``mlflow.pyfunc.log_model``.
    """

    def __init__(self, model):
        # The fitted estimator whose predictions are exposed.
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped estimator's predictions for ``model_input``.

        ``context`` is part of the PythonModel signature and is unused here.
        """
        estimator = self.model
        return estimator.predict(model_input)
 
# Conda environment recorded with the logged model: pin every library the
# pickled pipeline (including the scikeras wrapper) needs at load time.
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        f"cloudpickle=={cloudpickle.__version__}",
        f"scikit-learn=={sklearn.__version__}",
        f"numpy=={np.__version__}",
        f"tensorflow=={tf.__version__}",
        f"scikeras=={scikeras.__version__}",
    ],
    additional_conda_channels=None,
)
 
# Hyper-parameters handed to the scikeras KerasClassifier:
# dense_l1 / dense_l2 go to create_model, while the optimizer,
# optimizer__learning_rate and loss entries configure compilation.
param = dict(
    dense_l1=20,
    dense_l2=20,
    optimizer__learning_rate=0.1,
    optimizer="Adam",
    loss="binary_crossentropy",
)
 
  
def create_model(dense_l1, dense_l2, meta):
    """Build the (uncompiled) Keras network used by scikeras' KerasClassifier.

    Args:
        dense_l1: Units in the first hidden layer after the input-sized layer.
        dense_l2: Units in the second hidden layer.
        meta: Data-derived metadata supplied by scikeras during ``fit``. Only
            ``"n_features_in_"`` and ``"X_shape_"`` are read here.

    Returns:
        A ``Sequential`` model. It is not compiled here; the loss and
        optimizer are given to ``KerasClassifier`` instead.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    # meta["n_classes_"] is deliberately not read: the single sigmoid output
    # below assumes a binary target (matching "binary_crossentropy").

    model = Sequential()
    # First layer is as wide as the feature count; input_shape drops the batch axis.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
 
mlflow.sklearn.autolog()
with mlflow.start_run(run_name="sample_run"):
    # scikeras routes dense_l1 / dense_l2 to create_model and the
    # optimizer__ prefixed argument to the optimizer.
    classfier = KerasClassifier(
        create_model,
        loss=param["loss"],
        dense_l1=param["dense_l1"],
        dense_l2=param["dense_l2"],
        optimizer__learning_rate=param["optimizer__learning_rate"],
        optimizer=param["optimizer"],
    )

    # Preprocessing and the Keras estimator are fitted together as one Pipeline.
    clf = Pipeline(
        steps=[
            ('preprocessor', preprocessor),
            ('estimator', classfier),
        ]
    )
    h = clf.fit(X_train, y_train.values)

    # Score on the held-out data and record it on the active run.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the fitted pipeline through the generic pyfunc flavor.
    # NOTE(review): sklearn autolog may also log a model under "model" for
    # this fit. Confirm the two artifact paths do not clash.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
 
  
 
 

shan_chandra
Databricks Employee
Databricks Employee

Could you please share the full error stack trace?

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group