- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-09-2021 08:52 PM
I have a set of pre-processing stages in a sklearn `Pipeline` and an estimator which is a `KerasClassifier` (`from tensorflow.keras.wrappers.scikit_learn import KerasClassifier`).
My overall goal is to tune and log the whole sklearn pipeline in `mlflow` (on Databricks, even). I get a confusing type error which I can't figure out how to resolve:
> TypeError: can't pickle _thread.RLock objects
I have the following code (without tuning stage) which returns the above error:
# Pin the logged model's environment to the library versions in use here.
# NOTE(review): `_mlflow_conda_env` is presumably the private helper from
# mlflow.utils.environment -- the import is not shown; confirm.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Hyperparameters handed to create_model() as one dict (no tuning stage yet).
# "estimator__learning_rate" is defined here but create_model() never reads it.
search_space = {
    "estimator__dense_l1": 20,
    "estimator__dense_l2": 20,
    "estimator__learning_rate": 0.1,
    "estimator__optimizer": "Adam",
}
def create_model(n):
    """Build and compile a three-layer dense network with a sigmoid output.

    Args:
        n: Mapping of hyperparameters. The keys read are
            "estimator__dense_l1" and "estimator__dense_l2" (hidden layer
            widths, coerced with ``int``) and "estimator__optimizer"
            (passed straight to ``compile``). Any other key is ignored.

    Returns:
        The compiled ``Sequential`` model.
    """
    net = Sequential(
        [
            Dense(int(n["estimator__dense_l1"]), activation="relu"),
            Dense(int(n["estimator__dense_l2"]), activation="relu"),
            # Single sigmoid unit, paired with the binary cross-entropy loss.
            Dense(1, activation="sigmoid"),
        ]
    )
    compile_options = {
        "loss": "binary_crossentropy",
        "optimizer": n["estimator__optimizer"],
        "metrics": ["accuracy"],
    }
    net.compile(**compile_options)
    return net
# Turn on scikit-learn autologging before anything is fitted.
mlflow.sklearn.autolog()

