Hi @rcostanza
You're facing a common issue with autoscaling clusters and cached data locality.
There are several approaches to address this:
Preventing Downscaling During Execution
1. Disable Autoscaling Temporarily
- Caveat: spark.databricks.cluster.autoScale.enabled is not a publicly documented setting, and Spark only reads spark-defaults.conf at cluster startup, so writing the file mid-notebook will not change the running cluster. If you try this route anyway, note that dbutils.fs paths default to DBFS (you need a file:/ prefix to reach the driver's local filesystem) and that overwrite=True is required to replace an existing file:
# Caution: only takes effect after a cluster restart, and overwriting
# spark-defaults.conf discards any other defaults already in that file
dbutils.fs.put(
    "file:/databricks/driver/conf/spark-defaults.conf",
    "spark.databricks.cluster.autoScale.enabled false",
    overwrite=True,
)
(Write the value true the same way at the end if you want to re-enable it.) A more reliable runtime alternative is to pin the cluster size through the Clusters API, as sketched below.
2. Use Cluster Policies
Create a cluster policy that enforces more conservative autoscaling parameters (a sketch follows this list):
- Increase the idle timeout before downscaling
- Set minimum workers to 2 if your workload consistently needs both
- Configure spark.databricks.cluster.autoScale.idleTimeBeforeShutDown to a higher value (an undocumented key - confirm it exists in your runtime before relying on it)
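A hedged sketch of such a policy, created through the databricks-sdk (requires admin rights; the policy name and worker counts are placeholders, and you should verify the attribute paths against the cluster-policy documentation for your workspace):
import json
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
definition = {
    # Never let autoscaling drop below 2 workers
    "autoscale.min_workers": {"type": "fixed", "value": 2},
    # Still allow scaling up to 8 workers
    "autoscale.max_workers": {"type": "range", "maxValue": 8},
}
w.cluster_policies.create(name="keep-two-workers", definition=json.dumps(definition))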
Tuning Autoscaling Behavior
Configuration Options:
# Set these in your cluster's Spark config (advanced options) before startup -
# they are read at cluster creation, not at runtime. Both keys are internal
# and undocumented, so treat the names and defaults as unverified.
spark.databricks.cluster.autoScale.idleTimeBeforeShutDown 20m  # standard autoscaling's documented downscale window is ~10m of underutilization
spark.databricks.cluster.autoScale.minIdleTimeBeforeShutDown 10m
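Since those keys are undocumented, a quick sanity check from a notebook shows whether your runtime exposes them at all (spark.conf.get with a default won't raise for unknown keys):
# Print whatever is currently set for the autoscaling-related keys above
for key in [
    "spark.databricks.cluster.autoScale.idleTimeBeforeShutDown",
    "spark.databricks.cluster.autoScale.minIdleTimeBeforeShutDown",
]:
    print(key, "=", spark.conf.get(key, "<not set>"))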
Alternative Approaches
1. Use .persist() with a Replicated Storage Level
from pyspark import StorageLevel
# Replicate each cached block on two nodes, so losing one executor to a
# downscale doesn't drop the only copy. Don't chain .cache().persist():
# a dataframe's storage level can only be assigned once.
df = df.persist(StorageLevel.MEMORY_AND_DISK_2)
df.count()  # run an action to actually materialize the cache
2. Strategic Checkpointing
Instead of localCheckpoint(), use regular .checkpoint(), which writes to distributed storage and therefore survives the loss of a worker:
# Set the checkpoint directory first (use a DBFS or cloud-storage path)
spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir")
# .checkpoint() is eager by default and returns a new dataframe
# whose lineage has been truncated
df_cached = df.checkpoint()
3. Repartition Before Caching
# Spread the data evenly across the executors currently available; note that
# this balances the blocks but does not protect them if a node is then removed
df_cached = df.repartition(4).localCheckpoint()  # adjust the partition count as needed
Recommended Solution
For your use case, I'd recommend:
1. Short term: Set a longer idle timeout in your cluster configuration (20-30 minutes instead of the default)
2. Long term: Switch to .checkpoint() for critical dataframes you can't afford to recompute, and use .persist() with a replicated storage level for less critical ones. A combined sketch follows.
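Putting the long-term pieces together (a sketch only - the checkpoint path and the df_critical/df_hot names are placeholders for your own dataframes):
from pyspark import StorageLevel

spark.sparkContext.setCheckpointDir("dbfs:/tmp/checkpoints")  # placeholder path

# Critical dataframe: written to distributed storage, survives any node loss
df_critical = df_critical.checkpoint()

# Less critical dataframe: fast in-memory access, replicated on two nodes
df_hot = df_hot.persist(StorageLevel.MEMORY_AND_DISK_2)
df_hot.count()  # materialize the cache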