09-09-2021 08:52 PM
I have a set of pre-processing stages in a sklearn `Pipeline` and an estimator which is a `KerasClassifier` (`from tensorflow.keras.wrappers.scikit_learn import KerasClassifier`).
My overall goal is to tune and log the whole sklearn pipeline in `mlflow` (on Databricks, even). I get a confusing type error which I can't figure out how to resolve:
> TypeError: can't pickle _thread.RLock objects
I have the following code (without tuning stage) which returns the above error:
# Pin the libraries needed to reload the logged model, using the versions
# installed in the current environment.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Hyper-parameters handed to create_model via KerasClassifier(n=search_space).
# The keys carry the "estimator__" prefix of the pipeline step they target.
search_space = {
    "estimator__dense_l1": 20,
    "estimator__dense_l2": 20,
    "estimator__learning_rate": 0.1,
    "estimator__optimizer": "Adam",
}
def create_model(n):
    """Build and compile the binary-classification network.

    Args:
        n: Dict of hyper-parameters. Reads ``estimator__dense_l1`` and
            ``estimator__dense_l2`` (hidden layer widths, cast to ``int``),
            ``estimator__optimizer`` (optimizer name or instance) and, when
            present, ``estimator__learning_rate``.

    Returns:
        A compiled ``Sequential`` model with two ReLU hidden layers and a
        single sigmoid output unit, trained with binary cross-entropy.
    """
    model = Sequential()
    model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
    model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
    model.add(Dense(1, activation="sigmoid"))

    optimizer = n["estimator__optimizer"]
    learning_rate = n.get("estimator__learning_rate")
    if learning_rate is not None and isinstance(optimizer, str):
        # Passing only the optimizer's name to compile() would build it with
        # its default learning rate and silently ignore the configured one,
        # so instantiate the optimizer explicitly with the requested rate.
        optimizer = tf.keras.optimizers.get(
            {
                "class_name": optimizer,
                "config": {"learning_rate": learning_rate},
            }
        )

    model.compile(
        loss="binary_crossentropy",
        optimizer=optimizer,
        metrics=["accuracy"],
    )
    return model
mlflow.sklearn.autolog()

