Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Slow PySpark operations after long DAG that contains many joins and transformations

Jonathan_
New Contributor II

We are using PySpark and have noticed that after many transformations/aggregations/joins, even simple operations (count, display, a union of two tables, ...) become very slow, even on small data (e.g. 4000 rows). It seems to be related to the DAG.

At first we tried persist() on the intermediate DataFrames, but it didn't make much of a difference. We then tried writing the intermediate DataFrames to a catalog, and that does make a big difference, but it doesn't seem logical to be forced to create permanent tables for them. We also tried checkpoint(), setting the checkpoint directory to the ephemeral storage of the running cluster (spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")); that seems to do the trick, but we don't know its pros and cons, or whether there is a better way to manage all of this.
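
A simplified sketch of the checkpoint approach we tried (the table names are just placeholders for our real pipeline):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# checkpoint directory on the cluster's ephemeral local storage
spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")

# illustrative pipeline: many joins build up a long lineage
df = spark.table("fact_table")
for dim in ["dim_a", "dim_b", "dim_c"]:
    df = df.join(spark.table(dim), on="id", how="left")

# checkpoint() materializes the result and truncates the lineage,
# so later actions (count, display, union, ...) are planned from here
df = df.checkpoint(eager=True)
print(df.count())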

It's kind of weird that pulling the data into memory with pandas gives better performance than using Spark. We would like the community's input on our options, given what we have tried so far.

3 REPLIES

Khaja_Zaffer
Contributor III

Hello @Jonathan_ 

Good day!

To answer your first point: as you mentioned, pandas is faster than Spark here because Spark is designed for big data and carries distributed overhead (scheduling, shuffles, task serialization).

Pandas runs in-memory on a single machine without this distributed overhead, making it faster for small data.
 
As you mentioned, you have joins and aggregations:
Make sure AQE is turned on, use broadcast joins, and check whether you can convert string values (such as dates) into ints, so that sort aggregations are avoided and hash aggregations are used instead.
Also see which join works better for you, shuffle hash or sort merge. Here is an article for you:
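
In the meantime, a quick sketch of those settings and hints (large_df and small_df stand in for your own DataFrames):

from pyspark.sql.functions import broadcast

# AQE is enabled by default on recent Spark/Databricks runtimes, but it doesn't hurt to check
spark.conf.set("spark.sql.adaptive.enabled", "true")

# broadcast the small side explicitly so the big table is not shuffled
joined = large_df.join(broadcast(small_df), on="id", how="left")

# or nudge the planner towards a shuffle hash join instead of sort merge
joined_hash = large_df.hint("shuffle_hash").join(small_df, on="id")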
 
The default block size is usually 128 MB; see whether repartition or coalesce can decrease the number of partitions, so that many small partitions are merged and the plan optimizes better.
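
Something along these lines, for example (assuming df is one of your intermediate DataFrames; the target of 8 partitions is just an illustration):

# check how many partitions the DataFrame currently has
print(df.rdd.getNumPartitions())

# coalesce() reduces the partition count without a full shuffle,
# useful when many tiny partitions slow down a small dataset
df = df.coalesce(8)

# repartition() does a full shuffle but gives evenly sized partitions
df = df.repartition(8)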
 
Try to minimize shuffle operations and apply filters as early as possible in the pipeline.
 
Turn on spark.eventLog.enabled=true, collect the logs, and raise a ticket with Azure Databricks so they can check internally what the issue is and what actually went wrong.
 
If you are working with Delta files, maybe OPTIMIZE can compact all the small files into bigger ones.
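
For example (the table name is hypothetical):

# compact the small files of a Delta table into bigger ones
spark.sql("OPTIMIZE my_catalog.my_schema.my_table")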
 
 
I am also open to other solutions from contributors. 

BS_THE_ANALYST
Esteemed Contributor II

@Jonathan_ I think @Khaja_Zaffer has raised some great points. 

With Spark, broadcast joins ensure the smaller table is in memory across all of the worker nodes, which should certainly help with speed. Shuffling is always going to take some time, and things like joins & aggregations will prompt a shuffle.

@Khaja_Zaffer whilst Pandas uses a single node (great point about it not having a distributed overhead), the Pandas API on Spark can be configured to scale: https://spark.apache.org/pandas-on-spark/ which is really cool. Perhaps there are some benchmarks available for Spark vs Pandas vs Pandas API on Spark 😏? Would be interesting to see 🙂
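
For anyone curious, switching is mostly an import change; a tiny self-contained sketch (column names are just examples):

import pyspark.pandas as ps

# distributed DataFrame with a pandas-like API
psdf = ps.range(1000)
psdf["doubled"] = psdf["id"] * 2
print(psdf.head())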

@Jonathan_ when you say "become very slow", how slow is slow? Is the development process slow? The job execution? The compute startup? Which compute are you using? How many people are accessing the compute when you're experiencing the "slow"? Is "slow" in reference to the speed of execution when compared to pandas?

All the best,
BS

Prajapathy_NKR
Visitor

@Jonathan_  

1. I noticed you tried persist() on your result; just a reminder that the DataFrame is only materialized when an action is performed. So if you want the result cached in memory, add an action such as count() immediately after persist(). Also, for faster access, store the DataFrame in deserialized format in memory, as that is faster than the serialized format in memory or on disk (see the sketch after this list).

2. Take a look at the number of partitions being created; it can be too few or too many. Aim for roughly 2-3 times the total number of cores across all executors.

3. As @Khaja_Zaffer mentioned, try to enable AQE; it auto-optimizes partitions, shuffle partitions, and the join strategy.

4. If you are using a UDF, or have a piece of the query that runs outside of Spark, it is going to be slow.
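
A rough sketch of points 1, 2 and 4 put together (the toy DataFrame and column names are only for illustration):

from pyspark import StorageLevel
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).withColumn("name", F.lit("example"))  # toy data

# 1. persist() is lazy: trigger an action right after so the data is actually cached
df = df.persist(StorageLevel.MEMORY_ONLY)
df.count()

# 2. size the partition count from the total cores across executors
total_cores = spark.sparkContext.defaultParallelism
df = df.repartition(total_cores * 3)

# 4. prefer built-ins over Python UDFs: a UDF ships every row out of the JVM and back
to_upper_udf = F.udf(lambda s: s.upper() if s else None)
slow = df.withColumn("name_upper", to_upper_udf("name"))
fast = df.withColumn("name_upper", F.upper("name"))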

To answer your question about checkpoint: it writes to the location you configured, not to the disk/memory of the executors. In terms of data locality it is far from the executors, so it will be slower. The pro is that if an executor fails, recovery is faster because the information isn't stored in the executor's memory or disk. This holds when the deploy mode is client or cluster; if it is local, the executor disk and your system disk are the same, so performance will be the same.
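
As an aside, if truncating the lineage is the main goal and losing the checkpointed data on executor failure is acceptable, localCheckpoint() keeps the data on the executors themselves, so no external directory is involved (df here stands for the DataFrame at the end of the long pipeline):

# reliable checkpoint: written to the configured checkpoint directory
df_reliable = df.checkpoint(eager=True)

# local checkpoint: kept on the executors' local storage; faster, but the
# data cannot be recovered if an executor is lost
df_local = df.localCheckpoint(eager=True)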

Hope it helps. 
