nayan_wylde
Esteemed Contributor II

Some of the reasons that I can think of causing slowness are:

1. Cluster Resource Contention: Other jobs or users sharing the same cluster can compete for CPU, memory, or disk resources, leading to slowdowns. Autoscaling clusters may also take time to provision nodes, causing delays. 

Fix: Use dedicated clusters for critical jobs, enable autoscaling with a higher minimum node count, or isolate jobs with workload-specific pools.

2. Cluster Startup time: Sometime clusters take long time to start and when it needs to autoscale it might take time.

Fix: Consider moving to serverless or using instance pools.

3. Data Skew: Uneven data distribution across partitions can overload specific executors, causing some tasks to take much longer than others. This can vary run-to-run if data values shift slightly.

Fix: Repartition data or use techniques like salting for joins. Check with spark.sql.adaptive.enabled = true to enable Adaptive Query Execution (AQE).

4. Data Layout Issues: Small files or poor data organization in Delta tables can reduce data skipping efficiency, leading to variable scan times. Even with consistent data sizes, file fragmentation can worsen over time.

Fix: Run OPTIMIZE with Z-ordering to compact files and improve layout. Also consider liquid clustering since Z ordering is old technique for improving performance.

5. Query Plan Variability: The cost-based optimizer (CBO) may choose different execution plans due to outdated or missing table statistics, leading to inconsistent performance.

Fix: Regularly update stats with ANALYZE TABLE.

6. Garbage Collection or Memory Pressure: Excessive garbage collection (GC) in the JVM can pause executors, especially if memory is tight or data skew causes uneven memory usage.

Fix: Increase executor memory,spark.memory.fraction, or address skew.You can set spark.memory.fraction in your Databricks notebook or cluster configuration:

spark.conf.set("spark.memory.fraction", "0.75")

Some Analysis you can perform. Here are the code sample:

####List and Analyze Completed Runs for a Job (Last 24 Hours)
Compute durations and flag those >1 hour.
####################################

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
import time

w = WorkspaceClient()

# Example: Replace with your job_id
job_id = 123456789  # Your job ID

# Time range: Last 24 hours (ms since epoch)
start_time_ms = int(time.time() * 1000) - (24 * 60 * 60 * 1000)
end_time_ms = int(time.time() * 1000)

# List completed runs
run_list = w.jobs.list_runs(
    job_id=job_id,
    completed_only=True,
    start_time_from=start_time_ms,
    start_time_to=end_time_ms,
    limit=100
)

long_runs = []
for run in run_list:
    if run.end_time and run.start_time:
        duration_ms = run.end_time - run.start_time
        duration_sec = duration_ms / 1000
        if duration_sec > 3600:  # >1 hour
            long_runs.append({
                'run_id': run.run_id,
                'duration_sec': duration_sec,
                'state': run.state.life_cycle_state,
                'result_state': run.state.result_state
            })

# Print or save to DataFrame
import pandas as pd
df_long_runs = pd.DataFrame(long_runs)
print(df_long_runs)

 

Enable Adaptive Query Execution (AQE): Handles skew dynamically.

spark.conf.set("spark.sql.adaptive.enabled", "true")

Tune Resources: Increase parallelism or memory.

spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.executor.memory", "8g")

View solution in original post