This article describes the key ways in which data scientists can exploit the parallelisation power of Apache Spark™ to optimise their workflows, solve a new set of problems, and scale their solutions for production applications. It identifies a number of specific tasks within the data science/machine learning domain that are good candidates for Apache Spark and describes how Apache Spark can be used to tackle them.
Many data science teams build solutions using Python-based tooling in a local (or basic cloud VM-based) development environment. Usually, this means they are subject to two key constraints: the volume of data they can work with is limited by the memory available to a single machine, and their computations are bound to the cores of that machine.
You can, of course, work around these (e.g., lazy loading/sharding of data and orchestrating a parallelisation strategy using libraries such as `threading`), but this tends to introduce significant complexity and brittleness to workflows.
By using Databricks and Apache Spark, a number of well-defined design patterns can be followed to overcome these constraints.
As a companion to this article, we spoke to one of our most valued partners, Gavi Regunath, about this subject. You can watch the recording of that discussion on the Advancing Analytics YouTube channel.
The diagram below asks us to consider the flows of data along the two primary data science workflows. Since the scope of data science work often extends beyond just the use of machine learning to create predictive models, we might choose to refer to these two workflows very generally as: “spotting the pattern” and “exploiting the pattern”. This allows us to expand our discussion to cover tasks from ML-adjacent fields such as operations research, engineering, and bioinformatics.
Let’s explore the three scenarios described in the diagram to understand the issues that might be encountered and how using Apache Spark can help us overcome these.
In the case of tasks like supervised machine learning, “spotting the pattern” generally requires us to feed a large dataset to an algorithm whose objective is to determine the parameters of a model that accurately describes that data. In reality, at least for traditional ML applications, the datasets used for model training are not typically large enough to exhaust the memory available to the machine running the supervised learning process. Where they do exceed this limit, savvy data scientists tend to subsample this data, understanding that there is something of a law of diminishing returns at work here: gains in model performance generated by increasing the available data start to decrease beyond a certain point (see here for a sidebar discussion on this topic).
Although the final training dataset provided to a model might not exceed the limitations of the model training environment, it is likely produced from more than one upstream dataset, some of which may be very large and require complex aggregations, e.g., turning millions of online shopping transactions into some set of customer behaviour variables by aggregating the number and value of each customer’s transactions over 30, 60 and 90-day intervals within each product category. This is the hurdle described in scenario 1 above.
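To make that concrete, here is a minimal sketch of the kind of Spark aggregation that can produce such a training dataset. The table name, column names, and windows are illustrative assumptions rather than a prescribed schema.

```python
from pyspark.sql import functions as F

# Hypothetical transactions table and column names.
transactions = spark.read.table("retail.transactions")

customer_features = (
    transactions
    .filter(F.col("transaction_date") >= F.date_sub(F.current_date(), 90))
    .groupBy("customer_id", "product_category")
    .agg(
        F.count(F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 30), 1)).alias("txn_count_30d"),
        F.sum(F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 30), F.col("amount"))).alias("txn_value_30d"),
        F.count(F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 60), 1)).alias("txn_count_60d"),
        F.sum(F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 60), F.col("amount"))).alias("txn_value_60d"),
        F.count("*").alias("txn_count_90d"),
        F.sum("amount").alias("txn_value_90d"),
    )
)

customer_features.write.mode("overwrite").saveAsTable("ml.customer_behaviour_features")
```

The expensive aggregation runs on the cluster, and only the much smaller per-customer result needs to fit into the memory of the model training environment.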
In these situations:
This is exactly the problem described in scenario 2: the inputs to some analytical process are not prohibitively large, but the computations that need to be performed on them are very resource-intensive. Many categories of problems within the data science domain fit this description, including simulation and optimisation, fine-grained model training (such as forecasting at the store/SKU level), and operations with inherently high computational complexity, such as entity resolution and spatial joins.
Let’s explore these common examples in turn.
We have encountered many customers looking to solve simulation and optimisation problems using Databricks. Depending on the specifics of the problem, these can be prime candidates for piggybacking on Apache Spark.
An example of a ‘good’ candidate problem might be optimisation at the entity level on a dataset covering multiple entities. We encountered a scenario just like this when working with a consumer goods organisation aiming to optimise pricing across multiple markets.
A proven pattern for addressing this type of problem efficiently is to write a pandas user-defined function (UDF) and apply it to a grouped Spark DataFrame. In this way, we can execute arbitrary Python code on each group of rows from a Spark DataFrame; Spark feeds each group’s rows to the function as a pandas DataFrame object. The customer in question had already created an optimiser class that worked for a single market at a time, so, within the UDF, all we needed to do was determine the parameters required for the optimiser to run and execute it on the incoming pandas DataFrame.
Let’s explore this solution with a code example. The first element is the basic Python function that runs the optimisation process:
```python
import pandas as pd

def optimise_for_market(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each call receives the rows for a single market as a pandas DataFrame.
    first_row = next(pdf.itertuples())
    results_df = pdf.copy()
    # PricingOptimiser is the customer's existing single-market optimiser class;
    # `experiment` is defined elsewhere in the notebook.
    pricing_optimiser = PricingOptimiser(
        input_df=pdf, market_id=first_row.BU_ID,
        experiment=experiment,
        max_evals=200, initial_step_size=100)
    prices, demand, return_code = pricing_optimiser.optimise()
    # Append the optimiser outputs as new columns alongside the original data.
    results_df["OPTIMAL_PRICE"] = prices
    results_df["EXPECTED_DEMAND"] = demand
    results_df["OPTIMISER_RETURN_CODE"] = return_code
    return results_df
```
If you look at the type hints in the function declaration, you’ll see it takes a pandas.DataFrame as an input and returns a pandas.DataFrame. The returned DataFrame contains all of the original columns plus some extra columns with the outputs of the optimisation routine. Just as a side note here: there’s no rule that says the input and output DataFrames need to have the same number of rows, or that the output DataFrame needs to contain any of the same columns as the input.
The second part is the PySpark transformation that executes this UDF on groups of the input DataFrame’s rows:
```python
# Disable Adaptive Query Execution so Spark does not coalesce the repartitioned
# input and reduce the degree of parallelism.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Schema of the DataFrame returned by the UDF.
optimiser_return_schema = "BU_ID long, PRODUCT_ID long, STORE_ID long, PRICE_LAST_WEEK double, UNITS_LAST_WEEK long, OPTIMAL_PRICE double, EXPECTED_DEMAND double, OPTIMISER_RETURN_CODE int"

# Spread the input evenly across the cluster and cache it.
optimiser_inputs = opt_dataset___all_markets.repartition(sc.defaultParallelism)
optimiser_inputs.cache()

(
    optimiser_inputs
    .groupBy("BU_ID")  # one group, and hence one optimiser run, per market
    .applyInPandas(optimise_for_market, schema=optimiser_return_schema)
    .write
    .mode("overwrite")
    .saveAsTable("optimiser_outputs")
)
```
This is an example of an embarrassingly parallel computation in that none of the calls to the pricing optimisation routine rely on the outcome of previous runs. A few aspects of the code are worth highlighting: the input DataFrame is explicitly repartitioned to the cluster’s default parallelism (and cached) so that the optimisation tasks are spread evenly across the workers, and Adaptive Query Execution is disabled so that Spark does not coalesce those partitions and reduce the degree of parallelism.
Similar examples of ‘good’ problems might come from customers using Monte Carlo simulation or Approximate Bayesian Computation techniques to estimate the distribution of parameters in a model. All calls to the model algorithm are independent and inputs can be sampled ahead of time and represented as rows in an input Spark DataFrame used to drive a process like the one above.
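As an illustration, here is a hedged sketch of that pattern for a Monte Carlo-style workload; `run_simulation` is a hypothetical single-threaded function standing in for whatever model or algorithm is being evaluated.

```python
import numpy as np
import pandas as pd

# Sample the simulation inputs up front and group them into independent work units.
n_draws = 100_000
draws = pd.DataFrame({
    "draw_id": np.arange(n_draws),
    "alpha": np.random.uniform(0.0, 1.0, n_draws),
    "beta": np.random.normal(0.0, 1.0, n_draws),
})
draws["batch_id"] = draws["draw_id"] % 64  # 64 batches ~= 64 parallel Spark tasks

def simulate_batch(pdf: pd.DataFrame) -> pd.DataFrame:
    # Run the (hypothetical) single-threaded simulation once per sampled input.
    pdf = pdf.copy()
    pdf["result"] = [run_simulation(row.alpha, row.beta) for row in pdf.itertuples()]
    return pdf

results = (
    spark.createDataFrame(draws)
    .groupBy("batch_id")
    .applyInPandas(
        simulate_batch,
        schema="draw_id long, alpha double, beta double, batch_id long, result double",
    )
)
results.write.mode("overwrite").saveAsTable("simulation_results")
```

Because every draw is independent, the only real design decision is how to batch the draws into groups so that each Spark task has a sensible amount of work.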
The canonical example here is fine-grained forecasting: producing forecasts at e.g., the store and SKU level. We’ve written about this quite a bit in the past:
Fundamentally, these operations are ‘embarrassingly parallel’, and we would ideally execute runs in parallel. Apache Spark provides the capability to parallelise arbitrary Python code using the pandas UDF abstraction mentioned earlier in this article, so this is a good place to elaborate a little further on that pattern.
By applying a pandas UDF to a grouped Spark DataFrame, we can provide the rows comprising each group to a separate instance of the function, running inside its own Spark task with these tasks then being distributed across the cluster by the scheduler. In the case of fine-grained forecasting, the function in question would, at the minimum, fit a model to the incoming time series data and probably go on to then extrapolate and output some future predicted values.
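As a hedged sketch of what such a function might look like (the `sales_history` table, its columns, and the choice of statsmodels’ ExponentialSmoothing are illustrative assumptions):

```python
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

def forecast_store_sku(pdf: pd.DataFrame) -> pd.DataFrame:
    # Fit a simple model to one store/SKU series and forecast four weeks ahead.
    ts = pdf.sort_values("week")["units_sold"].to_numpy()
    fitted = ExponentialSmoothing(ts, trend="add", seasonal=None).fit()
    preds = fitted.forecast(4)
    return pd.DataFrame({
        "store_id": pdf["store_id"].iloc[0],
        "sku": pdf["sku"].iloc[0],
        "weeks_ahead": range(1, 5),
        "forecast_units": preds,
    })

(
    spark.read.table("sales_history")
    .groupBy("store_id", "sku")
    .applyInPandas(
        forecast_store_sku,
        schema="store_id long, sku string, weeks_ahead long, forecast_units double",
    )
    .write.mode("overwrite")
    .saveAsTable("store_sku_forecasts")
)
```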
Forecast models of this kind are something of a special case since they are often strongly coupled to recent data and tend to go out of date very quickly. For this reason, the models themselves are not typically persisted, rather, we retrain and perform inference in a single step, often on a daily or more frequent basis.
When the training and inference processes are logically segregated (e.g., training happens weekly, inference daily), then a grouped model training process will introduce complexity at inference time as the scoring code needs to select the appropriate model based on some input parameter. If your problem exhibits these characteristics, then the following blog will be helpful:
In either case, the mechanism at work here is still groupBy().applyInPandas().
The canonical example in this category is entity resolution/record linkage. Apache Spark has all of the internals to partition the problem appropriately and make it scale out successfully. See splink, arc, and zingg as examples of how this can be implemented in practice.
Within the field of geospatial analytics, there are many operations that exhibit quadratic or worse time complexity, e.g., spatial joins between point and polygon sets for matching events to locations. Many Apache Spark extensions such as Mosaic and Sedona have built-in optimisations for partitioning source data appropriately and controlling the complexity of these operations.
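As an illustration, here is a minimal sketch of a point-in-polygon join using Apache Sedona, assuming Sedona is installed on the cluster; the `events` and `regions` tables and their columns are hypothetical.

```python
from sedona.spark import SedonaContext

# Register Sedona's ST_* SQL functions on the existing Spark session.
sedona = SedonaContext.create(spark)

matched = sedona.sql("""
    SELECT e.event_id, r.region_name
    FROM events e
    JOIN regions r
      ON ST_Contains(ST_GeomFromWKT(r.polygon_wkt), ST_Point(e.lon, e.lat))
""")
matched.write.mode("overwrite").saveAsTable("events_with_regions")
```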
The third and final scenario described in the diagram belongs within the ‘exploit the pattern’ workflow.
The challenge here is that a significant gap often exists between the volume and velocity of data assumed during the development of an analytical product and those encountered in its intended production environment.
Let’s begin by discussing the “data-heavy” part of this workflow. As you’ll find on your journey through the MLOps Gym materials, an ability to decouple pre-processing and feature engineering processes from the model training and inference steps will make testing, versioning, and deploying changes to an analytical product much easier. It also enables data science teams to follow the patterns employed by their data engineering colleagues with regard to scheduling and orchestration of these tasks.
If, at the time of training and evaluating models, you are already expressing your pre-processing and feature engineering logic as Spark code, decoupled from the model training and inference steps, then, when it comes to deployment, the only work necessary is to wrap this code up as something that can be deployed as a Workflow. Your code can now scale to process very large volumes of data, potentially in a continuous and incremental fashion, if you opt to employ Spark Structured Streaming.
For some practical examples of how to implement this, take a look at our feature tables documentation pages. Here’s an example from that page:
```python
from databricks.feature_engineering import FeatureEngineeringClient

def compute_additional_customer_features(data):
    ''' Returns Streaming DataFrame
    '''
    pass  # feature computation logic goes here

fe = FeatureEngineeringClient()

# Read the source data as a stream so the feature table is updated incrementally.
customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")

fe.write_table(
    df=compute_additional_customer_features(customer_transactions),
    name='ml.recommender_system.customer_features',
    mode='merge'
)
```
Code written in this way will allow features in a feature table to be continuously updated in place using a Spark streaming pipeline. Furthermore, with the advent of Databricks Feature Serving (AWS | Azure), it is trivial to sync these features into an online store for real-time scoring applications.
When putting a predictive model into production, some analysis of the application’s throughput and latency requirements is needed in order to determine the appropriate deployment pattern: high-throughput, latency-tolerant applications are generally best served by batch or streaming inference with a Spark UDF, while low-latency applications call for a real-time model serving endpoint.
Here’s a very quick example of the mlflow.pyfunc.spark_udf pattern mentioned above, if you haven’t seen it before:
```python
import mlflow.pyfunc
from pyspark.sql.functions import struct

# `model_uri` and `dataframe` are assumed to be defined earlier in the pipeline.
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
predicted_df = dataframe.withColumn(
    "prediction", pyfunc_udf(struct('age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'))
)
```
This retrieves a model from the MLflow tracking server and delivers it to the script as a vectorised Spark User Defined Function (UDF) (more info here). This is a well-proven pattern for scalable model inference in a batch or streaming pipeline and is compatible with many flavours of models. Take a look at the MLflow documentation for a comprehensive list. The only requirement is that you log the model using MLflow during your model training workflow.
The documentation also describes how to customise the wrapped model’s predict() method behaviour.
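As a brief, hedged illustration of that customisation, the sketch below wraps a previously saved scikit-learn model in an mlflow.pyfunc.PythonModel subclass whose predict() applies a custom probability threshold; the artifact path and threshold are illustrative assumptions.

```python
import mlflow
import mlflow.pyfunc

class ThresholdedClassifier(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # Load the underlying scikit-learn model logged as an artifact.
        import joblib
        self._model = joblib.load(context.artifacts["sklearn_model"])

    def predict(self, context, model_input):
        # Return hard 0/1 labels using a custom probability threshold.
        proba = self._model.predict_proba(model_input)[:, 1]
        return (proba >= 0.3).astype(int)

with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=ThresholdedClassifier(),
        artifacts={"sklearn_model": "/dbfs/tmp/sklearn_model.joblib"},
    )
```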
The preceding discussion of data science workflows and opportunities for employing Apache Spark assumed the development of analytical applications fed with structured data, and the advice therein applies mostly to the use of ‘traditional’ machine learning algorithms. Furthermore, we assumed that model training is a once-and-done activity (i.e., no compute-intensive model tuning is required) and that, provided the data fits in memory, it can be tackled in a ‘single-node’ development environment.
The following sections of this document describe the options available for using Apache Spark when these assumptions do not hold true. More details on each of these will be provided later in the MLOps Gym material.
The diagram above identifies some scenarios for parallelisation of the model training and tuning processes.
At this point, it is worth articulating the distinction between ‘model parallel’ and ‘data parallel’ training in the context of deep learning. In model-parallel training, the model itself is too large to fit on a single device, so its layers or parameters are split across multiple devices. In data-parallel training, each device holds a complete copy of the model and trains on its own shard of the data, with gradients synchronised between devices after each step.
Another common pattern of task parallelisation in the context of model training is parallel hyperparameter tuning. This process requires the training of many candidate models in the search for an optimal performing set of hyperparameters.
We shall examine how Apache Spark can be used in all of these scenarios.
Apache Spark 3.5.0 saw the introduction of a DeepspeedTorchDistributor class within the pyspark.ml package, allowing for pipeline-parallel training of PyTorch models. More details and an example notebook can be found in our docs (AWS | Azure | GCP).
The benefits of this toolset will be particularly apparent when available GPU memory is limited and the models being trained are very large. This class also allows for some degree of data parallelism during both training and inference time, which is particularly useful for batch inference applications.
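As a rough sketch of how this is invoked (the training function, GPU counts, and DeepSpeed configuration path are illustrative assumptions, and the exact arguments should be checked against the docs linked above):

```python
from pyspark.ml.deepspeed.deepspeed_torch_distributor import DeepspeedTorchDistributor

def train_loop():
    # Ordinary PyTorch + DeepSpeed training code goes here; the distributor
    # launches it on each participating process/GPU.
    ...

distributor = DeepspeedTorchDistributor(
    numGpus=8,          # GPUs per node
    nnodes=2,           # number of worker nodes
    localMode=False,    # run on the cluster rather than only on the driver
    useGpu=True,
    deepspeedConfig="/dbfs/configs/ds_zero3_config.json",
)
distributor.run(train_loop)
```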
Most often, we see this approach called for when customers are training models on datasets that comprise unstructured data. So why and how does our approach to efficient model training and evaluation change when we want to make predictions based on unstructured data: text, images etc?
By their nature, input datasets of this type are typically very large (e.g., libraries of several thousand images collected by borescope inspections of jet engine turbine blades), and deep-learning techniques will (usually) be best suited to tackling the tasks that use them.
Assuming that is the case, the training data will very often exceed the capacity of a single compute node and will require sharding across whatever compute target is being used to train the model. This is a problem that can also be solved fairly simply by using Apache Spark with the appropriate extension library for your desired workflow.
There are a number of well-documented patterns for doing this (for example, the TorchDistributor for PyTorch and the spark-tensorflow-distributor for TensorFlow).
In general, these approaches require packaging existing, ‘single-node’ model training code into a function or class and wrapping these with some decorator or parent class that will handle the sharding of data and parallelisation of the model training and evaluation processes.
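For example, a minimal sketch of this wrapping step using PySpark’s TorchDistributor might look like the following; the training function and its arguments are placeholders for your existing single-node code.

```python
from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, batch_size):
    # Existing single-node PyTorch training loop, largely unchanged; the distributor
    # launches one copy per process and PyTorch DDP handles gradient synchronisation.
    ...

distributor = TorchDistributor(num_processes=4, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, 256)
```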
Data scientists often want to automate the process of determining the optimal hyperparameters for the algorithm responsible for producing their predictive models. There are several good options for parallelising these searches using Apache Spark, including:
The Databricks AutoML tools perform a parallelised search for an optimal model across multiple algorithms and a basic hyperparameter search space. Although these tools abstract away a lot of the complexity of a search operation, you still have control over aspects such as the model evaluation metric to be optimised, the stopping criteria of the search, and so on. As a starting point for the ‘modeling’ and ‘evaluation’ phases of your project, this is a great option since it provides information about what techniques are likely to perform well on your task and saves writing heaps of boilerplate code.
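As a hedged sketch of what kicking off such an experiment from a notebook can look like (the table name, target column, and metric are illustrative assumptions):

```python
from databricks import automl

train_df = spark.read.table("ml.churn_training")

summary = automl.classify(
    dataset=train_df,
    target_col="churned",
    primary_metric="f1",
    timeout_minutes=60,
)
print(summary.best_trial.model_path)  # URI of the best model logged to MLflow
```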
Using Hyperopt with SparkTrials is our go-to recommendation for performing parallel hyperparameter search, since it allows you to take advantage of Hyperopt’s ability to refine the hyperparameter search space as the search progresses.
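A minimal sketch of this pattern is shown below; the objective function (`train_and_evaluate`) and the search space are illustrative assumptions.

```python
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

def objective(params):
    # Train and evaluate a candidate model with these hyperparameters,
    # returning the validation loss to be minimised.
    loss = train_and_evaluate(max_depth=int(params["max_depth"]),
                              learning_rate=params["learning_rate"])
    return {"loss": loss, "status": STATUS_OK}

search_space = {
    "max_depth": hp.quniform("max_depth", 2, 10, 1),
    "learning_rate": hp.loguniform("learning_rate", -5, 0),
}

best = fmin(
    fn=objective,
    space=search_space,
    algo=tpe.suggest,
    max_evals=128,
    trials=SparkTrials(parallelism=8),  # candidate models are trained as Spark tasks
)
```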
Some algorithms in scikit-learn (especially the tuning and cross-validation tools) let you specify a parallel backend, and the joblib-spark package lets you use Apache Spark as that backend. See the joblib-spark GitHub page for an example.
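As a small sketch of that pattern, closely following the joblib-spark README:

```python
from sklearn import datasets, svm
from sklearn.model_selection import cross_val_score
from joblib import parallel_backend
from joblibspark import register_spark

register_spark()  # register the Spark backend with joblib

X, y = datasets.load_iris(return_X_y=True)
clf = svm.SVC(kernel="linear", C=1)

with parallel_backend("spark", n_jobs=5):
    scores = cross_val_score(clf, X, y, cv=5)  # each fold runs as a Spark task

print(scores)
```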
In the course of this article, we have explored the primary data science workflows and the scenarios within them where Apache Spark adds value: preparing large datasets for model training, parallelising resource-intensive computations such as simulation, optimisation and fine-grained forecasting, scaling feature engineering and model inference for production, and distributing model training and hyperparameter tuning.
Next blog in this series: Getting started with version control in Databricks