cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Optimizing a complex pyspark join

Omri
New Contributor

I have a complex join that I'm trying to optimize df1 has cols id,main_key,col1,col1_isnull,col2,col2_isnull...col30 df2 has cols id,main_key,col1,col2..col_30

I'm trying to run this sql query on Pyspark

select df1.id, df2.id from df1 join df2 on df1.main_key = df2.main_key AND (df1.col1_is_null OR (df1.col1 = df2.col1)) AND (df1.col2_is_null OR (df1.col2 = df2.col2)) ...

This query takes a very lot of time with just a few long running straggler tasks both dataframes are huge, and the join key is skewed

Things I've tried:

  1. spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

  2. Salting the smaller df, exploding the other

  3. broadcasting the smaller df (sometimes the AQE overrides it with a SortMergeJoin(skew=true))

  4. Filtering just the top 2 most common main_key value first, then doing all the above

  5. Splitting the query to joining on main_key and then filtering using a 2nd query

The tasks execution still is very skewed What more can I do to optimize this further?

3 REPLIES 3

Alberto_Umana
Databricks Employee
Databricks Employee

To optimize your complex join in PySpark, you can try the following additional strategies:

 

Skew Join Optimization Using Skew Hints:

    • You can use skew hints to inform Spark about the skewed keys. This can help Spark to optimize the join by handling the skewed partitions more efficiently. For example:

 

SELECT /*+ SKEW('df1', 'main_key') */ df1.id, df2.id

FROM df1

JOIN df2 ON df1.main_key = df2.main_key

AND (df1.col1_is_null OR (df1.col1 = df2.col1))

AND (df1.col2_is_null OR (df1.col2 = df2.col2))

...

 

 

Adaptive Query Execution (AQE):

  • Ensure that AQE is enabled, which you have already done. AQE can dynamically optimize the query plan at runtime, including handling skewed joins.
  • You can further fine-tune AQE settings, such as adjusting the threshold for skew join detection:

spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

 

Cost-Based Optimizer (CBO):

  • Enable and utilize the Cost-Based Optimizer to improve join strategies. Ensure that table statistics are up-to-date:

ANALYZE TABLE df1 COMPUTE STATISTICS FOR ALL COLUMNS;

ANALYZE TABLE df2 COMPUTE STATISTICS FOR ALL COLUMNS;

 

Dynamic Partition Pruning (DPP):

  • Ensure that Dynamic Partition Pruning is enabled, which can help in optimizing joins by pruning unnecessary partitions:

 

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

Alberto_Umana
Databricks Employee
Databricks Employee

Continuation of my comments: 

Shuffle Hash Join:

  • Prefer shuffle hash join over sort-merge join if applicable. This can be more efficient in certain scenarios:

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

 

Data Skew Remediation:

  • Identify and filter out skewed values if possible. For example, if certain main_key values are causing skew, handle them separately:

skewed_keys = [key1, key2]  # Replace with actual skewed keys

df1_skewed = df1.filter(df1.main_key.isin(skewed_keys))

df1_non_skewed = df1.filter(~df1.main_key.isin(skewed_keys))

df2_skewed = df2.filter(df2.main_key.isin(skewed_keys))

df2_non_skewed = df2.filter(~df2.main_key.isin(skewed_keys))

 

result_skewed = df1_skewed.join(df2_skewed, "main_key")

result_non_skewed = df1_non_skewed.join(df2_non_skewed, "main_key")

result = result_skewed.union(result_non_skewed)

 

Salting:

  • If salting the smaller dataframe and exploding the other did not work effectively, ensure that the salting is done correctly and consider increasing the number of salts:

 

from pyspark.sql.functions import col, expr

 

num_salts = 10

df1_salted = df1.withColumn("salt", expr(f"floor(rand() * {num_salts})"))

df2_salted = df2.withColumn("salt", expr(f"floor(rand() * {num_salts})"))

 

result = df1_salted.join(df2_salted, (df1_salted.main_key == df2_salted.main_key) & (df1_salted.salt == df2_salted.salt))

Please refer to:

https://docs.databricks.com/ja/archive/legacy/skew-join.html

https://www.databricks.com/discover/pages/optimize-data-workloads-guide

VZLA
Databricks Employee
Databricks Employee

@Omri thanks for your question!

To help optimize your complex join further, we need clarification on a few details:

 

  • Data Characteristics:

    • Approximate size of df1 and df2 (in rows and/or size).
    • Distribution of main_key in both dataframesโ€”are the top N keys dominating the distribution, and how many keys are significantly skewed?
  • Cluster Configuration:

    • Details of your cluster (e.g., number of nodes, cores, memory per executor).
    • Are you using spot instances or dedicated nodes?
  • Data Source:

    • What is the data source for df1 and df2? (e.g., Delta tables, Parquet, or external sources like Redshift/S3?)
    • Are the datasets partitioned, and if so, how are they partitioned?
  • Query Execution Insights:

    • Have you examined the Spark UI or execution plans? What stages or tasks have the most skew?
    • Are you seeing significant shuffle writes or long-running tasks for specific partitions?
  • Previous Attempts:

    • When salting or exploding, what specific methods did you use? Did you adjust the degree of salting or the granularity of exploding?
    • Did you test alternative methods for handling skewed keys, like key bucketing or additional partitioning?
    • Were there cases where broadcasting or exploding significantly reduced execution time, or did skew persist across all approaches?

If you could additionally capture some (traceback) python stacktraces, we can get visibility on the operation that is slow running, and potentially correlate with the Spark UI metrics for the slow SQL ID and Stage.

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group