Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Batch jobs suddenly slow down?

SebastianRowan
Contributor

What should I do when batch jobs sometimes take much longer even though the data size hasn't changed? What causes this, and do you use any tools to diagnose it?


8 REPLIES

BS_THE_ANALYST
Esteemed Contributor II

@SebastianRowan  this is interesting!

What compute do you have selected? If it's not serverless, it could have to do with the start-up time of the cluster.

You could always open the Spark UI by clicking into your cluster and taking a look at a slightly lower level (if you're not using serverless). You can dig into the jobs/stages/tasks in there.

I'm interested to see what the community recommendations are as well 🤌

All the best,
BS

SebastianRowan
Contributor

Thanks for your reply! But what should I look for in the Spark UI to spot cluster start-up delays or other slowdown causes?

Hello @SebastianRowan!

Jobs may occasionally run slower due to factors such as cluster startup delays, data skew or partitioning issues, or overloaded resources. In the Spark UI, you can check for startup lag in the Jobs timeline and review GC time, since high GC time can indicate memory pressure.
To spot cluster startup delays, look for a long gap between job submission and the start of any tasks or stages.

Check this out: Spark UI Guide. It provides a detailed overview of what and where to check when diagnosing performance issues.

SebastianRowan
Contributor

Thanks SO much!

WiliamRosa
Contributor

If batch jobs on Databricks suddenly slow down without a change in data volume, the root cause could be shifting resource contention, skewed data partitions, or Spark-level bottlenecks. Checking the Spark UI's Jobs and Stages views for imbalanced tasks, long-running stages, or high shuffle and garbage collection times can reveal hotspots. It may also help to right-size the cluster via dynamic allocation, address data skew or small-file issues, and review recent cluster usage. Enabling features like predictive optimization, compaction, and caching, or even shifting to serverless compute to bypass cold-start delays, can also restore performance consistency.
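
One way to check for the small-file issue mentioned above is to look at the Delta table's file statistics from a notebook. This is a minimal sketch; the table name is a placeholder:

# Inspect file count and average file size for a Delta table (placeholder name)
detail = spark.sql("DESCRIBE DETAIL my_schema.my_table").collect()[0]
num_files = detail["numFiles"]
size_bytes = detail["sizeInBytes"]
avg_file_mb = (size_bytes / num_files) / (1024 * 1024) if num_files else 0
print(f"{num_files} files, avg {avg_file_mb:.1f} MB per file")
# A very small average (a few MB or less) usually means compaction would help.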

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

Did the slowdown happen when the cluster was scaling or running other jobs, or was it just the same tasks running slower?

nayan_wylde
Honored Contributor III (Accepted Solution)

Some of the reasons I can think of that could cause this slowness are:

1. Cluster Resource Contention: Other jobs or users sharing the same cluster can compete for CPU, memory, or disk resources, leading to slowdowns. Autoscaling clusters may also take time to provision nodes, causing delays. 

Fix: Use dedicated clusters for critical jobs, enable autoscaling with a higher minimum node count, or isolate jobs with workload-specific pools.

2. Cluster Startup Time: Clusters can sometimes take a long time to start, and scaling up can add further delay.

Fix: Consider moving to serverless or using instance pools.
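
As a rough sketch of the fixes for points 1 and 2, a dedicated job cluster with a higher autoscale minimum and a pre-warmed instance pool could look like this (the runtime version and pool ID are placeholders, not values from this thread):

# Hypothetical job cluster spec with autoscaling and an instance pool (placeholder values)
new_cluster = {
    "spark_version": "15.4.x-scala2.12",                 # example LTS runtime
    "instance_pool_id": "pool-0123456789abcdef",         # pre-warmed pool to cut startup time
    "autoscale": {"min_workers": 4, "max_workers": 10},  # higher minimum avoids scale-up lag
}

This spec would go into the job's own cluster definition (job JSON, Terraform, or the SDK) rather than pointing the job at a shared all-purpose cluster.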

3. Data Skew: Uneven data distribution across partitions can overload specific executors, causing some tasks to take much longer than others. This can vary run-to-run if data values shift slightly.

Fix: Repartition the data or use techniques like salting for joins. Set spark.sql.adaptive.enabled = true to enable Adaptive Query Execution (AQE).
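
A rough sketch of the salting idea (the DataFrames and column names below are made up for illustration, and the salt factor needs tuning for your data):

# Toy data standing in for a skewed fact table and a small dimension table
from pyspark.sql import functions as F

facts = spark.createDataFrame([(1, "a")] * 1000 + [(2, "b")] * 10, ["customer_id", "val"])
dims = spark.createDataFrame([(1, "US"), (2, "DE")], ["customer_id", "country"])

N = 16  # number of salt buckets; tune for your skew
facts_salted = facts.withColumn("salt", (F.rand() * N).cast("int"))
dims_salted = dims.crossJoin(spark.range(N).withColumnRenamed("id", "salt"))
joined = facts_salted.join(dims_salted, on=["customer_id", "salt"]).drop("salt")

# AQE can also split skewed shuffle partitions automatically
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")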

4. Data Layout Issues: Small files or poor data organization in Delta tables can reduce data skipping efficiency, leading to variable scan times. Even with consistent data sizes, file fragmentation can worsen over time.

Fix: Run OPTIMIZE with Z-ordering to compact files and improve layout. Also consider liquid clustering, which is the newer alternative to Z-ordering for Delta tables.
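
For example, from a notebook (table and column names are placeholders; liquid clustering needs a recent runtime):

# Compact and Z-order an existing Delta table
spark.sql("OPTIMIZE my_schema.events ZORDER BY (event_date, customer_id)")

# Or switch the table to liquid clustering and let OPTIMIZE cluster incrementally
spark.sql("ALTER TABLE my_schema.events CLUSTER BY (event_date, customer_id)")
spark.sql("OPTIMIZE my_schema.events")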

5. Query Plan Variability: The cost-based optimizer (CBO) may choose different execution plans due to outdated or missing table statistics, leading to inconsistent performance.

Fix: Regularly update stats with ANALYZE TABLE.
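
For example (placeholder table name), after large loads or on a schedule:

# Refresh table and column statistics so the CBO chooses consistent plans
spark.sql("ANALYZE TABLE my_schema.events COMPUTE STATISTICS FOR ALL COLUMNS")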

6. Garbage Collection or Memory Pressure: Excessive garbage collection (GC) in the JVM can pause executors, especially if memory is tight or data skew causes uneven memory usage.

Fix: Increase executor memory or spark.memory.fraction, or address the skew. You can set spark.memory.fraction in your Databricks notebook or cluster configuration:

spark.conf.set("spark.memory.fraction", "0.75")

Here is some analysis you can perform; a code sample follows:

# List and analyze completed runs for a job (last 24 hours):
# compute durations and flag those longer than 1 hour.

import time

import pandas as pd
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

job_id = 123456789  # Replace with your job ID

# Time range: Last 24 hours (ms since epoch)
start_time_ms = int(time.time() * 1000) - (24 * 60 * 60 * 1000)
end_time_ms = int(time.time() * 1000)

# List completed runs
run_list = w.jobs.list_runs(
    job_id=job_id,
    completed_only=True,
    start_time_from=start_time_ms,
    start_time_to=end_time_ms,
    limit=25  # keep within the API's per-page maximum; the SDK paginates as needed
)

long_runs = []
for run in run_list:
    if run.end_time and run.start_time:
        duration_ms = run.end_time - run.start_time
        duration_sec = duration_ms / 1000
        if duration_sec > 3600:  # >1 hour
            long_runs.append({
                'run_id': run.run_id,
                'duration_sec': duration_sec,
                'state': run.state.life_cycle_state,
                'result_state': run.state.result_state
            })

# Print or save to a pandas DataFrame
df_long_runs = pd.DataFrame(long_runs)
print(df_long_runs)
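
As a follow-up on the same (placeholder) job, you could also compare each run against the window's median instead of a fixed one-hour threshold, which is closer to "suddenly slower than usual":

# Flag runs that took more than 2x the median duration (reuses w, job_id, and the time range above)
durations = [
    (r.run_id, (r.end_time - r.start_time) / 1000)
    for r in w.jobs.list_runs(
        job_id=job_id,
        completed_only=True,
        start_time_from=start_time_ms,
        start_time_to=end_time_ms,
    )
    if r.end_time and r.start_time
]
df_runs = pd.DataFrame(durations, columns=["run_id", "duration_sec"])
median_sec = df_runs["duration_sec"].median()
print(df_runs[df_runs["duration_sec"] > 2 * median_sec])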

 

Enable Adaptive Query Execution (AQE): Handles skew dynamically.

spark.conf.set("spark.sql.adaptive.enabled", "true")

Tune Resources: Increase parallelism or memory. Note that spark.sql.shuffle.partitions can be changed at runtime, while spark.executor.memory only takes effect when set in the cluster's Spark config before startup.

spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.executor.memory", "8g")

SebastianRowan
Contributor

Thanks for the AMAZING response!
