This behavior suggests a significant difference in configuration or resource access between your Databricks serverless SQL warehouse, single-node cluster, and multi-node Spark cluster. The issue is not with SQL syntax or table access itself, since those are proven to work elsewhere—it is specific to the multi-node Spark cluster context.
Key Points from Your Scenario
- Serverless SQL Warehouse: Query executes in seconds.
- Single-Node Cluster: Query also executes in seconds.
- Multi-Node Cluster: Query never completes (even after 60 minutes).
Likely Causes
1. Table Format and Underlying Storage
Databricks tables may be managed Delta Lake tables (transactional Parquet) or external tables in other, possibly non-optimized formats. If the table's physical files are not partitioned, or the data sits in a single large file, Spark's distributed computation can actually be slower because of scheduling overhead or skew.
- If the table is stored in a single Parquet file, a multi-node cluster could have resource contention for the single file, causing worker nodes to wait on file access or data splits.
- Large, unpartitioned tables can cause parallel reads to be ineffective (a quick layout check is sketched below).
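A quick way to check the physical layout, assuming a Delta table (DESCRIBE DETAIL is Delta-specific, and schema.tbl_name is a placeholder for your table):

```python
# Inspect the physical layout of the table; DESCRIBE DETAIL works on Delta tables.
detail = spark.sql("DESCRIBE DETAIL schema.tbl_name").collect()[0]

print("format:           ", detail["format"])
print("location:         ", detail["location"])
print("numFiles:         ", detail["numFiles"])
print("sizeInBytes:      ", detail["sizeInBytes"])
print("partitionColumns: ", detail["partitionColumns"])
```

A very large sizeInBytes with numFiles of 1 and an empty partitionColumns list would match the single-large-file scenario described above.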
2. Data Skew or Small File Problem
Distributed clusters work best when data can be split into multiple partitions. If there are not enough input splits, only a small subset of executors will do the work while others wait. This leads to cluster underutilization and a "spinning" effect.
- Check the number of partitions your table/files have.
- If running count(*), check the Spark job DAG to see if tasks are being created for each partition (a rough skew check is sketched below).
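A rough skew check you can run in the notebook (this scans the table, so if the full count hangs it may hang too; treat it as a diagnostic sketch, with the table name as a placeholder):

```python
from pyspark.sql.functions import spark_partition_id

df = spark.read.table("schema.tbl_name")  # placeholder table name
print("input partitions:", df.rdd.getNumPartitions())

# Rows per input partition; a few huge partitions next to many empty ones
# is the classic skew / single-large-file signature.
(df.withColumn("pid", spark_partition_id())
   .groupBy("pid")
   .count()
   .orderBy("count", ascending=False)
   .show(20))
```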
3. Cluster Configuration
On a single-node cluster all work runs in the driver process, and a serverless SQL warehouse manages its own data access; on a multi-node cluster, tasks are distributed among worker nodes. If access to the table is restricted by IP allow-lists, credential passthrough, or mount-point permissions, the workers may not have data access even though the driver does.
- Check mount points and data access for all worker nodes, not just the driver (a simple probe is sketched below).
- Credentials and IAM permissions should be checked for all nodes.
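One way to test whether the workers themselves can reach the storage is a tiny probe job. The sketch below is purely illustrative: it assumes the table lives under a DBFS mount visible through the /dbfs FUSE path, and the path is a placeholder (use the location reported by DESCRIBE DETAIL):

```python
import os

# Placeholder: local FUSE view of the table's storage location.
table_path = "/dbfs/mnt/your-mount/path/to/table"

def probe(_):
    # Runs on an executor; dbutils is not available there, so list the
    # directory through the local /dbfs view instead.
    try:
        return (True, len(os.listdir(table_path)))
    except Exception as e:
        return (False, repr(e))

# One task per core so the probes actually land on worker nodes.
n = sc.defaultParallelism
for ok, info in sc.parallelize(range(n), n).map(probe).collect():
    print("OK" if ok else "FAILED", info)
```

If the driver can list the path but the probes fail, the problem is worker-side access, not the query.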
4. Shuffle or Query Plan Issue
Aggregations can trigger large shuffles in Spark, but a basic count(*) only exchanges one partial count per partition, so on a simple table it should not normally be a bottleneck unless the plan involves unexpected data movement.
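If you want to rule this out anyway, you can inspect the physical plan without executing the query (requires Spark 3.x, which current Databricks runtimes use):

```python
# Print the physical plan for the count without running it; for a simple
# table you would expect a scan plus partial/final aggregate, not a big exchange.
spark.sql("SELECT count(*) FROM schema.tbl_name").explain(mode="formatted")
```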
5. Resource Limits
Worker nodes may be resource starved (low memory, small disk for temp spill, etc.) compared to the resources available to a driver-only, single-node cluster.
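If you suspect this, the executor sizing the cluster actually applied can be read from the Spark conf (these are standard Spark properties; values depend on the worker node type, and some may be unset on autoscaling clusters):

```python
# Executor sizing as applied on the multi-node cluster.
for key in ("spark.executor.memory", "spark.executor.cores", "spark.executor.instances"):
    print(key, "=", spark.conf.get(key, "not set"))
```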
6. Table Caching or Auto-Optimize Differences
The SQL warehouse and single-node cluster may already have cached or auto-optimized the table (for example via the disk/Delta cache), while a fresh multi-node cluster gets none of that benefit on its first run.
Debugging and Solutions
- Check the Spark DAG: In the Spark UI on your multi-node cluster, see whether the job is progressing, stuck initializing, or has all tasks pending.
- Partitioning: Run spark.read.table('schema.tbl_name').rdd.getNumPartitions() in the notebook to see how many partitions exist.
- File Structure: Check whether the table's underlying storage (Parquet or Delta files) is a single file or many files.
- Permissions: Ensure all cluster nodes can access the data source equally.
- Logs and Metrics: Examine driver and executor logs for exceptions, warnings, or timeouts (especially storage/network related).
- Run a Smaller Sample: Limit the scan to a small subset, e.g. select count(*) from (select * from schema.tbl_name limit 1000), and see if the issue persists. Note that appending limit to count(*) only limits the single-row result, not how much data is scanned. (These checks are consolidated into one notebook cell below.)
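The partitioning, file-structure, and sampled-count checks above can be run from a single notebook cell; a sketch, with the table name as a placeholder:

```python
df = spark.read.table("schema.tbl_name")  # placeholder table name

# Partitioning and file structure.
print("input partitions:", df.rdd.getNumPartitions())
print("backing files:   ", len(df.inputFiles()))

# Bounded sample count; the LIMIT wraps the scan, not the aggregate,
# so only a small subset of the table is read.
sample = spark.sql(
    "SELECT count(*) AS c FROM (SELECT * FROM schema.tbl_name LIMIT 1000)"
).collect()[0]["c"]
print("sample count:    ", sample)
```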
Suggested Next Steps
- Investigate the file structure and partitioning of the underlying table data.
- Use the Spark UI to analyze the execution plan.
- Confirm worker access to the data source.
- Consider repartitioning or optimizing the storage format if partitions are insufficient or if there’s a single large file (a compaction sketch follows below).
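If those checks point to one huge file or thousands of tiny ones, compaction is the usual remedy. A sketch assuming a Delta table (OPTIMIZE is a Delta Lake/Databricks command; the partition count below is illustrative only):

```python
# Compact small files in a Delta table.
spark.sql("OPTIMIZE schema.tbl_name")

# For a non-Delta table, one option is to rewrite it with an explicit
# partition count (64 is illustrative, not a recommendation):
# df = spark.read.table("schema.tbl_name")
# (df.repartition(64)
#    .write.mode("overwrite")
#    .format("delta")
#    .saveAsTable("schema.tbl_name_compacted"))
```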