with mlflow.start_run(nested=True) as run:
    # Wrap the Keras model builder so it can act as the final pipeline step.
    classfier = KerasClassifier(build_fn=create_model, n=search_space)
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)

    # Fit the pipeline; the "estimator__" prefix routes these keyword
    # arguments to the "estimator" step's fit call.
    fit_kwargs = {
        "estimator__validation_split": 0.2,
        "estimator__epochs": 10,
        "estimator__verbose": 2,
    }
    h = clf.fit(X_train, y_train.values, **fit_kwargs)

    # Log the test-set score.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    # Log the model with a signature that defines the schema of the model's
    # inputs and outputs.
    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)
    mlflow.sklearn.log_model(
        sk_model=clf,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
I also get this warning before the error:
```
WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
transformer_weights=None,
transformers=[('num',
Pipeline(memory=None,
```
Note that the whole pipeline runs without problems outside mlflow.
Can someone help?
09-13-2021 09:18 PM
I think I found a sort of workaround, but I think this issue needs to be addressed anyway.
What I did is not the best way.
I used a Python package called scikeras that does this wrapping, and then I could log the model.
The code:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """Expose an object with a ``predict`` method as an MLflow pyfunc model."""

    def __init__(self, model):
        # The wrapped object; its ``predict`` method serves all predictions.
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's prediction for ``model_input``.

        ``context`` is accepted as part of the method signature but is not
        used here.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Pin the libraries needed to reload the logged model, using the versions
# installed in the current environment.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
    "scikeras=={}".format(scikeras.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Settings passed to the KerasClassifier constructor further down.
param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
    """Build the (uncompiled) network used by the scikeras ``KerasClassifier``.

    Args:
        dense_l1: Number of units in the first configurable hidden layer.
        dense_l2: Number of units in the second configurable hidden layer.
        meta: Dict describing the training data; this function reads its
            ``"n_features_in_"`` and ``"X_shape_"`` entries (presumably
            filled in by scikeras when it calls this builder -- confirm
            against the scikeras documentation).

    Returns:
        A ``Sequential`` model: an input-sized ReLU layer, two ReLU hidden
        layers and a single sigmoid output unit. The model is returned
        without calling ``compile``.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]

    model = Sequential()
    # The first layer is as wide as the feature count; the input shape drops
    # the leading (sample) dimension of X.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
mlflow.sklearn.autolog()

with mlflow.start_run(run_name="sample_run"):
    # ``param`` holds exactly the keyword arguments the classifier takes here
    # (loss, dense_l1, dense_l2, optimizer__learning_rate, optimizer).
    classfier = KerasClassifier(create_model, **param)

    # Fit the pipeline.
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)
    h = clf.fit(X_train, y_train.values)

    # Log the test-set score.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)

    # Log the fitted pipeline through the generic pyfunc flavour by wrapping
    # it in ModelWrapper.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
09-10-2021 12:10 AM
no one?
09-10-2021 07:11 AM
@Kaniz Fatma - Can you jump in here?
09-11-2021 12:44 AM
Thanks @Kaniz Fatma !
Just to clarify, I have no issue logging a plain sklearn model and pipeline. For example, if I replace this part of the above code:
# Original pipeline: the final step is the Keras-wrapped classifier.
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", classfier)])
to:
# Same pipeline with a plain scikit-learn estimator as the final step.
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", RandomForestClassifier())])
it works without issue.
The problem occurs when you wrap a Keras model.
09-13-2021 09:18 PM
I think I found a sort of workaround, but I think this issue needs to be addressed anyway.
What I did is not the best way.
I used a Python package called scikeras that does this wrapping, and then I could log the model.
The code:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """Expose an object with a ``predict`` method as an MLflow pyfunc model."""

    def __init__(self, model):
        # The wrapped object; its ``predict`` method serves all predictions.
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's prediction for ``model_input``.

        ``context`` is accepted as part of the method signature but is not
        used here.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Pin the libraries needed to reload the logged model, using the versions
# installed in the current environment.
pinned_pip_deps = [
    "cloudpickle=={}".format(cloudpickle.__version__),
    "scikit-learn=={}".format(sklearn.__version__),
    "numpy=={}".format(np.__version__),
    "tensorflow=={}".format(tf.__version__),
    "scikeras=={}".format(scikeras.__version__),
]
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=pinned_pip_deps,
    additional_conda_channels=None,
)

# Settings passed to the KerasClassifier constructor further down.
param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
    """Build the (uncompiled) network used by the scikeras ``KerasClassifier``.

    Args:
        dense_l1: Number of units in the first configurable hidden layer.
        dense_l2: Number of units in the second configurable hidden layer.
        meta: Dict describing the training data; this function reads its
            ``"n_features_in_"`` and ``"X_shape_"`` entries (presumably
            filled in by scikeras when it calls this builder -- confirm
            against the scikeras documentation).

    Returns:
        A ``Sequential`` model: an input-sized ReLU layer, two ReLU hidden
        layers and a single sigmoid output unit. The model is returned
        without calling ``compile``.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]

    model = Sequential()
    # The first layer is as wide as the feature count; the input shape drops
    # the leading (sample) dimension of X.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    return model
mlflow.sklearn.autolog()

with mlflow.start_run(run_name="sample_run"):
    # ``param`` holds exactly the keyword arguments the classifier takes here
    # (loss, dense_l1, dense_l2, optimizer__learning_rate, optimizer).
    classfier = KerasClassifier(create_model, **param)

    # Fit the pipeline.
    pipeline_steps = [
        ("preprocessor", preprocessor),
        ("estimator", classfier),
    ]
    clf = Pipeline(steps=pipeline_steps)
    h = clf.fit(X_train, y_train.values)

    # Log the test-set score.
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    test_predictions = clf.predict(X_test)
    signature = infer_signature(X_test, test_predictions)

    # Log the fitted pipeline through the generic pyfunc flavour by wrapping
    # it in ModelWrapper.
    model_nn = ModelWrapper(clf)
    mlflow.pyfunc.log_model(
        python_model=model_nn,
        artifact_path="model",
        signature=signature,
        conda_env=conda_env,
    )
10-02-2021 07:02 PM
Could you please share the full error stack trace?
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group