Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DataFrame.localCheckpoint() and cluster autoscaling at odds with each other

rcostanza
New Contributor III

I have a notebook where, at the beginning, I load several dataframes and cache them using localCheckpoint(). I run this notebook on an all-purpose cluster with autoscaling enabled, with a minimum of 1 worker and a maximum of 2.

The cluster often scales from 1 to 2 workers early in the run, before the dataframes are cached, and then downscales from 2 to 1 during some long-running tasks halfway through if no other notebooks are running in parallel, sometimes within 5 minutes of bringing that worker up. When that executor is lost, so are all the checkpoints stored on it, which causes errors later on when I try to use those dataframes.
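
For context, the structure is roughly this (table and column names are placeholders):

# Simplified sketch of the notebook; table and column names are made up.
# localCheckpoint() materializes each dataframe on the executors that computed it,
# so the cached blocks disappear if those executors are later removed.
df_orders = spark.table("orders").localCheckpoint()
df_customers = spark.table("customers").localCheckpoint()

# ... long-running work; the cluster sometimes downscales from 2 to 1 here ...

# Fails with a "checkpoint block not found" style error if the worker that held
# the blocks was removed in the meantime.
df_orders.join(df_customers, "customer_id").count()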

I understand the alternative is to use .checkpoint() instead. But before trying that, is there a way to prevent the cluster from downscaling during execution? Or a way to tune it so it won't downscale within X minutes of scaling up?

1 ACCEPTED SOLUTION

Accepted Solutions

lingareddy_Alva
Honored Contributor II

Hi @rcostanza 

You're facing a common issue with autoscaling clusters and cached data locality.
There are several approaches to address this:

Preventing Downscaling During Execution
1. Disable Autoscaling Temporarily
- You can disable autoscaling programmatically at the start of your notebook and re-enable it at the end:

# At the beginning of your notebook
dbutils.fs.put("/databricks/driver/conf/spark-defaults.conf",
               "spark.databricks.cluster.autoScale.enabled false",
               True)  # overwrite=True, so the write doesn't fail if the file exists

# Your data processing code here

# At the end (optional - re-enable autoscaling)
dbutils.fs.put("/databricks/driver/conf/spark-defaults.conf",
               "spark.databricks.cluster.autoScale.enabled true",
               True)


2. Use Cluster Policies
Create a cluster policy that sets more conservative autoscaling parameters (a rough sketch of such a policy follows this list):
- Increase the idle timeout before downscaling
- Set minimum workers to 2 if your workload consistently needs both
- Configure spark.databricks.cluster.autoScale.idleTimeBeforeShutDown to a higher value
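
As an illustration of such a policy (written here as a Python dict for readability; the autoscale attribute paths are standard policy fields, while the spark_conf key is the internal idle-timeout setting mentioned above, so treat this as a sketch to adapt rather than a verified configuration):

# Hedged sketch of a cluster policy definition.
policy_definition = {
    "autoscale.min_workers": {"type": "fixed", "value": 2},
    "autoscale.max_workers": {"type": "range", "maxValue": 4, "defaultValue": 2},
    "spark_conf.spark.databricks.cluster.autoScale.idleTimeBeforeShutDown": {
        "type": "fixed",
        "value": "30m",
    },
}

You would register something like this through the cluster policies UI or the policies API and attach it to the all-purpose cluster.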

Tuning Autoscaling Behavior
Configuration Options:
# Set these in your cluster's Spark config or advanced options
spark.databricks.cluster.autoScale.idleTimeBeforeShutDown 20m # Default is usually 10m
spark.databricks.cluster.autoScale.minIdleTimeBeforeShutDown 10m

Alternative Approaches
1. Use .persist() with a Replicated Storage Level
from pyspark import StorageLevel
# Cache with replication across nodes (cache() always uses the default storage level,
# so call persist() with an explicit *_2 level instead)
df = df.persist(StorageLevel.MEMORY_AND_DISK_2)

2. Strategic Checkpointing
Instead of localCheckpoint(), use regular .checkpoint() which writes to distributed storage:
# Set checkpoint directory first
spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir")
# Then checkpoint your dataframes
df_cached = df.checkpoint()

3. Repartition Before Caching
# Ensure data is distributed across available executors
df_cached = df.repartition(4).localCheckpoint() # Adjust partition count as needed

Recommended Solution
For your use case, I'd recommend:
1. Short term: Set a longer idle timeout in your cluster configuration (20-30 minutes instead of the default)
2. Long term: Switch to .checkpoint() for critical dataframes that you can't afford to lose, and use replicated persistence (persist() with a *_2 storage level) for less critical ones; a combined sketch follows below.
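
Putting the long-term recommendation together, a minimal sketch (the checkpoint path, table names, and the critical/non-critical split are assumptions to adapt):

from pyspark import StorageLevel

# Checkpoints go to distributed storage, so they survive losing an executor.
# The directory is an assumption; point it at a location your cluster can write to.
spark.sparkContext.setCheckpointDir("dbfs:/tmp/checkpoints/my_notebook")

# Critical dataframes: reliable checkpoint that outlives any single worker.
df_critical = spark.table("orders").checkpoint()

# Less critical dataframes: replicated persistence, so another executor can still
# hold a copy of each block if one worker is removed.
df_secondary = spark.table("customers").persist(StorageLevel.MEMORY_AND_DISK_2)
df_secondary.count()  # materialize the cache before the long-running section

With only two workers, the *_2 storage levels give you a single extra copy, which usually survives one downscale; checkpoint() remains the more robust choice where correctness matters.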

 

LR


