Continuation of my comments:
Shuffle Hash Join:
- Prefer a shuffle hash join over a sort-merge join where applicable. It can be more efficient when one side of the join is small enough to build an in-memory hash table per partition but too large to broadcast:
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
Data Skew Remediation:
- Identify the skewed values and handle them separately. For example, if certain main_key values are causing the skew, split the join into a skewed and a non-skewed part:
skewed_keys = ["key1", "key2"]  # Replace with the actual skewed key values
df1_skewed = df1.filter(df1.main_key.isin(skewed_keys))
df1_non_skewed = df1.filter(~df1.main_key.isin(skewed_keys))
df2_skewed = df2.filter(df2.main_key.isin(skewed_keys))
df2_non_skewed = df2.filter(~df2.main_key.isin(skewed_keys))
result_skewed = df1_skewed.join(df2_skewed, "main_key")
result_non_skewed = df1_non_skewed.join(df2_non_skewed, "main_key")
result = result_skewed.union(result_non_skewed)
Salting:
- If salting the smaller dataframe and exploding the other did not work effectively, check that both sides agree on the salt. Salting both sides with independent random values (a common mistake) silently drops matching rows, because two rows with the same main_key usually get different salts. Instead, salt the larger dataframe randomly and replicate (explode) the smaller one across all salt values, and consider increasing the number of salts:
from pyspark.sql.functions import array, explode, expr, lit

num_salts = 10
# Random salt on the larger dataframe
df1_salted = df1.withColumn("salt", expr(f"floor(rand() * {num_salts})").cast("int"))
# Replicate every row of the smaller dataframe once per salt value
df2_exploded = df2.withColumn("salt", explode(array(*[lit(i) for i in range(num_salts)])))
result = df1_salted.join(
    df2_exploded,
    (df1_salted.main_key == df2_exploded.main_key) & (df1_salted.salt == df2_exploded.salt)
).drop("salt")
Please refer to:
https://docs.databricks.com/ja/archive/legacy/skew-join.html
https://www.databricks.com/discover/pages/optimize-data-workloads-guide