with mlflow.start_run(nested=True) as run:
    # Wrap the Keras builder so it can act as the pipeline's final step.
    classfier = KerasClassifier(build_fn=create_model, n=search_space)
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)

    # fit the pipeline; the "estimator__" prefix targets the step named
    # "estimator" (see scikit-learn's Pipeline.fit parameter routing).
    fit_options = {
        "estimator__validation_split": 0.2,
        "estimator__epochs": 10,
        "estimator__verbose": 2,
    }
    h = clf.fit(X_train, y_train.values, **fit_options)

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    # Log the model with a signature that defines the schema of the model's
    # inputs and outputs.
    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)
    mlflow.sklearn.log_model(
        sk_model=clf,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
I also get this warning before the error:
```
WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
transformer_weights=None,
transformers=[('num',
Pipeline(memory=None,
```
Note that the whole pipeline runs fine outside mlflow.
Can someone help?
- Labels:
-
MlFlow
-
Python
-
Sklean Pipeline
Accepted Solutions
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-13-2021 09:18 PM
I think I found a sort of workaround, but I think this issue needs to be addressed anyway.
What I did is not the best way.
I used a Python package called scikeras that does this wrapping, and I could then log the model.
The code:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """pyfunc adapter that delegates prediction to a wrapped model object."""

    def __init__(self, model):
        """Keep a reference to *model*, whose ``predict`` method is used."""
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's predictions for *model_input*.

        *context* is accepted as part of the signature but not used here.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Pin the logged model's environment to the library versions in use here,
# including scikeras, which provides the KerasClassifier wrapper.
# NOTE(review): `_mlflow_conda_env` is presumably the private helper from
# mlflow.utils.environment -- the import is not shown; confirm.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
    "scikeras=={}".format(scikeras.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Settings passed one by one to the KerasClassifier constructor further down.
param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
    """Build the dense network used by the scikeras ``KerasClassifier``.

    The model is returned without calling ``compile``.
    NOTE(review): presumably the wrapper compiles it with the ``loss`` and
    ``optimizer`` given to ``KerasClassifier`` -- confirm in the scikeras docs.

    Args:
        dense_l1: Number of units in the second ``Dense`` layer.
        dense_l2: Number of units in the third ``Dense`` layer.
        meta: Mapping supplied by the wrapper; only "n_features_in_" and
            "X_shape_" are read.

    Returns:
        An uncompiled ``Sequential`` model ending in one sigmoid unit.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    # The previously unused meta["n_classes_"] lookup was dropped: the output
    # layer is fixed at a single sigmoid unit.
    model = Sequential()
    # The first layer is as wide as the input; drop the leading (sample) axis
    # of the training data's shape to get the per-sample input shape.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
# Turn on scikit-learn autologging before anything is fitted.
mlflow.sklearn.autolog()

with mlflow.start_run(run_name="sample_run"):
    # Pick the wrapper settings out of `param` in the order they are passed.
    wrapper_settings = {
        key: param[key]
        for key in (
            "loss",
            "dense_l1",
            "dense_l2",
            "optimizer__learning_rate",
            "optimizer",
        )
    }
    classfier = KerasClassifier(create_model, **wrapper_settings)

    # fit the pipeline
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)
    h = clf.fit(X_train, y_train.values)

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)

    # Log the fitted pipeline through the generic pyfunc flavor, wrapped in
    # ModelWrapper, rather than through mlflow.sklearn.log_model.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-10-2021 12:10 AM
no one?

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-10-2021 07:11 AM
@Kaniz Fatma - Can you jump in here?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-11-2021 12:44 AM
Thanks @Kaniz Fatma !
Just to clarify I have no issue logging a sklearn model and pipeline, for example if I replace this part of the above code from:
# Pipeline as posted in the question: the Keras wrapper is the final step.
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", classfier)])
to:
# Same pipeline with a RandomForestClassifier as the final step instead.
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", RandomForestClassifier())])
it works without issue.
The problem arises when you wrap a Keras model.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-13-2021 09:18 PM
I think I found a sort of workaround, but I think this issue needs to be addressed anyway.
What I did is not the best way.
I used a Python package called scikeras that does this wrapping, and I could then log the model.
The code:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """pyfunc adapter that delegates prediction to a wrapped model object."""

    def __init__(self, model):
        """Keep a reference to *model*, whose ``predict`` method is used."""
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's predictions for *model_input*.

        *context* is accepted as part of the signature but not used here.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Pin the logged model's environment to the library versions in use here,
# including scikeras, which provides the KerasClassifier wrapper.
# NOTE(review): `_mlflow_conda_env` is presumably the private helper from
# mlflow.utils.environment -- the import is not shown; confirm.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
    "scikeras=={}".format(scikeras.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Settings passed one by one to the KerasClassifier constructor further down.
param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
    """Build the dense network used by the scikeras ``KerasClassifier``.

    The model is returned without calling ``compile``.
    NOTE(review): presumably the wrapper compiles it with the ``loss`` and
    ``optimizer`` given to ``KerasClassifier`` -- confirm in the scikeras docs.

    Args:
        dense_l1: Number of units in the second ``Dense`` layer.
        dense_l2: Number of units in the third ``Dense`` layer.
        meta: Mapping supplied by the wrapper; only "n_features_in_" and
            "X_shape_" are read.

    Returns:
        An uncompiled ``Sequential`` model ending in one sigmoid unit.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    # The previously unused meta["n_classes_"] lookup was dropped: the output
    # layer is fixed at a single sigmoid unit.
    model = Sequential()
    # The first layer is as wide as the input; drop the leading (sample) axis
    # of the training data's shape to get the per-sample input shape.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
# Turn on scikit-learn autologging before anything is fitted.
mlflow.sklearn.autolog()

with mlflow.start_run(run_name="sample_run"):
    # Pick the wrapper settings out of `param` in the order they are passed.
    wrapper_settings = {
        key: param[key]
        for key in (
            "loss",
            "dense_l1",
            "dense_l2",
            "optimizer__learning_rate",
            "optimizer",
        )
    }
    classfier = KerasClassifier(create_model, **wrapper_settings)

    # fit the pipeline
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)
    h = clf.fit(X_train, y_train.values)

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)

    # Log the fitted pipeline through the generic pyfunc flavor, wrapped in
    # ModelWrapper, rather than through mlflow.sklearn.log_model.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-02-2021 07:02 PM
could you please share the full error stack trace?

