Ray on Databricks became generally available in April 2024, and in a little over a year, we have seen 1,000+ customers initialize hundreds of thousands of Ray clusters. While there are several ways to run production-grade Ray workloads on Databricks, one of the most popular methods is via “Ray on Spark”, an approach that starts a Ray cluster on top of a highly scalable and managed Databricks cluster.
The integration of Ray and Spark offers a robust solution for scalable data processing and machine learning model training. At the same time, engineers who use Spark and Ray concurrently must balance resources between the two frameworks. Specifically in Databricks, multi-node classic compute clusters (i.e., non-serverless) are initialized with Spark, and a common question arises: “How should I configure my cluster for a particular Ray workload?”
This blog will cover four key areas: 1) a concise overview of the Ray on Spark architecture, 2) the process for setting up a Ray cluster, 3) illustrative use cases and configurations, and 4) practical considerations when working with Ray on Databricks.
Ray on Spark means Ray is launched on top of the Spark cluster. This is the default configuration when running multi-node Ray on Databricks’ classic compute. In this section we’ll provide a brief overview of the Ray on Spark cluster.
Figure 1: Ray on Spark architecture. Spark components and Ray components are denoted in orange and blue, respectively. Note the possible configurations of an isolated Ray worker (top right) and hybrid Ray and Spark worker (bottom right).
The figure above illustrates a simplified representation of a Ray on Spark cluster. Ray sits on top of a Spark cluster, where each Spark executor launches one task that hosts a Ray worker or driver node. Spark manages the underlying compute infrastructure (node failover, adding/removing executors, autoscaling behavior) while Ray handles task scheduling. For example, autoscaling of Ray workers is accomplished through the combination of Spark’s cluster manager spinning nodes up/down (shown above) and the Spark Context starting new executors that house Ray workers (not shown). Note that a Spark executor can allocate all of its compute resources to a task for a Ray worker (top right worker node) or split them between Spark and Ray in a hybrid worker (bottom right worker node).
There are two major developer considerations when using the Ray on Spark architecture. First, Ray worker nodes scale within the constraints of the Spark executors; for example, a Ray task cannot be allocated more memory than the underlying Spark process. Second, in standard Ray clusters the head node runs worker processes and raylets that can be assigned tasks. Because the Ray head node on Databricks is placed on the Spark driver node, whose primary job is orchestration and coordination, these computations are disabled by default. Explicitly setting the head node CPU and GPU counts re-enables them, but if not managed properly, allocating those resources to Ray and then running Spark operations afterward will starve Ray and kill the Ray cluster.
Before setting up your Ray on Spark cluster, ensure you have a classic compute resource with Databricks Runtime 12.2 LTS ML or above and either the Dedicated (formerly single user) or No Isolation Shared access mode.
The primary way to set up the Ray on Spark cluster is via the Python API `setup_ray_cluster` within a Databricks Python REPL. This function has various parameters, but it can be summarized as explicitly defining the number (minimum and maximum) of Ray worker nodes and the compute (CPUs, GPUs, or both) assigned to each Ray node (head or worker). Below is a sample Python setup and the Ray Cluster dashboard that’s generated from it:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Start a Ray cluster on top of the Spark cluster, defining the number of
# worker nodes and the CPUs/GPUs assigned to each Ray node.
setup_ray_cluster(
    min_worker_nodes=2,
    max_worker_nodes=4,
    num_cpus_worker_node=8,
    num_gpus_worker_node=1,
    num_cpus_head_node=8,
    num_gpus_head_node=1,
    collect_log_to_path="/Volumes/mycatalog/myschema/myvolume/logs",
)

# Perform your Ray tasks
# ...

# Tear down the Ray cluster when finished
shutdown_ray_cluster()
Figure 2: A snapshot of the Ray cluster dashboard provided after running the above setup command. Note the presence of GPU and GRAM usage since we've set up a Ray on Spark GPU cluster.
Below, we’ve devised a mental model for how you should think about and set up the Ray on Spark cluster based on your available compute and use-case.
Figure 3: Mental model for setting up auto-scaling vs. fixed-size cluster.
Autoscaling clusters provide the benefit of scaling up or down as compute demand increases or decreases. This is especially helpful if the data volume per run is unknown or if the execution time of your tasks is highly variable and you need to maintain a certain level of task throughput or performance. If you have autoscaling enabled, set min_worker_nodes and max_worker_nodes for Ray equal to the minimum and maximum number of Spark workers, respectively.
Fixed-size clusters are preferred if you have very stable and predictable workloads that need to be completed over a set period of time. This has the added benefit of predictable costs. If you are using a fixed-size Spark cluster, set both min_worker_nodes and max_worker_nodes equal to the number of Spark workers (see the sketch below).
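As a rough sketch of both configurations, assume a Spark cluster that autoscales between 2 and 8 workers; the worker and CPU counts below are illustrative, not prescriptive.

from ray.util.spark import setup_ray_cluster

# Autoscaling cluster: mirror the Spark autoscaling range (illustrative values).
setup_ray_cluster(
    min_worker_nodes=2,       # Spark cluster's minimum workers
    max_worker_nodes=8,       # Spark cluster's maximum workers
    num_cpus_worker_node=8,
)

# Fixed-size cluster: set min == max == the number of Spark workers instead, e.g.
#     setup_ray_cluster(min_worker_nodes=8, max_worker_nodes=8, num_cpus_worker_node=8)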
Figure 4: Mental model for setting up GPU clusters.
GPUs significantly outperform CPUs in tasks requiring increased floating-point operations per second (FLOPS). This is particularly crucial for matrix multiplication, a fundamental operation in deep learning algorithms. As a result, when all other factors are equal, GPUs can accelerate the training of deep learning models by 10 to 30 times compared to CPUs. A Ray cluster equipped with GPUs can be used to parallelize hyperparameter tuning or perform distributed training (i.e. distributed data parallel) for a deep learning model.
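Once a GPU-backed Ray cluster is up, individual tasks or actors request GPUs through Ray's resource annotations. The sketch below is illustrative; train_one_trial is a hypothetical placeholder for your own training step.

import ray

@ray.remote(num_gpus=1)  # Ray schedules this task on a worker with a free GPU
def train_one_trial(config):
    # Hypothetical placeholder: train a model with `config` on the assigned GPU
    # and return its validation metric.
    ...

# Run several trials in parallel, one GPU each, subject to cluster capacity.
futures = [train_one_trial.remote({"lr": lr}) for lr in (1e-4, 1e-3, 1e-2)]
results = ray.get(futures)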
Figure 5: Mental model for setting up a hybrid Ray and Spark cluster.
Does your workload require Spark? If you’re not sure, check out the following section titled “Balancing resources between Spark + Ray” to see whether your workload requires Spark or not.
To reserve a certain amount of compute for Spark in a hybrid cluster, you can either allocate it at the cluster level (recommended approach) or the worker level. At the cluster level, simply set the maximum number of Ray workers to less than the total number of workers available; the remaining workers are reserved for Spark. For example, if you have 8 workers in your cluster, setting max_worker_nodes in setup_ray_cluster to 6 will reserve 2 workers for Spark.
If you allocate at the worker level, you can reserve portions of compute on a node for Spark. For example, setting the Ray worker node CPU count to 12 out of 16 total available cores reserves 4 CPUs for Spark. We recommend setting it at the cluster level to reduce the overhead of spinning up multiple Spark tasks within an executor.
Regardless of which approach you choose, do not define compute for the Ray head node in a hybrid cluster. By default, the head node is placed on the Spark driver node, a critical orchestrator and scheduler for Spark tasks, and allocating that compute to Ray will crash the Ray cluster when Spark tries to perform tasks. To avoid this, simply omit the optional parameters num_cpus_head_node and num_gpus_head_node from the setup_ray_cluster command.
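A minimal sketch of the cluster-level reservation described above, assuming an 8-worker Spark cluster with 16 CPUs per worker (illustrative values):

from ray.util.spark import setup_ray_cluster

# Cluster-level reservation (recommended): give Ray 6 of the 8 workers,
# leaving 2 whole workers for Spark.
setup_ray_cluster(
    min_worker_nodes=6,
    max_worker_nodes=6,
    num_cpus_worker_node=16,
    # num_cpus_head_node / num_gpus_head_node are deliberately omitted so the
    # Spark driver keeps its resources for orchestration.
)

# Worker-level alternative: keep max_worker_nodes=8 and set
# num_cpus_worker_node=12 to leave 4 CPUs per node for Spark tasks.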
OPTIONAL: Run this helper class to populate the parameters for setup_ray_cluster for a cluster in your Databricks workspace. Linked here.
OPTIONAL: By default, setup_ray_cluster provides a minimal dashboard that lets you view your cluster (i.e., head nodes, worker nodes, and worker processes). However, if you’d like to set up advanced monitoring with all the bells and whistles using Grafana and Prometheus, check out this technical blog.
Under what circumstances should I configure a cluster to use Ray or Ray and Spark together? On Databricks, there are three major patterns to consider: isolated Ray and Spark, a hybrid Ray and Spark cluster, and Ray on Serverless GPU Compute.
We can eliminate complications with resource sharing in a design pattern where Ray work and Spark work are isolated. This is the simplest pattern and a majority of workloads fall within this pattern. Common Ray operations you’d perform here include parallelizing hyperparameter tuning or distributing training for a deep learning model and logging the resulting model, metrics, and artifacts to MLflow. Furthermore, the beauty of Ray on Spark is that it handles auto-scaling and is tightly coupled with the rest of the Lakehouse platform (e.g., MLflow, Unity Catalog).
If you have a small dataset that can fit in memory on a single node, you can simply read the dataset from Delta Lake as a Spark DataFrame, convert it to a pandas DataFrame, and operate on that in Ray. If your dataset is too large to fit in memory and you want to leverage the benefits of a distributed dataset in Ray, you can read directly from Unity Catalog via a SQL Warehouse (most recommended), credential vending, or Delta Sharing after initializing the Ray cluster. This is the recommended route if you can externalize large Spark data transformations out of the workflow.
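As a rough sketch of the small-data path, assuming a hypothetical Unity Catalog table mycatalog.myschema.mytable and a dataset that fits in driver memory:

import ray

# Read a small Delta table with Spark, collect it to pandas, and hand it to Ray.
spark_df = spark.read.table("mycatalog.myschema.mytable")
pdf = spark_df.toPandas()           # small enough to fit in memory by assumption
ds = ray.data.from_pandas(pdf)      # Ray Dataset for downstream Ray work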
Figure 6: Example workflow where Spark and Ray work is isolated.
For example, if you're using a Workflow, one design dedicates Task 1 to a Spark cluster for data preparation and Task 2 to a Ray on Spark cluster dedicated solely to Ray, which reads the prepared dataset from Unity Catalog and runs model training (e.g., parallelized hyperparameter search or distributed training).
Here are some examples from our Industry Solutions repository for hyperparameter search with Ray and Optuna for multiple machine learning algorithms, XGBoost, and deep learning with GPUs.
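As a minimal sketch of the hyperparameter-search pattern with Ray Tune's Optuna integration (the objective and search space below are placeholders, not taken from the linked solutions):

from ray import tune
from ray.tune.search.optuna import OptunaSearch

def objective(config):
    # Placeholder objective: train a model with `config` and return its metric.
    return {"score": (config["x"] - 2) ** 2}

tuner = tune.Tuner(
    objective,
    param_space={"x": tune.uniform(-10, 10)},
    tune_config=tune.TuneConfig(
        search_alg=OptunaSearch(),
        metric="score",
        mode="min",
        num_samples=20,
    ),
)
results = tuner.fit()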
Combining Ray and Spark within the same cluster offers the advantage of leveraging both Spark's data parallelism and Ray's task parallelism. Specifically, Spark excels at reading from streaming sources like Kafka and cloud storage, efficiently handling Delta/Iceberg table reads and writes, and providing robust distributed fault tolerance for those datasets. On the other hand, Ray's task parallelism offers distinct advantages when it comes to fine-grained, independent computation and heterogeneous workloads. These advantages for Ray are highlighted in hyperparameter tuning, parallelized training and inference, simulation and optimization, and distributed computing for machine learning workloads.
The benefits of running a hybrid cluster are seamless data transfer between Spark and Ray without the storage overhead (i.e., data locality) and simplified operations with only one compute cluster that can be dynamically allocated.
The most common pattern combining the two is leveraging Spark for data handling (i.e., reading from and writing results back to Delta Lake, plus tabular data transforms) and Ray for parallelized or distributed computations. Specifically, this involves reading a Spark DataFrame as a Ray Dataset, performing a user-defined function (Ray Tasks and Actors) on each row, batch, or group in the dataset, and writing the results back to Delta Lake through Spark. For example, a use case could be generating forecasts for each store x SKU combination or performing batch/streaming object detection using a vision-language model on frames of a video in parallel. See these examples for forecasting at scale, batch inference for speech detection, and batch inference on videos.
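A rough sketch of that flow, assuming a hypothetical sales table and a placeholder forecast_group function; the result is collected to pandas before the Spark write, which assumes a modestly sized output.

import ray

# Spark handles the table read...
spark_df = spark.read.table("mycatalog.myschema.sales")     # hypothetical table

# ...Ray handles the parallel, per-batch computation.
ds = ray.data.from_spark(spark_df)

def forecast_group(batch):
    # Placeholder UDF: fit a forecast per store x SKU within the batch and
    # return the resulting rows as a pandas DataFrame.
    return batch

results = ds.map_batches(forecast_group, batch_format="pandas")

# Hand the results back to Spark to write to Delta Lake.
spark.createDataFrame(results.to_pandas()) \
    .write.mode("overwrite").saveAsTable("mycatalog.myschema.forecasts")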
Note, as mentioned above, allocating resources (i.e., CPU/GPU) to the Ray head node during cluster setup will enable computations on the Spark driver node. This will essentially starve the Ray cluster when Spark operations try to run, causing the Ray cluster to crash. We recommend NOT allocating resources to the Ray head node in a hybrid cluster (i.e., do not set num_cpus_head_node and num_gpus_head_node in setup_ray_cluster).
Serverless GPU Compute (SGC) provides the benefits of fast start times and easy access to a highly demanded resource, GPUs, with seamless integrations across Databricks (e.g., MLflow, Unity Catalog). In SGC, Ray runs without the Spark overhead. The main usage pattern is to connect your notebook to GPU compute so that your Python REPL runs directly on GPU machines, allowing you to use GPUs with your Ray code via a simple decorator. SGC supports both H100s and A10s in single- and multi-node configurations.
For more information about SGC, please see docs: AWS | Azure
Ray Datasets are critical for performing distributed training or parallelizing tasks across the cluster with Ray. There are several ways to get data from Unity Catalog into a Ray Dataset; as mentioned above, reading through a Databricks SQL Warehouse is the most recommended, followed by credential vending and Delta Sharing.
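For the SQL Warehouse route, a minimal sketch is shown below; the warehouse ID and table names are placeholders, and ray.data.read_databricks_tables also expects DATABRICKS_HOST and DATABRICKS_TOKEN to be set as shown in the next section.

import ray

# Read a Unity Catalog table into a Ray Dataset through a Databricks SQL Warehouse.
ds = ray.data.read_databricks_tables(
    warehouse_id="<your-sql-warehouse-id>",   # placeholder
    catalog="mycatalog",
    schema="myschema",
    query="SELECT * FROM mytable",
)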
One of the primary use cases for Ray within Databricks is parallelizing and distributing the training of machine learning models. However, to operationalize these models we require proper experiment tracking, model registration, and serving. MLflow makes it easy to operationalize machine learning models, but some lift is required to set this up when running Ray in Databricks, because Ray workers do not have the appropriate access to the underlying MLflow tracking server by default. To resolve this, we need to explicitly define access.
To ensure each Ray actor has access to the underlying MLflow experiment, set DATABRICKS_HOST and DATABRICKS_TOKEN (your personal access token) as environment variables prior to launching the Ray cluster. Consider using Databricks Secrets Management when storing and accessing sensitive variables.
import os

from ray.util.spark import setup_ray_cluster
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

# Set these before launching the Ray cluster so every Ray worker inherits the
# credentials needed to reach the Databricks-hosted MLflow tracking server.
os.environ["DATABRICKS_HOST"] = "https://....databricks.com"
os.environ["DATABRICKS_TOKEN"] = "<your PAT token>"

setup_ray_cluster(
    num_cpus_worker_node=4,
    num_gpus_worker_node=0,
    max_worker_nodes=1,
    min_worker_nodes=1,
)
After you’ve set up the cluster, you can leverage MLflowLoggerCallback with Ray Tune or Ray Train at the head node or, if you need to log metrics, artifacts, or figures from within the low-level trainables, initialize an MLflow session with setup_mlflow and log accordingly.
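A minimal sketch of the MLflowLoggerCallback route with Ray Tune; the experiment path and objective are placeholders, and the RunConfig import location can vary across Ray versions (ray.train here, ray.tune in newer releases).

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback

def objective(config):
    # Placeholder trainable; the returned metrics are logged to MLflow per trial.
    return {"loss": config["x"] ** 2}

tuner = tune.Tuner(
    objective,
    param_space={"x": tune.uniform(-1, 1)},
    run_config=train.RunConfig(
        callbacks=[
            MLflowLoggerCallback(
                tracking_uri="databricks",                          # Databricks-hosted MLflow
                experiment_name="/Users/you@example.com/ray-tune",  # placeholder path
                save_artifact=True,
            )
        ]
    ),
)
results = tuner.fit()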
We’d like to specifically thank Ben Wilson, Puneet Jain, Sri Tikkireddy, and Weichen Xu for their invaluable inputs to the engineering diagrams and best practices in the field.