12-29-2024 02:20 AM - edited 12-29-2024 02:21 AM
I have a complex join that I'm trying to optimize. df1 has columns id, main_key, col1, col1_is_null, col2, col2_is_null, ..., col30; df2 has columns id, main_key, col1, col2, ..., col30.
I'm trying to run this SQL query with PySpark:
select df1.id, df2.id
from df1
join df2
  on df1.main_key = df2.main_key
  AND (df1.col1_is_null OR (df1.col1 = df2.col1))
  AND (df1.col2_is_null OR (df1.col2 = df2.col2))
  ...
This query takes a very long time, with just a few long-running straggler tasks. Both dataframes are huge, and the join key is skewed.
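For context, this is roughly how I confirmed the key skew (a minimal sketch; it only assumes df1 and main_key from the schema above):
# Count rows per join key to see how concentrated the distribution is
key_counts = (
    df1.groupBy("main_key")
       .count()
       .orderBy("count", ascending=False)
)
key_counts.show(20)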
Things I've tried:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Salting the smaller df, exploding the other
Broadcasting the smaller df (sometimes AQE overrides it with a SortMergeJoin(skew=true))
Filtering just the top 2 most common main_key values first, then doing all of the above
Splitting the query into a join on main_key only, followed by a second query that applies the column filters
Task execution is still very skewed. What more can I do to optimize this further?
12-29-2024 02:55 PM
To optimize your complex join in PySpark, you can try the following additional strategies:
Skew Join Optimization Using Skew Hints:
SELECT /*+ SKEW('df1', 'main_key') */ df1.id, df2.id
FROM df1
JOIN df2 ON df1.main_key = df2.main_key
AND (df1.col1_is_null OR (df1.col1 = df2.col1))
AND (df1.col2_is_null OR (df1.col2 = df2.col2))
...
Adaptive Query Execution (AQE):
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
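Note that the values above are close to the Spark 3.x defaults (factor 5, threshold 256MB), so to make AQE split skewed partitions more aggressively you would typically lower them. A minimal sketch; the exact values are assumptions to tune against your data:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Lower the detection thresholds so more partitions qualify for skew splitting
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "2")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "64MB")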
Cost-Based Optimizer (CBO):
ANALYZE TABLE df1 COMPUTE STATISTICS FOR ALL COLUMNS;
ANALYZE TABLE df2 COMPUTE STATISTICS FOR ALL COLUMNS;
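For these statistics to be used, the cost-based optimizer itself must be enabled, and df1/df2 need to be registered tables (e.g., Delta/Hive) rather than plain dataframes. A minimal sketch:
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")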
Dynamic Partition Pruning (DPP):
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
12-29-2024 02:56 PM
Continuation of my comments:
Shuffle Hash Join:
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
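You can also request a shuffle hash join for just this one join via a hint (Spark 3.0+), rather than changing the session-wide setting. A sketch using the column names from your query, hinting the side assumed to be smaller (df2):
result = (
    df1.join(
        df2.hint("shuffle_hash"),
        (df1.main_key == df2.main_key)
        & (df1.col1_is_null | (df1.col1 == df2.col1))
        & (df1.col2_is_null | (df1.col2 == df2.col2)),
        # ...repeat for the remaining colN / colN_is_null pairs
        "inner",
    )
    .select(df1.id, df2.id)
)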
Data Skew Remediation:
skewed_keys = [key1, key2] # Replace with actual skewed keys
df1_skewed = df1.filter(df1.main_key.isin(skewed_keys))
df1_non_skewed = df1.filter(~df1.main_key.isin(skewed_keys))
df2_skewed = df2.filter(df2.main_key.isin(skewed_keys))
df2_non_skewed = df2.filter(~df2.main_key.isin(skewed_keys))
result_skewed = df1_skewed.join(df2_skewed, "main_key")
result_non_skewed = df1_non_skewed.join(df2_non_skewed, "main_key")
result = result_skewed.union(result_non_skewed)
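Continuing the snippet above: if the skewed slice of df2 is small enough to fit in executor memory, broadcasting it keeps the heavy main_key values out of the shuffle entirely. A sketch, assuming df2_skewed is small:
from pyspark.sql.functions import broadcast

# Broadcast the small skewed slice of df2; only the non-skewed part is shuffled
result_skewed = df1_skewed.join(broadcast(df2_skewed), "main_key")
result = result_skewed.union(result_non_skewed)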
Salting:
from pyspark.sql.functions import expr
num_salts = 10
# Salt one side randomly; replicate the other across all salt values
# (salting both sides randomly would drop most matching rows)
df1_salted = df1.withColumn("salt", expr(f"floor(rand() * {num_salts})"))
df2_salted = df2.withColumn("salt", expr(f"explode(sequence(0, {num_salts - 1}))"))
result = df1_salted.join(
    df2_salted,
    (df1_salted.main_key == df2_salted.main_key) & (df1_salted.salt == df2_salted.salt)
)
Please refer to:
https://docs.databricks.com/ja/archive/legacy/skew-join.html
https://www.databricks.com/discover/pages/optimize-data-workloads-guide
12-31-2024 02:47 AM
@Omri thanks for your question!
To help optimize your complex join further, we need clarification on a few details:
Data Characteristics:
The approximate size of df1 and df2 (in rows and/or size).
The distribution of main_key in both dataframes: are the top N keys dominating the distribution, and how many keys are significantly skewed?
Cluster Configuration:
Data Source:
What is the source of df1 and df2? (e.g., Delta tables, Parquet, or external sources like Redshift/S3?)
Query Execution Insights:
Previous Attempts:
If you could additionally capture some Python stack traces (tracebacks), we can get visibility into the slow-running operation and potentially correlate it with the Spark UI metrics for the slow SQL ID and stage.
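To make the slow stage easier to spot, you can also tag the job before triggering the join so it is labeled in the Spark UI. A minimal sketch:
# Label the job so it is easy to find in the Spark UI Jobs/SQL tabs
spark.sparkContext.setJobDescription("df1/df2 skewed join - debug")
result = df1.join(df2, "main_key")
result.count()  # trigger execution; look for this description in the Spark UI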