Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
NatashaSavic
Databricks Employee

The Need for Custom Functionality

Machine learning pipelines are invaluable for organizing and structuring the model development process. However, as models become more sophisticated, the need for custom functionality within these pipelines arises. This can introduce a few pain points, especially when it comes to creating and saving or logging custom PySpark-ML pipelines. The hands-on demo below walks through those pain points and gives you the tools you need to build your own custom PySpark-ML pipelines.

Recap on SparkML Pipelines

PySpark-ML provides robust pipelines for building end-to-end machine learning workflows. These pipelines encapsulate data preprocessing, feature engineering, model training, and evaluation in a structured manner, enhancing code organization and reproducibility. Additionally, model pipelines facilitate model scoring and serving integration, maintain high-level lineage visibility, offer flexibility for experimentation, and ensure transferability across different environments, all while seamlessly integrating with MLflow and your overall MLOps estate.
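As a rough illustration of the contract these pipelines rely on, the following plain-Python sketch mimics how a pipeline chains stages: transformers are applied as-is, while estimators are fitted first and replaced by the model they return. All class names here are hypothetical stand-ins that only mirror the shape of pyspark.ml; they are not the PySpark API.

```python
# Plain-Python stand-ins for the Transformer / Estimator / Pipeline contract.
# Every class below is hypothetical and exists only for illustration.

class Scale:
    """Transformer: stateless, only needs transform()."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, rows):
        return [x * self.factor for x in rows]


class Center:
    """Estimator: fit() learns a statistic and returns a fitted model."""
    def fit(self, rows):
        return CenterModel(mean=sum(rows) / len(rows))


class CenterModel:
    """Fitted model produced by Center.fit(); behaves like a transformer."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the current data; transformers are reused.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # output feeds the next stage
            fitted.append(model)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipeline_model = MiniPipeline([Scale(2.0), Center()]).fit([1.0, 2.0, 3.0])
result = pipeline_model.transform([1.0, 2.0, 3.0])
```

The fitted pipeline holds only transformer-like objects, which is why a fitted model can be applied to new data without refitting.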

Why Custom Pipelines are Needed

Despite the strengths of standard PySpark-ML pipelines, there are scenarios where customization is essential. Custom transformations, encodings, or even entirely new algorithms might be required to address specific business or domain needs. In the example below we will create a few custom estimators and transformations that are not available with native PySpark-ML and that you may find useful for your own use case. Similarly, you can use those patterns to create your own pipelines.

Why it's Painful to Create and Persist Custom Pipelines

Creating and persisting (saving or logging) custom pipelines can be cumbersome. With the native functionality, a custom component needs intricate boilerplate code for saving and loading its artifacts; without that code, a PySpark-ML pipeline containing the component cannot be saved or loaded. This complexity can slow down development and shift the focus away from refining the actual functionality of the ML pipeline. We will use an external library, sparkml-base-classes, to hide that boilerplate so that you can implement your custom transformations quickly.

A Step-by-Step Intro to Creating Your Custom PySpark-ML Pipelines

1. Read and Transform the Dataset

To kick off our demo, we read the dataset from "/databricks-datasets" or from a downloadable Kaggle link. The classic dataset, sourced from Cortez et al., 2009, consists of red and white wine data. We convert it into a Spark DataFrame, add a "color" column, and categorize the "quality" column.

2. Create Custom Spark Transformers for Your Model Pipeline

This section covers the creation of custom Spark transformers for your machine learning pipeline. The transformer and estimator classes inherit from the base classes in sparkml-base-classes, which takes care of persisting custom components within the ML training pipeline. This lets you retain your custom transformations and also log them through MLflow.

3. Custom Transformers Created in the Demo

Throughout the demo, we will define the following custom transformations:

  1. CustomAdder: Adds specified columns and creates a new column with the sum.
  2. TargetEncoderModel and TargetEncoder: Target encodes the "quality" column based on the "alcohol" target.
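To make the target-encoding idea concrete, here is a minimal plain-Python sketch (no Spark involved). It assumes the simplest variant, where each category is replaced by the mean of the target for that category; the demo's TargetEncoder may differ in detail. The function names and the sample rows are hypothetical.

```python
from collections import defaultdict


def fit_target_encoding(rows, input_col, target_col):
    """'Fit' step: learn the mean target value per category."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        totals[row[input_col]] += row[target_col]
        counts[row[input_col]] += 1
    return {category: totals[category] / counts[category] for category in totals}


def apply_target_encoding(rows, mapping, input_col, output_col, default=None):
    """'Transform' step: add a column holding the learned mean per category."""
    return [{**row, output_col: mapping.get(row[input_col], default)} for row in rows]


# Made-up sample rows, not taken from the wine dataset
train_rows = [
    {"quality": "low", "alcohol": 9.0},
    {"quality": "low", "alcohol": 10.0},
    {"quality": "high", "alcohol": 12.0},
]
mapping = fit_target_encoding(train_rows, "quality", "alcohol")
encoded = apply_target_encoding(train_rows, mapping, "quality", "encoded_quality")
```

The split into a fit step and a transform step mirrors the TargetEncoder / TargetEncoderModel pair: the estimator learns the mapping, and the resulting model applies it.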

Let’s look at one of the custom transformers; the necessary documentation is included directly in the code below. The full example can be found in the example notebook.

 

 

# Import necessary classes and modules
from sparkml_base_classes import TransformerBaseClass, EstimatorBaseClass
from pyspark import keyword_only
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType


class CustomAdder(TransformerBaseClass):
    """
    Example 2: Intermediate Transformer
    
    Combines multiple numeric columns through addition.
    Shows how to work with multiple input columns and create derived features.
    """
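    @keyword_only
    def __init__(self, inputCols=None, outputCol=None):
        # The base class exposes the keyword arguments as
        # self._inputCols and self._outputCol (used in _transform below)
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        # Python's built-in sum() works on Column objects because
        # Column supports reflected addition (0 + Column)
        total = sum(F.col(c) for c in self._inputCols)
        return df.withColumn(self._outputCol, total)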

 

 

For comparison, this gist shows the same CustomAdder class implemented with the native PySpark-ML boilerplate. Now that we have our custom PySpark-ML transformers and models defined, we can assemble them into the overall training pipeline using the native PySpark-ML Pipeline class.

 

 

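from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBRegressor


def get_wine_data_model_pipeline() -> Pipeline: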
    """
    Builds an end-to-end pipeline combining our custom transformers:
    1. Encodes the wine color
    2. Combines acidity features
    3. Applies target encoding to quality
    4. Assembles features for the final XGBoost model
    """
    # Encode the categorical color feature
    color_encoder = StringIndexer(inputCol="color", outputCol="indexed_color")

    # Combine the two acidity columns into one derived feature
    addition_transformer = CustomAdder(
        inputCols=['fixed acidity', 'volatile acidity'], outputCol='total acidity')

    # Target-encode quality using the alcohol column
    quality_target_encoder = TargetEncoder(
        inputCol='quality', targetCol='alcohol', outputCol='encoded_quality')

    # Assemble the features. Do not include the target column ('alcohol') as a
    # feature; it is passed to the model as label_col below.
    feature_cols = [#'quality',
                    'fixed acidity',
                    'volatile acidity',
                    'citric acid',
                    'residual sugar',
                    'chlorides',
                    'free sulfur dioxide',
                    'total sulfur dioxide',
                    'density',
                    'pH',
                    'sulphates',
                    #'alcohol',
                    #'color',
                    'indexed_color',
                    'total acidity',
                    'encoded_quality']
    
    vector_assembler = VectorAssembler(
        inputCols=feature_cols, outputCol="features")

    # Machine Learning model
    model = SparkXGBRegressor(features_col="features", label_col="alcohol")

    # Pipeline
    stages = [color_encoder, addition_transformer, quality_target_encoder,
              vector_assembler, model]
    return Pipeline(stages=stages)

 

 

Finally, we wrap get_wine_data_model_pipeline() in an MLflow run for model tracking:

 

 

import mlflow
import mlflow.spark
from pyspark.ml.evaluation import RegressionEvaluator

# Configure MLflow tracking
mlflow.set_tracking_uri('databricks')

# Fit the pipeline and log it with MLflow.
# `train` and `test` are assumed to be the training and test DataFrames
# (their creation is not shown here).
with mlflow.start_run(run_name='XGBoost Regression Wine') as run:
    pipeline = get_wine_data_model_pipeline()
    pipeline_model = pipeline.fit(train)

    # Transform the test data using the fitted pipeline
    predictions = pipeline_model.transform(test)

    # Set up the evaluator for the alcohol predictions
    evaluator = RegressionEvaluator(
        labelCol="alcohol", predictionCol="prediction")

    # Compute the RMSE on the test predictions
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

    # Log the metric
    mlflow.log_metric("root_mean_squared_error", rmse)

    # Log the model
    mlflow.spark.log_model(
        pipeline_model, artifact_path="custom_pipeline_model/")
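For readers unfamiliar with the logged metric, the following plain-Python sketch shows what a root mean squared error computes. It is only an illustration of the formula with made-up numbers, not how RegressionEvaluator is implemented; the function name is hypothetical.

```python
import math


def root_mean_squared_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up alcohol values and predictions
example_rmse = root_mean_squared_error([10.0, 12.0], [11.0, 12.0])
```

Because the errors are squared before averaging, larger misses weigh more heavily than small ones, and the result is expressed in the same unit as the label.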

 

 

Summary and Next Steps

In summary, understanding and overcoming the pain points associated with custom PySpark-ML pipelines is crucial for efficient machine learning model development. The sparkml_base_classes library offers a straightforward solution: it reduces the complexity of saving and loading custom model artifacts, and the resulting pipelines can be logged with MLflow. To run the full demo, you can access the following example.

To run it in Databricks, simply copy and paste the link of the associated Python file into your workspace and you’re good to go!