StuartLynn
New Contributor III


Introduction

This article describes the key ways in which data scientists can exploit the parallelisation power of Apache Spark™ to optimise their workflows, solve a new set of problems and scale their solutions for production applications. It identifies a number of specific tasks within the data science/machine learning domain that are good candidates for Apache Spark and describes how Apache Spark can be used to tackle them.

Many data science teams build solutions using Python-based tooling in a local (or basic cloud VM-based) development environment. Usually, this means they are subject to the following constraints:

  • the data must fit in memory; and
  • the ability to parallelise tasks is limited to packages that provide support for multithreading.

You can, of course, work around these (e.g., lazy loading/sharding of data and orchestrating a parallelisation strategy using libraries such as `threading`), but this tends to introduce significant complexity and brittleness to workflows.

By using Databricks and Apache Spark, a number of well-defined design patterns can be followed to overcome these constraints.

You won't want to miss this...

As a companion to this article, we spoke to one of our most valued partners, Gavi Regunath, about this subject. You can watch the recording of that discussion on the Advancing Analytics YouTube channel.




Identifying opportunities within traditional data science workflows

The diagram below asks us to consider the flows of data along the two primary data science workflows. Since the scope of data science work often extends beyond just the use of machine learning to create predictive models, we might choose to refer to these two workflows very generally as: “spotting the pattern” and “exploiting the pattern”. This allows us to expand our discussion to cover tasks from ML-adjacent fields such as operations research, engineering, and bioinformatics.

StuartLynn_2-1715332162772.png

Let’s explore the three scenarios described in the diagram to understand the issues that might be encountered and how using Apache Spark can help us overcome these.

Scenario 1: Pre-processing data required for “spotting the pattern”

In the case of tasks like supervised machine learning, “spotting the pattern” generally requires us to feed a large dataset to an algorithm whose objective is to determine the parameters of a model that accurately describes that data. In reality, at least for traditional ML applications, the datasets used for model training are not typically large enough to exhaust the memory available to the machine running the supervised learning process. Where they do exceed this limit, savvy data scientists tend to subsample this data, understanding that there is something of a law of diminishing returns at work here: gains in model performance generated by increasing the available data start to decrease beyond a certain point (see here for a sidebar discussion on this topic).

Although the final training dataset provided to a model might not exceed the limitations of the model training environment, it is likely produced from more than one upstream dataset, some of which may be very large and require complex aggregations. For example, millions of online shopping transactions might be turned into a set of customer behaviour variables by aggregating the number and value of each customer’s transactions over 30-, 60- and 90-day intervals within each product category. This is the hurdle described in scenario 1 above.

In these situations:

  • Databricks and Apache Spark can be employed to allow data scientists to perform exploratory data analysis, data pre-processing and feature engineering at a far greater scale than would otherwise be possible.
  • This analytical work can be undertaken in the same environment in which models are trained, simulations run, etc. (i.e., the Databricks Lakehouse), obviating the need to extract data from a data warehouse in order to begin those processes.
    • As a corollary, writing feature engineering code using Apache Spark on Databricks allows for easy ‘productionisation’ of this code (using, e.g., Databricks Workflows) and greater interlock between the work of data engineers (who typically focus on ingesting new datasets, cleaning them and scheduling updates) and that of the data scientists, including the ability to track data lineage from ingestion to model training/inference.
  • This type of activity does not require the data scientist to master the PySpark API: both ‘pandas on Spark’ and ad-hoc SQL analysis are good alternatives.
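
To make the aggregation described in this scenario concrete, here is a minimal sketch of the 30-, 60- and 90-day customer features written in plain pandas. The column names (customer_id, category, txn_date, amount) and the function name are hypothetical. On Databricks, similar logic could be expressed with pandas on Spark or as conditional aggregations in SQL, which is where the scaling benefit would come from.

```python
import pandas as pd


def customer_window_features(
    transactions: pd.DataFrame,
    as_of: pd.Timestamp,
    windows: tuple = (30, 60, 90),
) -> pd.DataFrame:
    """Count and sum each customer's transactions per category over trailing windows."""
    df = transactions.copy()
    # Age of each transaction in whole days relative to the reference date.
    age_days = (as_of - df["txn_date"]).dt.days

    feature_cols = []
    for days in windows:
        in_window = (age_days >= 0) & (age_days < days)
        # Conditional aggregation: rows outside the window contribute zero.
        df[f"txn_count_{days}d"] = in_window.astype(int)
        df[f"txn_value_{days}d"] = df["amount"].where(in_window, 0.0)
        feature_cols += [f"txn_count_{days}d", f"txn_value_{days}d"]

    return df.groupby(["customer_id", "category"], as_index=False)[feature_cols].sum()


# Hypothetical example data: three transactions for customer 1, one old one for customer 2.
example = pd.DataFrame(
    {
        "customer_id": [1, 1, 1, 2],
        "category": ["grocery", "grocery", "grocery", "toys"],
        "txn_date": pd.to_datetime(
            ["2024-04-20", "2024-03-16", "2024-02-10", "2024-01-01"]
        ),
        "amount": [5.0, 7.0, 3.0, 9.0],
    }
)
features = customer_window_features(example, as_of=pd.Timestamp("2024-04-30"))
print(features)
```

Flagging rows per window and aggregating once keeps the logic close to what a SQL `SUM(CASE WHEN ...)` query would do, so the same idea should carry over to a distributed engine with modest changes.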

Scenario 2: When the data isn’t big but the problem is

Scenario 2 covers the case where the inputs to some analytical process are not prohibitively large, but the computations that need to be performed on them are very resource-intensive. Many categories of problems within the data science domain fit this description, including:

  • Simulation and optimisation problems;
  • Entity resolution and other classes of polynomial time complexity problems; and
  • Fine-grained forecasting and other “grouped” model training applications.

Let’s explore these common examples in turn.

Simulation and optimisation

We have encountered many customers looking to solve simulation and optimisation problems using Databricks. Depending on the specifics of the problem, these can be prime candidates for piggybacking on Apache Spark.

An example of a ‘good’ candidate problem might be optimisation at the entity level on a dataset covering multiple entities. We encountered a scenario just like this when working with a consumer goods organisation aiming to optimise pricing across multiple markets. 

A proven pattern to address this type of problem efficiently is to write a pandas user-defined function (UDF) and apply it to a grouped Spark DataFrame. In this way, we can execute arbitrary Python code on each group of rows from a Spark DataFrame. Spark feeds each group’s rows to the function as a pandas DataFrame object. The customer in question had already created an optimiser class that worked for a single market at a time, so within the UDF all we needed to do was determine the parameters required for the optimiser to run and execute it on the incoming pandas DataFrame.

Let’s explore this solution with a code example. The first element is the basic Python function that runs the optimisation process:

 

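import pandas as pd

# PricingOptimiser is the customer's optimiser class; it and `experiment`
# are assumed to be defined earlier in the notebook (not shown here).
def optimise_for_market(pdf: pd.DataFrame) -> pd.DataFrame:
  # All rows in a group share the same BU_ID, so read it from the first row.
  first_row = next(pdf.itertuples())
  results_df = pdf.copy()
  
  pricing_optimiser = PricingOptimiser(
    input_df=pdf, market_id=first_row.BU_ID, 
    experiment=experiment, 
    max_evals=200, initial_step_size=100)
  prices, demand, return_code = pricing_optimiser.optimise()
  
  # Append the optimiser outputs to the original columns.
  results_df["OPTIMAL_PRICE"] = prices
  results_df["EXPECTED_DEMAND"] = demand
  results_df["OPTIMISER_RETURN_CODE"] = return_code
  
  return results_df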

 

If you look at the type hints in the function declaration, you’ll see it takes a pandas.DataFrame as an input and returns a pandas.DataFrame. The returned DataFrame contains all of the original columns plus some extra columns with the outputs of the optimisation routine. Just as a side note here: there’s no rule that says the input and output DataFrames need to have the same number of rows, or that the output DataFrame needs to contain any of the same columns as the input.
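
As a hedged illustration of that side note, the sketch below shows a grouped function that returns a single summary row per market rather than one row per input row. The function name summarise_market and the output columns N_ROWS and REVENUE_LAST_WEEK are hypothetical; the input column names follow the pricing dataset used in this example. The function is checked locally on a pandas DataFrame standing in for one group, and the Spark call is shown only as a comment.

```python
import pandas as pd


def summarise_market(pdf: pd.DataFrame) -> pd.DataFrame:
    """Collapse all rows for one market into a single summary row."""
    revenue = (pdf["PRICE_LAST_WEEK"] * pdf["UNITS_LAST_WEEK"]).sum()
    return pd.DataFrame(
        {
            "BU_ID": [pdf["BU_ID"].iloc[0]],
            "N_ROWS": [len(pdf)],
            "REVENUE_LAST_WEEK": [float(revenue)],
        }
    )


# Local check on a pandas DataFrame standing in for one group of rows.
one_market = pd.DataFrame(
    {
        "BU_ID": [7, 7, 7],
        "PRICE_LAST_WEEK": [2.0, 3.0, 4.0],
        "UNITS_LAST_WEEK": [10, 20, 5],
    }
)
summary = summarise_market(one_market)
print(summary)

# On a Spark DataFrame the call would look like this (not executed here):
# spark_df.groupBy("BU_ID").applyInPandas(
#     summarise_market,
#     schema="BU_ID long, N_ROWS long, REVENUE_LAST_WEEK double",
# )
```

The only contract is that the returned pandas DataFrame matches the schema passed to applyInPandas.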

The second part is the PySpark transformation that executes this UDF on groups of the input DataFrame’s rows:

 

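# Disable Adaptive Query Execution so that Spark does not coalesce small
# shuffle partitions, which would place several groups in the same task.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Schema of the DataFrame returned by the UDF, expressed as a DDL string.
optimiser_return_schema = "BU_ID long, PRODUCT_ID long, STORE_ID long, PRICE_LAST_WEEK double, UNITS_LAST_WEEK long, OPTIMAL_PRICE double, EXPECTED_DEMAND double, OPTIMISER_RETURN_CODE int"

# Repartition the input across the available cores and mark it for caching.
optimiser_inputs = opt_dataset___all_markets.repartition(sc.defaultParallelism)
optimiser_inputs.cache()

# Run the optimiser once per market and save the combined results.
(
  optimiser_inputs
  .groupBy("BU_ID")
  .applyInPandas(optimise_for_market, schema=optimiser_return_schema)
  .write
  .mode("overwrite")
  .saveAsTable("optimiser_outputs")
)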

 

This is an example of an embarrassingly parallel computation in that none of the calls to the pricing optimisation routine rely on the outcome of previous runs. Some aspects worth highlighting here:

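  • The key part of the pattern is groupBy().applyInPandas(). This shuffles the data by the grouping key and calls the pandas UDF optimise_for_market once for each group of rows, with the groups spread across Spark tasks.
  • We have to explicitly specify the schema of the DataFrame returned by the UDF.
  • With Adaptive Query Execution (AQE) enabled, Spark may coalesce small shuffle partitions, so several groups can end up being processed one after another in the same task. Setting spark.sql.adaptive.enabled to false prevents this coalescing. Groups are still assigned to shuffle partitions by hash, so two groups can occasionally share a task even with AQE disabled.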

Similar examples of ‘good’ problems might come from customers using Monte Carlo simulation or Approximate Bayesian Computation techniques to estimate the distribution of parameters in a model. All calls to the model algorithm are independent and inputs can be sampled ahead of time and represented as rows in an input Spark DataFrame used to drive a process like the one above.
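
A minimal sketch of that idea follows, under stated assumptions: a toy Monte Carlo estimate of pi stands in for a real model, and the column names (batch_id, seed, n_samples) and the function name run_simulations are hypothetical. The inputs for each run are prepared ahead of time as rows; locally, a pandas groupby stands in for Spark’s grouped execution, and the equivalent Spark call is shown as a comment.

```python
import numpy as np
import pandas as pd


def run_simulations(pdf: pd.DataFrame) -> pd.DataFrame:
    """Run one independent Monte Carlo estimate of pi per input row."""
    estimates = []
    for row in pdf.itertuples(index=False):
        # A per-row seed keeps every run reproducible and independent.
        rng = np.random.default_rng(row.seed)
        points = rng.random((row.n_samples, 2))
        inside = (points ** 2).sum(axis=1) <= 1.0
        estimates.append(4.0 * inside.mean())
    return pdf.assign(pi_estimate=estimates)


# Inputs are sampled ahead of time: one row per simulation run.
runs = pd.DataFrame(
    {
        "batch_id": [0, 0, 1, 1],
        "seed": [11, 12, 13, 14],
        "n_samples": [20_000] * 4,
    }
)

# Locally, pandas' own groupby stands in for Spark's grouped execution.
results = pd.concat([run_simulations(group) for _, group in runs.groupby("batch_id")])
print(results)

# On Spark (not executed here):
# spark_runs.groupBy("batch_id").applyInPandas(
#     run_simulations,
#     schema="batch_id long, seed long, n_samples long, pi_estimate double",
# )
```

Grouping several runs into a batch is a design choice: it amortises the per-task overhead when each individual run is cheap.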

"Grouped" / granular model training

The canonical example here is fine-grained forecasting: producing forecasts at, e.g., the store and SKU level. We’ve written about this quite a bit in the past:

  • Time Series Forecasting With Prophet And Spark (blog, notebooks)

Fundamentally, these operations are ‘embarrassingly parallel’, so ideally we would execute the runs in parallel. Apache Spark provides the capability to parallelise arbitrary Python code using the pandas UDF abstraction mentioned earlier in this article, and this is a good place to elaborate a little further on that pattern.

StuartLynn_3-1715332162741.png

By applying a pandas UDF to a grouped Spark DataFrame, we can provide the rows comprising each group to a separate instance of the function, running inside its own Spark task with these tasks then being distributed across the cluster by the scheduler. In the case of fine-grained forecasting, the function in question would, at the minimum, fit a model to the incoming time series data and probably go on to then extrapolate and output some future predicted values.

Forecast models of this kind are something of a special case since they are often strongly coupled to recent data and tend to go out of date very quickly. For this reason, the models themselves are not typically persisted; rather, we retrain and perform inference in a single step, often on a daily or more frequent basis.
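
The sketch below illustrates the ‘retrain and infer in a single step’ shape of such a function. It is not a real forecasting model: a naive straight-line trend fitted with numpy.polyfit stands in for something like Prophet, and the column names (store, sku, t, y) and the function name fit_and_forecast are hypothetical.

```python
import numpy as np
import pandas as pd


def fit_and_forecast(pdf: pd.DataFrame, horizon: int = 3) -> pd.DataFrame:
    """Fit a straight-line trend to one series and extrapolate `horizon` steps."""
    pdf = pdf.sort_values("t")
    slope, intercept = np.polyfit(pdf["t"], pdf["y"], deg=1)
    last_t = pdf["t"].max()
    future_t = np.arange(last_t + 1, last_t + 1 + horizon)
    # The fitted model is used immediately and then discarded, not persisted.
    return pd.DataFrame(
        {
            "store": pdf["store"].iloc[0],
            "sku": pdf["sku"].iloc[0],
            "t": future_t,
            "y_hat": slope * future_t + intercept,
        }
    )


# Two hypothetical series: one rising (y = 2t + 1), one falling (y = 10 - t).
t = np.arange(5)
history = pd.concat(
    [
        pd.DataFrame({"store": "A", "sku": 1, "t": t, "y": 2.0 * t + 1.0}),
        pd.DataFrame({"store": "B", "sku": 2, "t": t, "y": 10.0 - t}),
    ],
    ignore_index=True,
)

# Locally, loop over groups; on Spark each group would be handled by the scheduler.
forecasts = pd.concat(
    [fit_and_forecast(group) for _, group in history.groupby(["store", "sku"])],
    ignore_index=True,
)
print(forecasts)

# On Spark (not executed here):
# spark_history.groupBy("store", "sku").applyInPandas(
#     fit_and_forecast, schema="store string, sku long, t long, y_hat double"
# )
```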

When the training and inference processes are logically segregated (e.g., training happens weekly, inference daily), then a grouped model training process will introduce complexity at inference time as the scoring code needs to select the appropriate model based on some input parameter. If your problem exhibits these characteristics, then the following blog will be helpful:

  • Bootstrap your large-scale forecasting solution on Databricks with Many Models Forecasting (MMF) Project (blog, github project).

In either case, the mechanism at work here is still groupBy().applyInPandas().

Problems that exhibit polynomial or exponential complexity with data volumes

The canonical example in this category is entity resolution/record linkage. Apache Spark has all of the internals to partition the problem appropriately and make it scale out successfully. See splink, arc, and zingg as examples of how this can be implemented in practice.

Within the field of geospatial analytics, there are many operations that exhibit quadratic or worse time complexity, e.g., spatial joins between point and polygon sets for matching events to locations. Many Apache Spark extensions such as Mosaic and Sedona have built-in optimisations for partitioning source data appropriately and controlling the complexity of these operations.
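
To show why partitioning matters for this class of problem, here is a small, self-contained sketch of ‘blocking’: only records that share a key are compared, instead of every record against every other. This is an illustration of the general idea only, not a description of how splink, arc, zingg, Mosaic or Sedona work internally; the record fields and the function name candidate_pairs are hypothetical.

```python
from collections import defaultdict
from itertools import combinations


def candidate_pairs(records: list[dict], block_key) -> set[tuple[int, int]]:
    """Return pairs of record ids that share a blocking key."""
    blocks = defaultdict(list)
    for record in records:
        blocks[block_key(record)].append(record["id"])
    pairs = set()
    for ids in blocks.values():
        # Comparisons are quadratic only within each (small) block.
        pairs.update(combinations(sorted(ids), 2))
    return pairs


records = [
    {"id": 1, "name": "Anna Smith", "postcode": "AB1"},
    {"id": 2, "name": "Ana Smith", "postcode": "AB1"},
    {"id": 3, "name": "Bob Jones", "postcode": "CD2"},
    {"id": 4, "name": "Robert Jones", "postcode": "CD2"},
    {"id": 5, "name": "Carla Diaz", "postcode": "EF3"},
]

# Naive approach: every record is compared with every other record.
all_pairs = set(combinations([r["id"] for r in records], 2))
# Blocked approach: only records sharing a postcode are compared.
blocked_pairs = candidate_pairs(records, block_key=lambda r: r["postcode"])
print(len(all_pairs), len(blocked_pairs))
```

Each block is independent of the others, which is what makes the blocked comparisons straightforward to distribute across a cluster.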

Scenario 3: Scaling out for high-throughput production applications

The third and final scenario described in the diagram belongs within the ‘exploit the pattern’ workflow.

The challenge here is that a significant skew often exists between the volume and velocity of data assumed in the development of an analytical product and those encountered in its intended destination environment.

Pre-processing and feature engineering

Let’s begin by discussing the “data-heavy” part of this workflow. As you’ll find on your journey through the MLOps Gym materials, an ability to decouple pre-processing and feature engineering processes from the model training and inference steps will make testing, versioning, and deploying changes to an analytical product much easier. It also enables data science teams to follow the patterns employed by their data engineering colleagues with regard to scheduling and orchestration of these tasks.

If, at the time of training and evaluating models, you are:

  1. building and iterating on your feature engineering code using PySpark, pandas on Spark, or as a chain of SQL views; and
  2. saving features in feature tables on Unity Catalog (any Delta table with a primary key can be a feature table)

then, when it comes to deployment, the only work necessary is to wrap this code up as something that can be deployed as a Workflow. Your code can now scale to process very large volumes of data, potentially in a continuous and incremental fashion, if you opt to employ Spark structured streaming.

For some practical examples of how to implement this, take a look at our feature tables documentation pages. Here’s an example from that page:

 

 

from databricks.feature_engineering import FeatureEngineeringClient

def compute_additional_customer_features(data):
  """Returns Streaming DataFrame"""
  pass  # placeholder: the feature logic is not shown in this example

fe = FeatureEngineeringClient()

# Read the raw transactions as a stream.
customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")

# Merge the computed features into the feature table.
fe.write_table(
  df=compute_additional_customer_features(customer_transactions),
  name='ml.recommender_system.customer_features',
  mode='merge'
)

 

Code written in this way will allow features in a feature table to be continuously updated in place using a Spark streaming pipeline. Furthermore, with the advent of Databricks Feature Serving (AWS | Azure), it is trivial to sync these features into an online store for real-time scoring applications.

Inference/execution

When putting a predictive model into production, some analysis of the throughput and latency requirements of the application is needed in order to determine the appropriate deployment pattern:

  • For the lowest-cost, highest-throughput applications where latency is not so critical, a Spark batch pipeline that calls the model using the `mlflow.pyfunc.spark_udf` pattern is ideal.
  • If you wish to score incoming observations (delivered via, e.g., a pub/sub service), and are looking for near-real-time results but can tolerate sub-minute latency, then deploying MLflow models into a Spark structured streaming pipeline is the perfect fit. This can also be adapted for ‘incremental batch’ deployments using the Trigger.AvailableNow approach (AWS | Azure | GCP).
  • If low latency is the top priority, then deploying models into a model serving endpoint is the way to go (AWS | Azure).

Here’s a very quick example of the mlflow.pyfunc.spark_udf pattern mentioned above, if you haven’t seen it before:

 

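import mlflow.pyfunc
from pyspark.sql.functions import struct

# model_uri and dataframe are assumed to be defined earlier in the notebook.
feature_columns = ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6']

# Wrap the logged model as a Spark UDF and apply it to the feature columns.
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
predicted_df = dataframe.withColumn("prediction", pyfunc_udf(struct(*feature_columns)))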

 

This retrieves a model from the MLflow tracking server and delivers it to the script as a vectorised Spark User Defined Function (UDF) (more info here). This is a well-proven pattern for scalable model inference in a batch or streaming pipeline and is compatible with many flavours of models. Take a look at the MLflow documentation for a comprehensive list. The only requirement is that you log the model using MLflow during your model training workflow.

The documentation also describes how to customise the wrapped model’s predict() method behaviour.
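
Building on the spark_udf pattern, the ‘incremental batch’ deployment mentioned in the list above could be sketched roughly as follows. This is a sketch under assumptions rather than a reference implementation: the function name score_incrementally and its parameters are hypothetical, and it assumes the source table contains only the model’s feature columns. It needs a Spark session and a logged MLflow model to run, so it is only defined here, not executed.

```python
def score_incrementally(
    spark,
    model_uri: str,
    source_table: str,
    target_table: str,
    checkpoint_path: str,
):
    """Score rows that have arrived since the last run, then stop."""
    # Imports are deferred so this module can be loaded without Spark or MLflow.
    import mlflow.pyfunc
    from pyspark.sql.functions import struct

    predict = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

    # Read the source table as a stream; the checkpoint tracks what was already scored.
    incoming = spark.readStream.table(source_table)
    # Assumption: every column in the source table is a model feature.
    scored = incoming.withColumn("prediction", predict(struct(*incoming.columns)))

    return (
        scored.writeStream
        .trigger(availableNow=True)  # process all available data, then stop
        .option("checkpointLocation", checkpoint_path)
        .toTable(target_table)
    )
```

Removing the trigger line (or swapping it for a processing-time trigger) would turn the same code into a continuously running pipeline, which is the appeal of writing batch scoring in this style.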

Opportunities within model tuning, deep-learning tasks, and working with unstructured data

The preceding discussion of data science workflows and opportunities for employing Apache Spark assumed the development of analytical applications that are fed structured data, and the advice therein applies mostly to the use of ‘traditional’ machine learning algorithms. Furthermore, we assumed that model training is a once-and-done activity (i.e. no compute-intensive model tuning activity was required) and, provided the data fits in memory, this can be tackled in a ‘single-node’ development environment.

The following sections of this document describe the options available for using Apache Spark when these assumptions do not hold true. More details on each of these will be provided later in the MLOps Gym material.

StuartLynn_4-1715332162879.png


The diagram above identifies some scenarios for parallelisation of the model training and tuning processes.

Model vs. data parallelism

At this point, it is probably worth articulating the distinction between ‘model parallel’ and ‘data parallel’ training in the context of deep learning.

  • Model parallelism describes the pattern of splitting a large model between multiple machines, cores, GPUs, etc., and feeding the results of each training subprocess to its downstream neighbour(s) in the sequence to perform updates. The process can be parallelised by continuing to compute gradients on stale weights while these are updated in a concurrent process.
  • Data parallelism typically involves sharding data across multiple nodes, with each node training its own variant of the model. A supervisor process aggregates results from each worker subprocess and updates the ‘final’ model before distributing this back to the workers and initiating another round of training.
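
The data-parallel pattern can be sketched on a single machine with NumPy. The example below shows one common variant, synchronous gradient averaging, on a toy linear regression: each ‘worker’ computes a gradient on its own shard and a ‘supervisor’ averages them before updating the shared weights. The data, learning rate and number of workers are arbitrary choices for illustration, and real frameworks add communication and scheduling that are not modelled here.

```python
import numpy as np


def gradient(weights, X, y):
    """Gradient of mean squared error for a linear model y ≈ X @ weights."""
    residual = X @ weights - y
    return 2.0 * X.T @ residual / len(y)


# Synthetic, noise-free regression data.
rng = np.random.default_rng(0)
X = rng.normal(size=(120, 3))
true_weights = np.array([1.5, -2.0, 0.5])
y = X @ true_weights

weights = np.zeros(3)
n_workers = 4
for _ in range(200):
    # Each "worker" computes a gradient on its own equally sized shard.
    shard_grads = [
        gradient(weights, X_shard, y_shard)
        for X_shard, y_shard in zip(np.split(X, n_workers), np.split(y, n_workers))
    ]
    # The "supervisor" averages the shard gradients and updates the shared weights.
    weights = weights - 0.1 * np.mean(shard_grads, axis=0)

print(weights)
```

With equally sized shards, the average of the shard gradients equals the gradient over the full dataset, so this toy version follows the same path as single-node training.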

Another common pattern of task parallelisation in the context of model training is parallel hyperparameter tuning. This process requires the training of many candidate models in the search for an optimal performing set of hyperparameters.

We shall examine how Apache Spark can be used in all of these scenarios.

Scenario 1: Model parallel training

Apache Spark 3.5.0 saw the introduction of a DeepspeedTorchDistributor class within the pyspark.ml package, allowing for pipeline-parallel training of PyTorch models. More details and an example notebook can be found in our docs (AWS | Azure | GCP).

The benefits of this toolset will be particularly apparent when available GPU memory is limited and the models being trained are very large. This class also allows for some degree of data parallelism during both training and inference time, which is particularly useful for batch inference applications.

Scenario 2: Data parallel model training

Most often, we see this approach called for when customers are training models on datasets that comprise unstructured data. So why and how does our approach to efficient model training and evaluation change when we want to make predictions based on unstructured data such as text or images?

By their nature, input datasets of this type are typically very large (e.g., libraries of several thousand images collected by borescope inspections of jet engine turbine blades), and deep-learning techniques will (usually) be best suited to tackling the tasks that use them.

Assuming that is the case, the training data will very often exceed the capacity of a single compute node and will require sharding across whatever compute target is being used to train the model. This is a problem that can also be solved fairly simply by using Apache Spark with the appropriate extension library for your desired workflow.

Here are some example patterns for doing this:

In general, these approaches require packaging existing, ‘single-node’ model training code into a function or class and wrapping these with some decorator or parent class that will handle the sharding of data and parallelisation of the model training and evaluation processes.


Scenario 3: Parallel hyperparameter tuning

Data scientists often want to automate the process of determining the optimal hyperparameters for the algorithm responsible for producing their predictive models. There are several good options for parallelising these searches using Apache Spark, including:

Tuning with AutoML

The Databricks AutoML tools perform a parallelised search for an optimal model across multiple algorithms and a basic hyperparameter search space. Although these tools abstract away a lot of the complexity of a search operation, you still have control over aspects such as the model evaluation metric to be optimised, the stopping criteria of the search, and so on. As a starting point for the ‘modeling’ and ‘evaluation’ phases of your project, this is a great option since it provides information about what techniques are likely to perform well on your task and saves writing heaps of boilerplate code.

Hyperopt SparkTrials

This is our go-to recommendation for performing parallel hyperparameter search since it allows you to take advantage of Hyperopt’s ability to refine the hyperparameter search space as the search progresses.
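
A minimal sketch of the pattern is shown below. The objective is a toy function with a known minimum, standing in for code that would train and evaluate a model; the function names, search space and parameter values are illustrative assumptions. The search itself needs Hyperopt and an active Spark session, so it is wrapped in a function and not executed here.

```python
def objective(params: dict) -> float:
    """Toy loss with a minimum at x = 3; a real objective would train and score a model."""
    return (params["x"] - 3.0) ** 2


def tune_in_parallel(max_evals: int = 32, parallelism: int = 4) -> dict:
    """Run the search with trials evaluated on Spark workers."""
    # Deferred import: Hyperopt and Spark are only needed when this is called.
    from hyperopt import SparkTrials, fmin, hp, tpe

    search_space = {"x": hp.uniform("x", -10.0, 10.0)}
    return fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=max_evals,
        trials=SparkTrials(parallelism=parallelism),
    )


# The objective can be checked locally without Spark.
print(objective({"x": 5.0}))
```

The parallelism setting is a trade-off: higher values finish sooner, but each new trial has fewer completed results to learn from when the search space is refined.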

scikit-learn + joblib-spark

Some algorithms in scikit-learn (especially the tuning and cross-validation tools) let you specify a parallel backend. You can use the joblib-spark backend to use Apache Spark as that parallel backend. See the joblib-spark github page for an example.

Summary

In the course of this article, we have:

  • examined how data scientists can leverage the power of Apache Spark to create efficient data science and machine learning workflows;
  • discussed the scenarios where Apache Spark excels, such as handling large datasets, resource-intensive computations, high-throughput applications, model tuning, and deep learning; and
  • covered some of the different parallelisation strategies that can be employed, such as model parallelism and data parallelism, with some practical examples and patterns for using Apache Spark to achieve these.

Coming up next!

Next blog in this series: Getting started with version control in Databricks