Hi,
Your performance degrades this badly (4 mins for 2k tables, but 50 mins for 12k) because of the Spark Driver. Six times more tables taking about 12 times longer points to contention, not just more work. When you run spark.sql("DESCRIBE HISTORY...") inside a ThreadPoolExecutor, every one of those 22,000 queries has to be parsed, planned, and scheduled by the single Driver node. Spark is designed to run a few huge queries across many workers, not thousands of tiny queries from the same session. Your Driver's JVM and the Unity Catalog API calls are choking on the concurrency, while your worker nodes are likely sitting mostly idle.
Throwing more workers at this won't help, because the bottleneck is the Driver.
Here are the two best ways to solve this on Databricks and hit your 20-minute goal:
1. Since external Delta tables are just files in cloud storage, you can bypass the Spark query planner entirely and have your workers read the JSON transaction log files in parallel. To do this, install the open-source Python deltalake library (the Python binding for delta-rs) and wrap it in a PySpark UDF.
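```python
# %pip install deltalake   (run this in its own cell at the top of the notebook)
import json

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from deltalake import DeltaTable


@udf(returnType=StringType())
def get_latest_history_udf(storage_path):
    try:
        # Runs on the workers and reads the table's _delta_log straight from
        # storage, without asking the Spark Driver. Depending on your setup you
        # may need to pass cloud credentials via storage_options=...
        dt = DeltaTable(storage_path)
        latest_commit = dt.history(limit=1)[0]  # history() returns newest first
        return json.dumps(latest_commit, default=str)  # or keep only the fields you need
    except Exception as e:
        return f"ERROR: {e}"


# 1. Create a DataFrame df_paths with one "storage_path" row per table (22,000 rows)
# 2. Repartition so every worker gets a share (e.g. a few times your total worker cores)
# 3. Apply the UDF and write the results
(df_paths
    .repartition(200)
    .withColumn("latest_history", get_latest_history_udf("storage_path"))
    .write.format("delta")
    .saveAsTable("my_consolidated_history_table"))
```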
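This approach scales roughly linearly: add more workers and it goes faster, because the Driver schedules one big job instead of 22,000 tiny queries. The practical limits become cloud storage request throughput and how evenly the paths are spread across partitions.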
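To make "read the JSON log files" concrete, here is a standard-library-only sketch of what fetching the latest commit boils down to. It is not how deltalake is implemented, and it assumes a *local* table directory (no cloud paths or credentials). The function name latest_commit_summary and the default field list are hypothetical. It can ignore checkpoints because the newest version always has its own JSON commit file; note that commitInfo is optional in the Delta protocol, although Databricks writes it.

```python
import json
import os
import re

# Commit files are named by their zero-padded 20-digit version, e.g. 00000000000000000042.json
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def latest_commit_summary(table_dir, fields=("timestamp", "operation", "operationParameters")):
    """Hypothetical helper: return selected commitInfo fields of the newest
    commit in a *local* Delta table directory as a JSON string."""
    log_dir = os.path.join(table_dir, "_delta_log")
    versions = []
    for name in os.listdir(log_dir):
        match = COMMIT_FILE.match(name)
        if match:  # skips checkpoints, _last_checkpoint, .crc files, etc.
            versions.append(int(match.group(1)))
    if not versions:
        raise FileNotFoundError(f"no commit files in {log_dir}")

    latest = max(versions)
    summary = {"version": latest}
    with open(os.path.join(log_dir, f"{latest:020d}.json"), encoding="utf-8") as fh:
        for line in fh:  # one JSON action per line
            if not line.strip():
                continue
            action = json.loads(line)
            if "commitInfo" in action:  # optional per protocol; Databricks writes it
                info = action["commitInfo"]
                summary.update({field: info.get(field) for field in fields})
                break
    return json.dumps(summary)
```

In the UDF version, deltalake does this work for you, including cloud paths and log edge cases, which is why it is the better choice in practice.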
2. If you prefer to stick to your current ThreadPoolExecutor script, you need to relieve the pressure on the single Driver.
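Instead of running one script for all 22,000 tables, update your script to accept parameters (e.g., start_index, end_index). Then create a Databricks Workflow (Job) with 5 parallel tasks, each running on its own job cluster, because tasks that share a job cluster also share its Driver. Have Task A process tables 1-4,400, Task B 4,401-8,800, and so on up to 22,000.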
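A minimal sketch of the chunking and parameter handling, assuming a Python script task that receives hypothetical --start-index/--end-index command-line arguments (in a notebook task you would read them with dbutils.widgets.get instead). The helper names are illustrative, and the ranges are 1-based and inclusive to match the task split described above.

```python
import argparse


def chunk_bounds(total, n_tasks):
    """Split `total` tables into `n_tasks` contiguous, 1-based inclusive ranges.
    Earlier chunks get one extra table when `total` isn't divisible evenly."""
    base, extra = divmod(total, n_tasks)
    bounds, start = [], 1
    for i in range(n_tasks):
        size = base + (1 if i < extra else 0)
        bounds.append((start, start + size - 1))
        start += size
    return bounds


def parse_range(argv=None):
    # Hypothetical parameter names for the start_index/end_index idea.
    parser = argparse.ArgumentParser()
    parser.add_argument("--start-index", type=int, required=True)
    parser.add_argument("--end-index", type=int, required=True)
    args = parser.parse_args(argv)
    return args.start_index, args.end_index


def select_tables(all_tables, start_index, end_index):
    # Convert 1-based inclusive bounds into a Python slice.
    return all_tables[start_index - 1:end_index]


if __name__ == "__main__":
    start, end = parse_range()
    print(f"this task handles tables {start}-{end}")
```

For example, chunk_bounds(22000, 5) gives (1, 4400), (4401, 8800), (8801, 13200), (13201, 17600), (17601, 22000), so no table is skipped.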
If you do this, switch the Driver node type to a compute-optimized instance (e.g., F-series on Azure or C-series on AWS). The Driver needs CPU cores to handle the threads, not more RAM.
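One way to express that job setup is as a Jobs API 2.1 create payload built in Python. This is a sketch under assumptions: the script path, spark_version, node types, worker count, and job/task names are placeholders to replace with values valid in your workspace. Each task gets its own job_cluster_key, so each one has its own compute-optimized Driver, and no task declares depends_on, so all five run in parallel.

```python
import json


def build_job_payload(script_path, total_tables=22000, n_tasks=5,
                      spark_version="15.4.x-scala2.12",      # placeholder runtime
                      driver_node_type="Standard_F16s_v2",   # compute-optimized (Azure)
                      worker_node_type="Standard_D4ds_v5"):  # placeholder
    """Sketch of a Jobs API 2.1 create payload with one job cluster per task."""
    base, extra = divmod(total_tables, n_tasks)
    tasks, clusters, start = [], [], 1
    for i in range(n_tasks):
        end = start + base + (1 if i < extra else 0) - 1
        cluster_key = f"history_cluster_{i}"
        clusters.append({
            "job_cluster_key": cluster_key,
            "new_cluster": {
                "spark_version": spark_version,
                "driver_node_type_id": driver_node_type,  # CPU-heavy Driver for the threads
                "node_type_id": worker_node_type,
                "num_workers": 1,  # workers mostly idle in the thread-pool approach
            },
        })
        tasks.append({
            "task_key": f"history_chunk_{i}",
            "job_cluster_key": cluster_key,  # not shared, so a separate Driver per task
            "spark_python_task": {
                "python_file": script_path,
                "parameters": ["--start-index", str(start), "--end-index", str(end)],
            },
        })
        start = end + 1
    return {"name": "consolidate_table_history", "tasks": tasks, "job_clusters": clusters}


if __name__ == "__main__":
    # Hypothetical script path; print the JSON to review or submit to /api/2.1/jobs/create.
    print(json.dumps(build_job_payload("/Workspace/path/to/history_script.py"), indent=2))
```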
I highly recommend trying the UDF approach first; it is well suited to large-scale metadata extraction like this.