
AutoGluon MLflow integration

cleversuresh
New Contributor III

I am working on a personalized price-package recommendation model and have implemented AutoGluon training code integrated with MLflow.

The code is written in a modular fashion so other team members can reuse it: they only need to pass the data, the target column, and an experiment name to create the experiment (see the example usage at the bottom).

I keep running into problems when logging the model with MLflow; any help would be greatly appreciated.
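
For reference, the pattern I am trying to follow is the standard MLflow custom pyfunc flow. Below is a minimal, standalone sketch of that flow, using a toy model and a placeholder text artifact instead of my real AutoGluon predictor:

import mlflow
import pandas as pd

class EchoModel(mlflow.pyfunc.PythonModel):
    """Toy PyFunc model: reads a text artifact in load_context and echoes it back."""

    def load_context(self, context):
        # Artifacts passed to log_model are resolved to local file paths here
        with open(context.artifacts["greeting"]) as f:
            self.greeting = f.read()

    def predict(self, context, model_input):
        return pd.DataFrame({"greeting": [self.greeting] * len(model_input)})

# Create a placeholder artifact, log the model, then load it back by run URI
with open("greeting.txt", "w") as f:
    f.write("hello")

with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(artifact_path="model",
                            python_model=EchoModel(),
                            artifacts={"greeting": "greeting.txt"})

loaded = mlflow.pyfunc.load_model(f"runs:/{run.info.run_id}/model")
print(loaded.predict(pd.DataFrame({"x": [1, 2]})))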

This is my code:

import mlflow
import pandas as pd
from mlflow.models import infer_signature
from autogluon.tabular import TabularPredictor
from sklearn.model_selection import train_test_split
from sklearn.metrics import (average_precision_score, brier_score_loss, classification_report,
                             f1_score, fbeta_score, precision_score, recall_score, roc_auc_score)


class AutoGluonPyFuncWrapper(mlflow.pyfunc.PythonModel):
    """Wrapper for an AutoGluon model so it can be logged as a PyFunc model in MLflow."""

    def __init__(self, model_path):
        self.model_path = model_path
        self.predictor = None  # Model is loaded in load_context when MLflow loads the PyFunc model

    def load_context(self, context):
        """Loads the AutoGluon predictor when MLflow loads the PyFunc model."""
        # Prefer the artifact path resolved by MLflow; fall back to the path given at construction time
        predictor_path = context.artifacts.get("predictor_path", self.model_path)
        self.predictor = TabularPredictor.load(predictor_path)

    def predict(self, context, model_input):
        """
        Predict probability scores for the given input.

        model_input: pandas DataFrame
        Returns: pandas DataFrame with the positive-class probability
        """
        if not isinstance(model_input, pd.DataFrame):
            model_input = pd.DataFrame(model_input)
        predictions = self.predictor.predict_proba(model_input)

        # Get the positive-class label dynamically (last column of predict_proba output)
        positive_class = predictions.columns[-1]
        return predictions[[positive_class]]  # Return only the positive-class probability


class AutoGluonMLflowClassifier:
    def __init__(self, model_data: pd.DataFrame, target_col: str, experiment_name: str):
        """
        Initializes the classifier with the model data, target column, and MLflow experiment name.
        """
        self.model_data = model_data
        self.target_col = target_col
        self.experiment_name = experiment_name
        self.predictor = None
        self.train_predictions = None
        self.val_predictions = None
        self._initialize_mlflow()

    def _initialize_mlflow(self):
        """Sets up the MLflow experiment dynamically in Databricks."""

        # Define the experiment path (stored under the shared workspace folder)
        experiment_path = f"/Shared/automl_experiments/{self.experiment_name}"

        # Check whether the experiment already exists
        experiment = mlflow.get_experiment_by_name(experiment_path)

        if experiment is None:
            # Create a new experiment if it does not exist
            experiment_id = mlflow.create_experiment(experiment_path)
            print(f"Created new MLflow experiment at: {experiment_path}")
        else:
            experiment_id = experiment.experiment_id
            print(f"Using existing MLflow experiment: {experiment_path}")

        # Set the experiment to use
        mlflow.set_experiment(experiment_path)

    def split_data(self):
        """Splits the model data into training and validation sets."""
        self.train_data, self.val_data = train_test_split(self.model_data, test_size=0.2, random_state=42)
        print(self.train_data.columns)

    def train_model(self, time_limit: int = 200):
        """Trains AutoGluon model and logs parameters, metrics, and artifacts in MLflow."""
        hyperparameters = {
            "GBM": {  # LightGBM
                "num_boost_round": 1000,     # More boosting rounds
                "learning_rate": 0.02,       # Lower learning rate for better generalization
                "num_leaves": 31,            # Leaf complexity
                "feature_fraction": 0.8,     # Feature bagging
                "bagging_fraction": 0.8,     # Sample bagging
                "bagging_freq": 5,           # Frequency of bagging
                "min_data_in_leaf": 20,      # Minimum samples per leaf
            },
            "XGB": {  # XGBoost
                "n_estimators": 1000,
                "learning_rate": 0.02,
                "max_depth": 6,              # Controls complexity
                "subsample": 0.8,            # Sample fraction per tree
                "colsample_bytree": 0.8,     # Feature bagging
                "gamma": 0.2,                # Regularization
                "lambda": 1,                 # L2 regularization
            },
            "CAT": {  # CatBoost
                "iterations": 1000,
                "learning_rate": 0.02,
                "depth": 6,
                "l2_leaf_reg": 3,            # L2 regularization
                "border_count": 32,          # Number of bins for numeric features
            },
            "NN_TORCH": {  # Neural Network (PyTorch)
                "num_epochs": 100,           # Increase training epochs
                "learning_rate": 0.001,
                "dropout_prob": 0.1,         # Dropout regularization
                "weight_decay": 1e-5,        # L2 weight regularization
                "hidden_size": 256,          # Hidden layer size
            },
        }

        dbfs_model_path = "dbfs:/FileStore/automl/autogluon/"
        local_model_path = "/Shared/automl_experiments/autogluon_model/"

        with mlflow.start_run() as run:
            # Train the AutoGluon model with AUC as the evaluation metric
            self.predictor = TabularPredictor(problem_type="binary",
                                              label=self.target_col,
                                              eval_metric="roc_auc",
                                              path=local_model_path) \
                .fit(self.train_data,
                     excluded_model_types=["KNN", "RF"],
                     hyperparameters=hyperparameters,
                     presets="best_quality",
                     num_bag_folds=3,
                     num_stack_levels=1,
                     time_limit=time_limit,
                     verbosity=1,                                 # Reduce logs
                     num_cpus=4,                                  # Limit CPU usage
                     num_gpus=0,
                     ag_args_fit={"num_cpus": 1, "num_gpus": 0})  # Ensure sequential training

            print(f"Model saved at: {local_model_path}")
            dbutils.fs.rm(dbfs_model_path, recurse=True)
            dbutils.fs.cp(f"file:{local_model_path}", dbfs_model_path, recurse=True)

            # Log dataset sizes
            mlflow.log_params({"training_data_size": self.train_data.shape[0],
                               "validation_data_size": self.val_data.shape[0]})

            # Make predictions on the training and validation datasets
            train_features = self.train_data.drop(columns=[self.target_col])
            val_features = self.val_data.drop(columns=[self.target_col])
            self.train_predictions = self.predictor.predict_proba(train_features).iloc[:, -1]  # Positive-class probabilities
            self.val_predictions = self.predictor.predict_proba(val_features).iloc[:, -1]      # Positive-class probabilities
            print("Training predictions:", self.train_predictions)

            # Compute and log both training and validation metrics
            self.compute_metrics(self.train_data[self.target_col], self.train_predictions, "train")
            self.compute_metrics(self.val_data[self.target_col], self.val_predictions, "validation")

            print("Logging model to MLflow...")
            # Generate the model signature
            signature = infer_signature(model_input=train_features,
                                        model_output=self.train_predictions)

            model_wrapper = AutoGluonPyFuncWrapper(local_model_path)
            artifacts = {"predictor_path": dbfs_model_path}
            mlflow.pyfunc.log_model(artifact_path="model",
                                    python_model=model_wrapper,
                                    input_example=train_features.head(2),
                                    signature=signature,
                                    artifacts=artifacts)

            self.run_id = run.info.run_id  # Store the run ID
            print(f"Model logged successfully. Run ID: {self.run_id}")

            # Log the classification report on the validation set
            report = classification_report(self.val_data[self.target_col], self.val_predictions.round(), output_dict=True)
            mlflow.log_dict(report, "classification_report.json")

    # Metric calculation helper
    def compute_metrics(self, y_true, y_pred, prefix):
        """Computes and logs metrics with a specified prefix (train/validation)."""
        metrics = {
            f"{prefix}_auc": roc_auc_score(y_true, y_pred),
            f"{prefix}_average_precision": average_precision_score(y_true, y_pred),
            f"{prefix}_f1_score": f1_score(y_true, y_pred > 0.5),
            f"{prefix}_f2_score": fbeta_score(y_true, y_pred > 0.5, beta=2.0),
            f"{prefix}_brier_score": brier_score_loss(y_true, y_pred),  # Brier score is computed on probabilities
            f"{prefix}_recall": recall_score(y_true, y_pred > 0.5),
            f"{prefix}_precision": precision_score(y_true, y_pred > 0.5),
        }
        for metric_name, value in metrics.items():
            mlflow.log_metric(metric_name, value)
        return metrics

    def evaluate_model(self):
        """Evaluates the trained model on the validation set using the AUC metric."""
        val_features = self.val_data.drop(columns=[self.target_col])
        y_pred_proba = self.predictor.predict_proba(val_features).iloc[:, -1]
        auc_score = roc_auc_score(self.val_data[self.target_col], y_pred_proba)
        print(f"Model AUC: {auc_score:.4f}")
        return auc_score

    def run_pipeline(self):
        """Complete pipeline: data splitting, training, evaluation, and logging."""
        self.split_data()
        self.train_model()
        auc_score = self.evaluate_model()
        return auc_score


from ucimlrepo import fetch_ucirepo

# fetch dataset
adult = fetch_ucirepo(id=2)

# data (as pandas dataframes)
X = adult.data.features
y = adult.data.targets

data = X.copy()
data['income'] = y['income']
data.head()
data['income'] = data['income'].replace({'<=50K.': '<=50K', '>50K.': '>50K'})
data['income'] = data['income'].replace({'<=50K': 0, '>50K': 1})
data['income'].value_counts()

# Example Usage:
classifier = AutoGluonMLflowClassifier(model_data=data,
                                       target_col="income",
                                       experiment_name="autogluon_sample_experiment")
classifier.run_pipeline()
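
For completeness, this is roughly how I expect to load the logged model back and score it after run_pipeline finishes, using the run ID stored by train_model:

# Load the logged PyFunc model back from the run and score a few rows
loaded_model = mlflow.pyfunc.load_model(f"runs:/{classifier.run_id}/model")
sample = data.drop(columns=["income"]).head(5)
print(loaded_model.predict(sample))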

