Alberto_Umana
Databricks Employee

Continuation of my comments: 

Shuffle Hash Join:

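  • Consider a shuffle hash join instead of a sort-merge join. When one side of the join is small enough per partition to fit in an in-memory hash table, a shuffle hash join skips sorting both sides and can be faster. The setting below does not force it; it only lets the planner choose it when those size conditions are met:

# Default is "true"; "false" lets the planner pick shuffle hash join where it qualifies
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

On Spark 3.0+ you can also request it for a single join with a hint: df1.join(df2.hint("shuffle_hash"), "main_key")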
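To make the difference concrete, here is a plain-Python simulation of what a shuffle hash join does (not Spark code): both inputs are hash-partitioned on the join key (the shuffle), then inside each partition the smaller side is loaded into a dict and the larger side probes it, with no sorting at all. The function name, partition count, and sample rows are illustrative assumptions.

```python
from collections import defaultdict


def shuffle_hash_join(left, right, num_partitions=4):
    """Inner-join two lists of (key, value) pairs the way a shuffle hash join does.

    Plain-Python simulation for illustration; Spark runs these steps on executors.
    """
    # 1. Shuffle: route each row to a partition chosen by hashing its join key,
    #    so matching keys from both sides end up in the same partition.
    left_parts = defaultdict(list)
    right_parts = defaultdict(list)
    for key, value in left:
        left_parts[hash(key) % num_partitions].append((key, value))
    for key, value in right:
        right_parts[hash(key) % num_partitions].append((key, value))

    result = []
    for p in range(num_partitions):
        # 2. Build: hash the (assumed smaller) right side of this partition in memory.
        build = defaultdict(list)
        for key, value in right_parts[p]:
            build[key].append(value)
        # 3. Probe: stream the left side and look up each key; no sort is needed.
        for key, lvalue in left_parts[p]:
            for rvalue in build.get(key, []):
                result.append((key, lvalue, rvalue))
    return result


orders = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
customers = [("a", "Alice"), ("b", "Bob")]
print(sorted(shuffle_hash_join(orders, customers)))
# [('a', 1, 'Alice'), ('a', 3, 'Alice'), ('b', 2, 'Bob')]
```

The build step is why Spark only picks this strategy when one side's partitions fit in memory; a sort-merge join sorts both sides and streams them instead, which costs more CPU but does not need that memory.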

Data Skew Remediation:

  • Identify the skewed values and, if possible, split them off. For example, if a few main_key values cause the skew, join the rows with those keys separately from the rest and union the two results:

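from pyspark.sql.functions import broadcast

skewed_keys = [key1, key2]  # Replace with actual skewed keys

df1_skewed = df1.filter(df1.main_key.isin(skewed_keys))
df1_non_skewed = df1.filter(~df1.main_key.isin(skewed_keys))
df2_skewed = df2.filter(df2.main_key.isin(skewed_keys))
df2_non_skewed = df2.filter(~df2.main_key.isin(skewed_keys))

# Hot keys: broadcasting df2's slice (assumed small) avoids shuffling all hot rows into one task
result_skewed = df1_skewed.join(broadcast(df2_skewed), "main_key")
result_non_skewed = df1_non_skewed.join(df2_non_skewed, "main_key")

# Both halves have the same columns; unionByName matches them by name, not position
result = result_skewed.unionByName(result_non_skewed)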
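The "identify" step usually means counting rows per key (in PySpark, for example, df1.groupBy("main_key").count().orderBy("count", ascending=False)) and flagging keys far above the typical count. The plain-Python sketch below shows one such rule and checks that the split-and-union approach returns the same rows as a direct join. The 5x-the-median threshold and the helper names are assumptions to tune, not a Spark default.

```python
from collections import Counter
from statistics import median


def find_skewed_keys(keys, factor=5.0):
    """Return keys whose row count exceeds `factor` times the median per-key count.

    The 5x default is an illustrative threshold, not a Spark setting.
    """
    counts = Counter(keys)
    if not counts:
        return []
    typical = median(counts.values())
    return sorted(k for k, c in counts.items() if c > factor * typical)


def inner_join(left, right):
    """Naive inner join of (key, value) lists, used only as a reference result."""
    return [(k, lv, rv) for k, lv in left for k2, rv in right if k == k2]


def split_join(left, right, skewed):
    """Join the hot keys and the remaining keys separately, then union the results."""
    hot = set(skewed)
    hot_part = inner_join([r for r in left if r[0] in hot], [r for r in right if r[0] in hot])
    rest_part = inner_join([r for r in left if r[0] not in hot], [r for r in right if r[0] not in hot])
    return hot_part + rest_part


left = [("hot", i) for i in range(50)] + [("x", 1), ("y", 2), ("z", 3)]
right = [("hot", "H"), ("x", "X"), ("y", "Y")]

skewed = find_skewed_keys(k for k, _ in left)
print(skewed)  # ['hot']
print(sorted(split_join(left, right, skewed)) == sorted(inner_join(left, right)))  # True
```

Because the hot and non-hot key sets are disjoint, the two partial joins never produce the same output row, which is why a plain union of the halves is safe.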

Salting:

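  • If salting the smaller DataFrame and exploding the other did not work well, check how the salt is applied. The random salt belongs on the skewed (larger) DataFrame, and the other DataFrame must be exploded into one copy per salt value so that every salted row still finds its match; giving both sides an independent random salt silently drops most matches. Once that is right, consider increasing the number of salts:

from pyspark.sql.functions import array, explode, expr, lit

num_salts = 10  # raise this if the hottest keys are still too large per task

# df1 is assumed to be the skewed (larger) side: each row gets one random salt in [0, num_salts)
df1_salted = df1.withColumn("salt", expr(f"cast(floor(rand() * {num_salts}) as int)"))

# df2 is assumed to be the smaller side: replicate each row once per salt value
df2_salted = df2.withColumn("salt", explode(array(*[lit(i) for i in range(num_salts)])))

# Join on both the key and the salt, then drop the helper column
result = df1_salted.join(df2_salted, ["main_key", "salt"]).drop("salt")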
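Why the explode matters: the plain-Python simulation below (not Spark code) joins on (key, salt) in both ways. With the smaller side exploded over every salt value, each row of the skewed side finds its match; when both sides get an independent random salt, a match survives only when the two random salts happen to agree. The function name, sample data, and fixed seed are assumptions for the demo.

```python
import random


def salted_join(big, small, num_salts, explode_small=True, seed=0):
    """Simulate a salted inner join of (key, value) lists on the composite key (key, salt).

    big: the skewed side; each row gets one random salt in [0, num_salts).
    small: if explode_small, each row is copied once per salt value;
           otherwise it also gets a single random salt (the broken variant).
    """
    rng = random.Random(seed)
    big_salted = [(k, rng.randrange(num_salts), v) for k, v in big]
    if explode_small:
        small_salted = [(k, s, v) for k, v in small for s in range(num_salts)]
    else:
        small_salted = [(k, rng.randrange(num_salts), v) for k, v in small]

    # Index the small side on (key, salt), then probe it with every row of the big side.
    index = {}
    for k, s, v in small_salted:
        index.setdefault((k, s), []).append(v)
    return [(k, bv, sv) for k, s, bv in big_salted for sv in index.get((k, s), [])]


big = [("hot", i) for i in range(1000)]
small = [("hot", "H")]

correct = salted_join(big, small, num_salts=10)
broken = salted_join(big, small, num_salts=10, explode_small=False)
print(len(correct))  # 1000: every row of the skewed side still matches
print(len(broken))   # far fewer: only rows whose salt equals the small row's single salt
```

The cost of the fix is that the smaller side grows num_salts times, so raising the number of salts spreads hot keys across more tasks but also multiplies the replicated rows.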

Please refer to:

https://docs.databricks.com/ja/archive/legacy/skew-join.html

https://www.databricks.com/discover/pages/optimize-data-workloads-guide