My Spark SQL join is very slow - what can I do to speed it up?

vida
Contributor II

It's taking 10-12 minutes - can I make it faster?

8 REPLIES

vida
Contributor II

If one of your tables is very small, you can do a Broadcast Hash Join to speed up your join. There's a notebook in the Databricks Guide on that - search for "BroadcastHashJoin" to find it.
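As a rough illustration (not from that notebook, and assuming hypothetical DataFrames largeDF and smallDF joined on an "id" column), newer Spark releases also let you request the broadcast explicitly with the broadcast() hint:

import org.apache.spark.sql.functions.broadcast

val joined = largeDF.join(broadcast(smallDF), largeDF("id") === smallDF("id"))
joined.explain()  // the physical plan should show a broadcast join for smallDF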

Otherwise, a join in Spark SQL causes a shuffle, which transfers data over the network and can be slow. If you query the joined table multiple times, you may consider:

1) Saving your joined table as its own table.

2) Creating a temporary table that represents the joined table, and then caching that table.

Either approach avoids shuffling the data and redoing the join multiple times.
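For example, roughly (with hypothetical tables orders and customers - adapt the names and query to your data):

val joined = sqlContext.sql(
  "SELECT o.*, c.name FROM orders o JOIN customers c ON o.customer_id = c.id")

// Option 1: save the join result as its own table
joined.write.saveAsTable("orders_joined")

// Option 2: register it as a temporary table and cache it
joined.registerTempTable("orders_joined_tmp")
sqlContext.cacheTable("orders_joined_tmp")

Subsequent queries against orders_joined or orders_joined_tmp then read the pre-joined data instead of re-shuffling the original tables.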

richard1_558848
New Contributor II

I'm joining Parquet tables stored on S3,

but it seems that Spark tries to read all the data anyway - we don't see better performance when changing the queries.

I need to keep investigating this point, because it's not yet clear.

I think I'm experiencing something similar.

Not using S3 yet. But I'm reading Parquet tables into DataFrames and trying tactics like persist, coalesce, and repartition after reading from Parquet. Using HiveContext, if that matters. But I get the impression that it's ignoring my attempts to repartition and cache and always recomputing my queries from scratch.

I'm definitely still new at this, so not sure yet how to figure out what's really going on.
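One thing worth checking (a rough sketch, with a made-up path and column name): persist() and repartition() are lazy and return new DataFrames, so the cached copy is only built once an action runs, and it is only reused when later queries go through that same DataFrame:

val df = hiveContext.read.parquet("/path/to/my/table").repartition(200)
df.persist()
df.count()                              // action: materializes the cache
df.groupBy("some_col").count().show()   // later queries on df reuse the cached data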

MarcLimotte
New Contributor II

@Vida Ha I just did "Run All" on a clone of the "BroadcastHashJoin" notebook, and it appears to have errors.

On

%sql ANALYZE TABLE my_small_table COMPUTE STATISTICS noscan

which appears right after "Configure a BroadcastHashJoin for a small table.", we get org.apache.spark.sql.catalyst.analysis.NoSuchTableException. I'm guessing this is because, at this point in the notebook (on its first run), my_small_table has only been created with registerTempTable, not with saveAsTable (which I think is required for ANALYZE).

If I run the code blocks lower down to do the saveAsTable and then come back here and rerun the "ANALYZE" step, I get:

Error in SQL statement: com.databricks.backend.daemon.driver.DriverLocal$SQLExecutionException: java.lang.UnsupportedOperationException: Analyze only works for Hive tables, but my_small_table is a LogicalRelation

vida
Contributor II
Contributor II

Hi mlimotte,

This notebook was written for an older version of Spark - we will update it for Spark 1.4 in the next Databricks release. In general, these tips still hold true:

1) If you want to figure out whether a BroadcastHashJoin is happening or not, use %sql explain on your select statement.

2) The analyze command is no longer needed for some table types - for example, tables you create with ".saveAsTable".
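For example, with hypothetical table names (in a Scala cell, .explain() prints the physical plan for the query):

sqlContext.sql("select * from big_table b join my_small_table s on b.id = s.id").explain()
// look for BroadcastHashJoin, rather than a shuffled join, in the printed plan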

-Vida

DeenarToraskar
New Contributor II

Hi Vida

>>The analyze command is not needed for all table types now - such as when you create a table with ".saveAsTable"

Does this mean statistics are available for any tables created with .saveAsTable (and hence they would be broadcast if their size is under the broadcast limit)?

Actually, I got my answer from the BroadcastHashJoin notebook (the answer is yes).

vida
Contributor II
Contributor II

Analyze is not needed with Parquet tables that use the Databricks Parquet package. That is the default now when you use .saveAsTable(), but if you use a different output format, it's possible that analyze may not work yet.
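A rough sketch of what that looks like (the table name is made up, and 10 MB is the default broadcast threshold):

// Written with the default parquet source, so size statistics are available
// without running ANALYZE:
smallDF.write.format("parquet").saveAsTable("my_small_table")

// Tables smaller than this threshold are broadcast automatically in joins:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)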
