AutoGluon MLflow integration
I am working on a personalized price-package recommendation and have implemented AutoGluon code integrated with MLflow.
The code is written in a modular fashion so other team members can reuse it: they only need to pass the data, the target column, and an experiment name to create the experiment (see the example usage at the end).
I keep running into problems when logging the model with MLflow; any help would be greatly appreciated.
This is my code:
import mlflow
import pandas as pd
from autogluon.tabular import TabularPredictor
from mlflow.models import infer_signature
from sklearn.model_selection import train_test_split
from sklearn.metrics import (roc_auc_score, average_precision_score, f1_score,
                             fbeta_score, brier_score_loss, recall_score,
                             precision_score, classification_report)

class AutoGluonPyFuncWrapper(mlflow.pyfunc.PythonModel):
    """Wrapper for an AutoGluon model to be logged as a PyFunc model in MLflow."""

    def __init__(self):
        self.predictor = None  # Model is loaded in load_context

    def load_context(self, context):
        """Loads the AutoGluon model when MLflow loads the PyFunc model.

        The path is resolved from the artifacts dict passed to log_model,
        so the model can be loaded outside the environment that trained it.
        """
        self.predictor = TabularPredictor.load(context.artifacts["predictor_path"])

    def predict(self, context, model_input):
        """
        Predict probability scores for the given input.
        model_input: pandas DataFrame (anything else is converted to one)
        Returns: pandas DataFrame with the positive-class probability
        """
        if not isinstance(model_input, pd.DataFrame):
            model_input = pd.DataFrame(model_input)
        predictions = self.predictor.predict_proba(model_input)
        # For binary problems predict_proba returns one column per class,
        # ordered by class label, so the last column is the positive class
        positive_class = predictions.columns[-1]
        return predictions[[positive_class]]
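# Note (my understanding of the mechanism, not official documentation): when
# this wrapper is logged with mlflow.pyfunc.log_model(...,
# artifacts={"predictor_path": <model dir>}), MLflow copies that directory
# into the run and hands load_context a local copy of it via
# context.artifacts["predictor_path"]. That is why load_context reads the
# path from the context instead of taking it as a constructor argument.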
class AutoGluonMLflowClassifier:
    def __init__(self, model_data: pd.DataFrame, target_col: str, experiment_name: str):
        """
        Initializes the classifier with the training data, target column,
        and MLflow experiment name.
        """
        self.model_data = model_data
        self.target_col = target_col
        self.experiment_name = experiment_name
        self.predictor = None
        self.train_predictions = None
        self.val_predictions = None
        self._initialize_mlflow()
    def _initialize_mlflow(self):
        """Sets up the MLflow experiment dynamically in Databricks."""
        # Define the experiment path (stored under the shared workspace folder)
        experiment_path = f"/Shared/automl_experiments/{self.experiment_name}"
        # Check whether the experiment already exists
        experiment = mlflow.get_experiment_by_name(experiment_path)
        if experiment is None:
            # Create a new experiment if it does not exist
            experiment_id = mlflow.create_experiment(experiment_path)
            print(f"Created new MLflow experiment at: {experiment_path}")
        else:
            experiment_id = experiment.experiment_id
            print(f"Using existing MLflow experiment: {experiment_path}")
        # Set the experiment to use
        mlflow.set_experiment(experiment_path)
    def split_data(self):
        """Splits the data into 80% training and 20% validation."""
        self.train_data, self.val_data = train_test_split(self.model_data, test_size=0.2, random_state=42)
        print(self.train_data.columns)
    def train_model(self, time_limit: int = 200):
        """Trains the AutoGluon model and logs parameters, metrics, and artifacts in MLflow."""
        hyperparameters = {
            "GBM": {  # LightGBM
                "num_boost_round": 1000,   # More boosting rounds
                "learning_rate": 0.02,     # Lower learning rate for better generalization
                "num_leaves": 31,          # Leaf complexity
                "feature_fraction": 0.8,   # Feature bagging
                "bagging_fraction": 0.8,   # Sample bagging
                "bagging_freq": 5,         # Frequency of bagging
                "min_data_in_leaf": 20,    # Minimum samples per leaf
            },
            "XGB": {  # XGBoost
                "n_estimators": 1000,
                "learning_rate": 0.02,
                "max_depth": 6,            # Controls complexity
                "subsample": 0.8,          # Sample fraction per tree
                "colsample_bytree": 0.8,   # Feature bagging
                "gamma": 0.2,              # Regularization
                "lambda": 1,               # L2 regularization
            },
            "CAT": {  # CatBoost
                "iterations": 1000,
                "learning_rate": 0.02,
                "depth": 6,
                "l2_leaf_reg": 3,          # L2 regularization
                "border_count": 32,        # Number of bins for numeric features
            },
            "NN_TORCH": {  # Neural network (PyTorch)
                "num_epochs": 100,         # More training epochs
                "learning_rate": 0.001,
                "dropout_prob": 0.1,       # Dropout regularization
                "weight_decay": 1e-5,      # L2 weight regularization
                "hidden_size": 256,        # Hidden layer size
            },
        }
        dbfs_model_path = "dbfs:/FileStore/automl/autogluon/"
        # AutoGluon needs a local filesystem path on the driver
        local_model_path = "/tmp/automl_experiments/autogluon_model/"
        with mlflow.start_run() as run:
            # Train the AutoGluon model with AUC as the evaluation metric
            self.predictor = TabularPredictor(
                problem_type="binary",
                label=self.target_col,
                eval_metric="roc_auc",
                path=local_model_path,
            ).fit(
                self.train_data,
                excluded_model_types=["KNN", "RF"],
                hyperparameters=hyperparameters,
                presets="best_quality",
                num_bag_folds=3,
                num_stack_levels=1,
                time_limit=time_limit,
                verbosity=1,                                 # Reduce logs
                num_cpus=4,                                  # Limit CPU usage
                num_gpus=0,
                ag_args_fit={"num_cpus": 1, "num_gpus": 0},  # Ensure sequential training
            )
            print(f"Model saved at: {local_model_path}")
            # Keep a persistent copy of the model directory on DBFS
            dbutils.fs.rm(dbfs_model_path, recurse=True)
            dbutils.fs.cp(f"file:{local_model_path}", dbfs_model_path, recurse=True)
            # Log dataset sizes
            mlflow.log_params({"training_data_size": self.train_data.shape[0],
                               "validation_data_size": self.val_data.shape[0]})
            # Make predictions on the training and validation sets
            self.train_predictions = self.predictor.predict_proba(
                self.train_data.drop(columns=[self.target_col])).iloc[:, -1]  # Positive-class probabilities
            self.val_predictions = self.predictor.predict_proba(
                self.val_data.drop(columns=[self.target_col])).iloc[:, -1]    # Positive-class probabilities
            print("Training predictions:", self.train_predictions)
            # Compute and log both training and validation metrics
            self.compute_metrics(self.train_data[self.target_col], self.train_predictions, "train")
            self.compute_metrics(self.val_data[self.target_col], self.val_predictions, "validation")
print("Logging model to MLflow...")
# generate the model signature
signature = infer_signature(model_input = self.train_data.drop(columns = [self.target_col]),
model_output = self.train_predictions)
model_wrapper = AutoGluonPyFuncWrapper(local_model_path)
artifacts = {"predictor_path": dbfs_model_path}
mlflow.pyfunc.log_model(artifact_path = "model",
python_model = model_wrapper,
input_example = self.X_train[:2],
signature = signature,
artifacts = artifacts)
self.run_id = run.info.run_id # Store run ID
print(f"Model logged successfully. Run ID: {self.run_id}")
# Calculating classification report
report = classification_report(self.val_data.drop(columns = [self.target_col]), self.val_predictions.round(), output_dict=True)
mlflow.log_dict(report, "classification_report.json")
    def compute_metrics(self, y_true, y_pred, prefix):
        """Computes and logs metrics with a specified prefix (train/validation)."""
        metrics = {
            f"{prefix}_auc": roc_auc_score(y_true, y_pred),
            f"{prefix}_average_precision": average_precision_score(y_true, y_pred),
            f"{prefix}_f1_score": f1_score(y_true, y_pred > 0.5),
            f"{prefix}_f2_score": fbeta_score(y_true, y_pred > 0.5, beta=2.0),
            # Brier score is computed on probabilities, not thresholded labels
            f"{prefix}_brier_score": brier_score_loss(y_true, y_pred),
            f"{prefix}_recall": recall_score(y_true, y_pred > 0.5),
            f"{prefix}_precision": precision_score(y_true, y_pred > 0.5),
        }
        for metric_name, value in metrics.items():
            mlflow.log_metric(metric_name, value)
        return metrics
    def evaluate_model(self):
        """Evaluates the model on the validation set using AUC."""
        X_val = self.val_data.drop(columns=[self.target_col])
        y_pred_proba = self.predictor.predict_proba(X_val).iloc[:, -1]
        auc_score = roc_auc_score(self.val_data[self.target_col], y_pred_proba)
        print(f"Model AUC: {auc_score:.4f}")
        return auc_score
    def run_pipeline(self):
        """Complete pipeline: data splitting, training, logging, and evaluation."""
        self.split_data()
        self.train_model()
        auc_score = self.evaluate_model()
        return auc_score
from ucimlrepo import fetch_ucirepo

# Fetch the UCI Adult dataset
adult = fetch_ucirepo(id=2)
# Data (as pandas DataFrames)
X = adult.data.features
y = adult.data.targets
data = X.copy()
data['income'] = y['income']
data.head()
# The test portion of the dataset carries a trailing period in its labels;
# normalize them, then encode as 0/1
data['income'] = data['income'].replace({'<=50K.': '<=50K', '>50K.': '>50K'})
data['income'] = data['income'].replace({'<=50K': 0, '>50K': 1})
data['income'].value_counts()
# Example usage:
classifier = AutoGluonMLflowClassifier(model_data=data,
                                       target_col="income",
                                       experiment_name="autogluon_sample_experiment")
classifier.run_pipeline()
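To check that the logged model actually loads outside the training run, I use a minimal smoke test along these lines (assuming run_pipeline completed and classifier.run_id was set inside train_model):

# Reload the logged PyFunc model from the run and score a few rows
loaded_model = mlflow.pyfunc.load_model(f"runs:/{classifier.run_id}/model")
sample = data.drop(columns=["income"]).head(5)
print(loaded_model.predict(sample))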

