Welcome to the fourth instalment of our blog series exploring Databricks Workflows, a powerful product for orchestrating data processing, machine learning, and analytics pipelines on the Databricks Data Intelligence Platform.
In our previous blog, Basics of Databricks Workflows - Part 3: The Compute Spectrum, we delved into the various Databricks compute options and shared guidelines for configuring resources based on your workloads. In this blog, we'll focus on cluster reuse, a crucial feature that enhances resource utilization and streamlines workflow execution.
Cluster reuse in Databricks Workflows allows compute resources to be used more efficiently when running multiple tasks within a job, leading to cost savings.
Previously, each task within a Databricks job would spin up its own cluster, adding time and cost overhead due to cluster startup times and potential underutilization during task execution. With cluster reuse, a single cluster can be shared across multiple tasks within the same job. This improves resource utilization and minimizes the time it takes for tasks to commence. In other words, cluster reuse reduces the overhead of creating and terminating clusters within a job.
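To make this concrete, below is a minimal sketch of what a job definition with a shared cluster looks like when expressed as a Jobs API 2.1 payload (the job name, task keys, notebook paths, and cluster sizing are hypothetical placeholders): each task simply references a job_cluster_key defined once at the job level, instead of declaring its own cluster.

# Minimal sketch of a job definition with a shared job cluster (hypothetical names and paths).
# Both tasks point at the same job_cluster_key, so the cluster is started once and reused.
job_settings = {
    "name": "example_pipeline",
    "job_clusters": [
        {
            "job_cluster_key": "shared_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        }
    ],
    "tasks": [
        {
            "task_key": "ingest",
            "job_cluster_key": "shared_cluster",  # cluster starts when the first task using it runs
            "notebook_task": {"notebook_path": "/Workspace/pipeline/ingest"},
        },
        {
            "task_key": "transform",
            "depends_on": [{"task_key": "ingest"}],
            "job_cluster_key": "shared_cluster",  # reused, no second cluster startup
            "notebook_task": {"notebook_path": "/Workspace/pipeline/transform"},
        },
    ],
}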
Let’s explore this feature in more detail by revisiting the use-case outlined in one of our earlier blog posts in this series: Basics of Databricks Workflows - Part 1: Creating your pipeline.
Example Use Case: Product Recommender
A retailer aims to create a personalized shopping experience by implementing a recommender system. The Data Engineering team has established a robust data pipeline, while the Data Analysts utilize dashboards and SQL reports to gain insights into customer behavior. The Data Science and ML team focuses on dynamic profiling, model retraining, and real-time recommendations.
Below we explain how cluster reuse is leveraged across the three sections of the example workflow: data engineering, data warehousing & BI, and machine learning.
Above we showcased one way to utilize cluster reuse. As shown, it gives us the flexibility to create a cluster tailored to the needs of a group of tasks, or to all of the tasks in the job.
You can configure tailored clusters and reuse them depending on task types such as:
When using the “Run Job” control flow as a task, you call another workflow from the current workflow job. In this case, cluster reuse cannot be extended from the parent workflow: the child workflow runs on its own cluster(s).
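For illustration, here is a small, hedged sketch with the Databricks SDK for Python (the task key and child job ID are hypothetical): a Run Job task only references the child job by ID and takes no job_cluster_key, because the child workflow provisions its own clusters from its own job settings.

# Hypothetical sketch: a "Run Job" task in a parent workflow.
# Note there is no job_cluster_key here - the child job (ID 123) runs on the
# clusters defined in its own job settings, not on the parent's reused job cluster.
from databricks.sdk.service.jobs import Task, RunJobTask

trigger_child_workflow = Task(
    task_key="Trigger_Child_Workflow",
    run_job_task=RunJobTask(job_id=123),
)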
Unlock the full potential of Databricks Workflows with the myriad of advantages offered by adopting cluster reuse:
Below are a few scenarios where cluster reuse will help improve cost and performance efficiency:
Before the cluster reuse feature was introduced, multi-task jobs often used all-purpose clusters to achieve zero startup time for subsequent tasks, which introduced its own challenges. The table below summarizes the challenges faced when using all-purpose clusters (Option 1) and shows how reusing job clusters (Option 2) resolves them.
| | Option 1: All-purpose cluster | Option 2: Reused job cluster |
| --- | --- | --- |
| Cost | Higher | Lower (job cluster rate) |
| Isolation from other tasks in the job | None | No isolation for tasks on the same cluster. Note: With Unity Catalog, isolation within tasks can be achieved by setting the right security mode. |
| Isolation from other jobs | No | Yes |
Note: You can use other orchestrators, such as Azure Data Factory (ADF) or Airflow, to run jobs in Databricks, but these tools cannot leverage the job cluster reuse feature.
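As a rough back-of-the-envelope illustration of the cost row above (the DBU rates and cluster sizing below are hypothetical placeholders; substitute your own cloud and contract pricing), the rate difference between all-purpose and jobs compute adds up across every task-hour of a frequently run job:

# Back-of-the-envelope cost comparison (hypothetical DBU rates - substitute your own pricing)
all_purpose_rate_per_dbu = 0.55   # assumed all-purpose compute rate, $/DBU
jobs_rate_per_dbu = 0.15          # assumed jobs compute rate, $/DBU

dbu_per_hour = 8                  # assumed cluster consumption
task_hours_per_run = 3            # e.g. three one-hour tasks sharing one cluster
runs_per_month = 30

all_purpose_cost = all_purpose_rate_per_dbu * dbu_per_hour * task_hours_per_run * runs_per_month
job_cluster_cost = jobs_rate_per_dbu * dbu_per_hour * task_hours_per_run * runs_per_month

print(f"All-purpose: ${all_purpose_cost:.2f}/month, reused job cluster: ${job_cluster_cost:.2f}/month")
# With these assumed rates: All-purpose: $396.00/month, reused job cluster: $108.00/month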
To modify tasks to employ cluster reuse in an existing Databricks Workflow pipeline, follow these steps:
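If you would rather make the same change programmatically instead of through the Workflows UI, the sketch below (with a hypothetical job ID and cluster key) fetches an existing job's settings, adds a shared job cluster definition, points every task at it, and writes the settings back with jobs.reset:

# Hedged sketch (hypothetical job ID and cluster key): retrofit cluster reuse onto an existing job
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import JobCluster
from databricks.sdk.service.compute import ClusterSpec, AutoScale

w = WorkspaceClient()

job_id = 123456789  # hypothetical existing job
settings = w.jobs.get(job_id).settings

# 1. Define (or extend) the shared job cluster list
settings.job_clusters = (settings.job_clusters or []) + [
    JobCluster(
        job_cluster_key="shared_job_cluster",
        new_cluster=ClusterSpec(
            spark_version=w.clusters.select_spark_version(latest=True),
            node_type_id="i3.xlarge",
            autoscale=AutoScale(min_workers=1, max_workers=2),
        ),
    )
]

# 2. Point each task at the shared cluster instead of its own cluster
for task in settings.tasks or []:
    task.new_cluster = None
    task.job_cluster_key = "shared_job_cluster"

# 3. Overwrite the job definition with the updated settings
w.jobs.reset(job_id=job_id, new_settings=settings)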
You can automate operations on Databricks accounts, workspaces, and related resources with the Databricks SDK for Python. The sample code below shows how to create a workflow and configure it to reuse job clusters.
It configures two ingestion tasks, Ingestion_Silver and Ingestion_Gold, which reuse the same job cluster, 'ingestion_job_cluster', and two ML tasks, ML_Feature_Engineering and ML_Batch_Scoring, which reuse another job cluster, 'ml_job_cluster'. Also notice that the ingestion cluster uses an existing cluster policy, while the ML cluster defines its own custom configuration.
Code summary

| Order | Workflow Task | Cluster | Cluster Configuration & Policy |
| --- | --- | --- | --- |
| 1 | Ingestion Silver | Ingestion_Job_Cluster | Existing cluster policy ID: E0631F5C0D0006D6 |
| 2 | Ingestion Gold | Ingestion_Job_Cluster (reused) | Existing cluster policy ID: E0631F5C0D0006D6 |
| 3 | ML Feature Engineering | ML_Job_Cluster | Newly defined cluster, latest DBR, autoscale: 1-2 nodes |
| 4 | ML Batch Scoring | ML_Job_Cluster (reused) | Newly defined cluster, latest DBR, autoscale: 1-2 nodes |
# Install the Databricks SDK for Python (latest version at the time of writing: 0.17.0)
!pip install --upgrade databricks-sdk==0.17.0

# Imports
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSpec, AutoScale
from databricks.sdk.service.jobs import Task, NotebookTask, JobCluster, TaskDependency

# Create the workspace client
w = WorkspaceClient(
    host='https://xxxxxxxxxx.cloud.databricks.com/',
    token='dapixxxxxxxxxxxxxxxxxx')

# Notebook paths
base_path = f'/Users/{w.current_user.me().user_name}'
ingestion_silver = f'{base_path}/Ingestion_Silver'
ingestion_gold = f'{base_path}/Ingestion_Gold'
ml_featurisation = f'{base_path}/ML_Featurisation'
ml_scoring = f'{base_path}/ML_Scoring'

# Cluster spec example using a custom configuration (latest DBR, autoscaling 1-2 workers)
latest_runtime = w.clusters.select_spark_version(latest=True)
cluster_spec_custom = ClusterSpec(
    node_type_id='i3.xlarge',
    spark_version=latest_runtime,
    autoscale=AutoScale(
        min_workers=1,
        max_workers=2
    ))

# Cluster spec example using an existing cluster policy
cluster_spec_using_policy = ClusterSpec(
    policy_id="E0631F5C0D0006D6",
    apply_policy_default_values=True
)

# Create the job
created_job = w.jobs.create(
    name='product_recommendation_pipeline',
    tasks=[
        # Define all tasks
        Task(description="Ingestion_Silver",
             # Use a job cluster via a user-defined key name
             job_cluster_key="ingestion_job_cluster",
             notebook_task=NotebookTask(notebook_path=ingestion_silver),
             task_key="Silver",
             timeout_seconds=0),
        Task(description="Ingestion_Gold",
             depends_on=[TaskDependency("Silver")],
             # Reuse the cluster by referring to the same cluster key
             job_cluster_key="ingestion_job_cluster",
             notebook_task=NotebookTask(notebook_path=ingestion_gold),
             task_key="Gold",
             timeout_seconds=0,
             ),
        Task(description="ML_Feature_Engineering",
             depends_on=[TaskDependency("Gold")],
             # Job cluster for ML loads
             job_cluster_key="ml_job_cluster",
             notebook_task=NotebookTask(notebook_path=ml_featurisation),
             task_key="ML_Featurisation",
             timeout_seconds=0,
             ),
        Task(description="ML_Batch_Scoring",
             depends_on=[TaskDependency("ML_Featurisation")],
             # Reuse the ML job cluster
             job_cluster_key="ml_job_cluster",
             notebook_task=NotebookTask(notebook_path=ml_scoring),
             task_key="ML_Scoring",
             timeout_seconds=0,
             )
    ],
    # Optionally define multiple job cluster configurations to use across tasks
    job_clusters=[
        JobCluster(
            job_cluster_key="ingestion_job_cluster",
            new_cluster=cluster_spec_using_policy
        ),
        JobCluster(
            job_cluster_key="ml_job_cluster",
            new_cluster=cluster_spec_custom
        )
    ])
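Once the job is created, you can kick off a run and wait for it to complete, for example (a small usage sketch continuing from the code above):

# Trigger a run of the newly created job and block until it finishes
run = w.jobs.run_now(job_id=created_job.job_id).result()
print(f"Run finished with state: {run.state.result_state}")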
While cluster reuse is generally beneficial, there are some factors to consider:
Cluster reuse within Databricks Workflows represents a significant stride towards optimizing resource utilization and curtailing cloud costs. By allowing multiple tasks within a job to share a single cluster, organizations can eliminate the overhead associated with spinning up new clusters for each task, thereby streamlining their data pipelines and accelerating job execution. This efficient allocation of resources not only leads to quicker insights and decision-making but also translates into tangible cost savings, especially for jobs that run frequently or require similar resource configurations.
By understanding when and how to employ cluster reuse (taking into account factors like workload similarity, execution frequency, and task isolation), teams can maximize the benefits while maintaining the integrity and performance of their data workflows. With the right planning, cluster reuse can be a powerful lever in the pursuit of efficient and cost-effective data operations on the Databricks platform.